Skip to content

events: notifications live update (cherry-pick #18980 to version-2025.12)#18990

Merged
BeryJu merged 1 commit intoversion-2025.12from
cherry-pick/18980-to-version-2025.12
Dec 24, 2025
Merged

events: notifications live update (cherry-pick #18980 to version-2025.12)#18990
BeryJu merged 1 commit intoversion-2025.12from
cherry-pick/18980-to-version-2025.12

Conversation

@authentik-automation
Copy link
Contributor

Cherry-pick of #18980 to version-2025.12 branch.

Original PR: #18980
Original Author: @BeryJu
Cherry-picked commit: 162e05f

@netlify
Copy link

netlify bot commented Dec 21, 2025

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit db3244a
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/694be2ba33c28400086d2b8e
😎 Deploy Preview https://deploy-preview-18990--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@netlify
Copy link

netlify bot commented Dec 21, 2025

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit db3244a
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/694be2bab0222a00085f1306
😎 Deploy Preview https://deploy-preview-18990--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Dec 21, 2025

❌ 10 Tests Failed:

Tests completed Failed Passed Skipped
2859 10 2849 2
View the top 3 failed test(s) by shortest run time
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_caddy
Stack Traces | 7.74s run time
self = <docker.api.client.APIClient object at 0x7fc2e88dcb90>
response = <Response [409]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
>           response.raise_for_status()

.venv/lib/python3.13.../docker/api/client.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [409]>

    def raise_for_status(self):
        """Raises :class:`HTTPError`, if one occurred."""
    
        http_error_msg = ""
        if isinstance(self.reason, bytes):
            # We attempt to decode utf-8 first because some servers
            # choose to localize their reason strings. If the string
            # isn't utf-8, we fall back to iso-8859-1 for all other
            # encodings. (See PR #3538)
            try:
                reason = self.reason.decode("utf-8")
            except UnicodeDecodeError:
                reason = self.reason.decode("iso-8859-1")
        else:
            reason = self.reason
    
        if 400 <= self.status_code < 500:
            http_error_msg = (
                f"{self.status_code} Client Error: {reason} for url: {self.url}"
            )
    
        elif 500 <= self.status_code < 600:
            http_error_msg = (
                f"{self.status_code} Server Error: {reason} for url: {self.url}"
            )
    
        if http_error_msg:
>           raise HTTPError(http_error_msg, response=self)
E           requests.exceptions.HTTPError: 409 Client Error: Conflict for url: http+docker:.../localhost/v1.48/containers/create?name=ak-test-outpost

.venv/lib/python3.13.../site-packages/requests/models.py:1026: HTTPError

The above exception was the direct cause of the following exception:

self = <unittest.case._Outcome object at 0x7fc2e93b4050>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
result = <TestCaseFunction test_caddy>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
method = <bound method TestProviderProxyForward.test_caddy of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>

    @retry()
    def test_caddy(self):
        """Test caddy"""
        local_config_path = (
            Path(__file__).parent / "proxy_forward_auth" / "caddy_single" / "Caddyfile"
        )
        self.run_container(
            image="docker.io/library/caddy:2.8",
            ports={
                "80": "80",
            },
            volumes={
                local_config_path: {
                    "bind": ".../etc/caddy/Caddyfile",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:217: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
outpost = <Outpost: Outpost zQciekzROO6hRrGCyWytmHgDGpuIDnidZdUA3Ky5>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'TwDxtwCWJ7wtr7b4rx036...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
>       container = self.docker_client.containers.run(**specs)

tests/e2e/utils.py:122: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.containers.ContainerCollection object at 0x7fc2ea8df5c0>
image = 'ghcr.io/goauthentik/dev-proxy:gh-cherry-pick-18980-to-version-2025.12'
command = None, stdout = True, stderr = False, remove = False
kwargs = {'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'TwDxtwCWJ7wtr7b4rx036vMfIXWCPMHMcXBi2...e0mXUeCXksjXWQ'}, 'name': 'ak-test-outpost', 'network': 'authentik-test-v9K5Bb591Z7c5FEZg4JVTFC8fD3UtGNfnMZE5qNy', ...}
stream = False, detach = True, platform = None

    def run(self, image, command=None, stdout=True, stderr=False,
            remove=False, **kwargs):
        """
        Run a container. By default, it will wait for the container to finish
        and return its logs, similar to ``docker run``.
    
        If the ``detach`` argument is ``True``, it will start the container
        and immediately return a :py:class:`Container` object, similar to
        ``docker run -d``.
    
        Example:
            Run a container and get its output:
    
            >>> import docker
            >>> client = docker.from_env()
            >>> client.containers.run('alpine', 'echo hello world')
            b'hello world\\n'
    
            Run a container and detach:
    
            >>> container = client.containers.run('bfirsh/reticulate-splines',
                                                  detach=True)
            >>> container.logs()
            'Reticulating spline 1...\\nReticulating spline 2...\\n'
    
        Args:
            image (str): The image to run.
            command (str or list): The command to run in the container.
            auto_remove (bool): enable auto-removal of the container on daemon
                side when the container's process exits.
            blkio_weight_device: Block IO weight (relative device weight) in
                the form of: ``[{"Path": "device_path", "Weight": weight}]``.
            blkio_weight: Block IO weight (relative weight), accepts a weight
                value between 10 and 1000.
            cap_add (list of str): Add kernel capabilities. For example,
                ``["SYS_ADMIN", "MKNOD"]``.
            cap_drop (list of str): Drop kernel capabilities.
            cgroup_parent (str): Override the default parent cgroup.
            cgroupns (str): Override the default cgroup namespace mode for the
                container. One of:
                - ``private`` the container runs in its own private cgroup
                  namespace.
                - ``host`` use the host system's cgroup namespace.
            cpu_count (int): Number of usable CPUs (Windows only).
            cpu_percent (int): Usable percentage of the available CPUs
                (Windows only).
            cpu_period (int): The length of a CPU period in microseconds.
            cpu_quota (int): Microseconds of CPU time that the container can
                get in a CPU period.
            cpu_rt_period (int): Limit CPU real-time period in microseconds.
            cpu_rt_runtime (int): Limit CPU real-time runtime in microseconds.
            cpu_shares (int): CPU shares (relative weight).
            cpuset_cpus (str): CPUs in which to allow execution (``0-3``,
                ``0,1``).
            cpuset_mems (str): Memory nodes (MEMs) in which to allow execution
                (``0-3``, ``0,1``). Only effective on NUMA systems.
            detach (bool): Run container in the background and return a
                :py:class:`Container` object.
            device_cgroup_rules (:py:class:`list`): A list of cgroup rules to
                apply to the container.
            device_read_bps: Limit read rate (bytes per second) from a device
                in the form of: `[{"Path": "device_path", "Rate": rate}]`
            device_read_iops: Limit read rate (IO per second) from a device.
            device_write_bps: Limit write rate (bytes per second) from a
                device.
            device_write_iops: Limit write rate (IO per second) from a device.
            devices (:py:class:`list`): Expose host devices to the container,
                as a list of strings in the form
                ``<path_on_host>:<path_in_container>:<cgroup_permissions>``.
    
                For example, ``/dev/sda:/dev/xvda:rwm`` allows the container
                to have read-write access to the host's ``/dev/sda`` via a
                node named ``/dev/xvda`` inside the container.
            device_requests (:py:class:`list`): Expose host resources such as
                GPUs to the container, as a list of
                :py:class:`docker.types.DeviceRequest` instances.
            dns (:py:class:`list`): Set custom DNS servers.
            dns_opt (:py:class:`list`): Additional options to be added to the
                container's ``resolv.conf`` file.
            dns_search (:py:class:`list`): DNS search domains.
            domainname (str or list): Set custom DNS search domains.
            entrypoint (str or list): The entrypoint for the container.
            environment (dict or list): Environment variables to set inside
                the container, as a dictionary or a list of strings in the
                format ``["SOMEVARIABLE=xxx"]``.
            extra_hosts (dict): Additional hostnames to resolve inside the
                container, as a mapping of hostname to IP address.
            group_add (:py:class:`list`): List of additional group names and/or
                IDs that the container process will run as.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy. The dict takes the following keys:
    
                - test (:py:class:`list` or str): Test to perform to determine
                    container health. Possible values:
    
                    - Empty list: Inherit healthcheck from parent image
                    - ``["NONE"]``: Disable healthcheck
                    - ``["CMD", args...]``: exec arguments directly.
                    - ``["CMD-SHELL", command]``: Run command in the system's
                      default shell.
    
                    If a string is provided, it will be used as a ``CMD-SHELL``
                    command.
                - interval (int): The time to wait between checks in
                  nanoseconds. It should be 0 or at least 1000000 (1 ms).
                - timeout (int): The time to wait before considering the check
                  to have hung. It should be 0 or at least 1000000 (1 ms).
                - retries (int): The number of consecutive failures needed to
                    consider a container as unhealthy.
                - start_period (int): Start period for the container to
                    initialize before starting health-retries countdown in
                    nanoseconds. It should be 0 or at least 1000000 (1 ms).
            hostname (str): Optional hostname for the container.
            init (bool): Run an init inside the container that forwards
                signals and reaps processes
            init_path (str): Path to the docker-init binary
            ipc_mode (str): Set the IPC mode for the container.
            isolation (str): Isolation technology to use. Default: `None`.
            kernel_memory (int or str): Kernel memory limit
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            links (dict): Mapping of links using the
                ``{'container': 'alias'}`` format. The alias is optional.
                Containers declared in this dict will be linked to the new
                container using the provided alias. Default: ``None``.
            log_config (LogConfig): Logging configuration.
            lxc_conf (dict): LXC config.
            mac_address (str): MAC address to assign to the container.
            mem_limit (int or str): Memory limit. Accepts float values
                (which represent the memory limit of the created container in
                bytes) or a string with a units identification char
                (``100000b``, ``1000k``, ``128m``, ``1g``). If a string is
                specified without a units character, bytes are assumed as an
                intended unit.
            mem_reservation (int or str): Memory soft limit.
            mem_swappiness (int): Tune a container's memory swappiness
                behavior. Accepts number between 0 and 100.
            memswap_limit (str or int): Maximum amount of memory + swap a
                container is allowed to consume.
            mounts (:py:class:`list`): Specification for mounts to be added to
                the container. More powerful alternative to ``volumes``. Each
                item in the list is expected to be a
                :py:class:`docker.types.Mount` object.
            name (str): The name for this container.
            nano_cpus (int):  CPU quota in units of 1e-9 CPUs.
            network (str): Name of the network this container will be connected
                to at creation time. You can connect to additional networks
                using :py:meth:`Network.connect`. Incompatible with
                ``network_mode``.
            network_disabled (bool): Disable networking.
            network_mode (str): One of:
    
                - ``bridge`` Create a new network stack for the container on
                  the bridge network.
                - ``none`` No networking for this container.
                - ``container:<name|id>`` Reuse another container's network
                  stack.
                - ``host`` Use the host network stack.
                  This mode is incompatible with ``ports``.
    
                Incompatible with ``network``.
            networking_config (Dict[str, EndpointConfig]):
                Dictionary of EndpointConfig objects for each container network.
                The key is the name of the network.
                Defaults to ``None``.
    
                Used in conjuction with ``network``.
    
                Incompatible with ``network_mode``.
            oom_kill_disable (bool): Whether to disable OOM killer.
            oom_score_adj (int): An integer value containing the score given
                to the container in order to tune OOM killer preferences.
            pid_mode (str): If set to ``host``, use the host PID namespace
                inside the container.
            pids_limit (int): Tune a container's pids limit. Set ``-1`` for
                unlimited.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
                Only used if the method needs to pull the requested image.
            ports (dict): Ports to bind inside the container.
    
                The keys of the dictionary are the ports to bind inside the
                container, either as an integer or a string in the form
                ``port/protocol``, where the protocol is either ``tcp``,
                ``udp``, or ``sctp``.
    
                The values of the dictionary are the corresponding ports to
                open on the host, which can be either:
    
                - The port number, as an integer. For example,
                  ``{'2222/tcp': 3333}`` will expose port 2222 inside the
                  container as port 3333 on the host.
                - ``None``, to assign a random host port. For example,
                  ``{'2222/tcp': None}``.
                - A tuple of ``(address, port)`` if you want to specify the
                  host interface. For example,
                  ``{'1111/tcp': ('127.0.0.1', 1111)}``.
                - A list of integers, if you want to bind multiple host ports
                  to a single container port. For example,
                  ``{'1111/tcp': [1234, 4567]}``.
    
                Incompatible with ``host`` network mode.
            privileged (bool): Give extended privileges to this container.
            publish_all_ports (bool): Publish all ports to the host.
            read_only (bool): Mount the container's root filesystem as read
                only.
            remove (bool): Remove the container when it has finished running.
                Default: ``False``.
            restart_policy (dict): Restart the container when it exits.
                Configured as a dictionary with keys:
    
                - ``Name`` One of ``on-failure``, or ``always``.
                - ``MaximumRetryCount`` Number of times to restart the
                  container on failure.
    
                For example:
                ``{"Name": "on-failure", "MaximumRetryCount": 5}``
    
            runtime (str): Runtime to use with this container.
            security_opt (:py:class:`list`): A list of string values to
                customize labels for MLS systems, such as SELinux.
            shm_size (str or int): Size of /dev/shm (e.g. ``1G``).
            stdin_open (bool): Keep ``STDIN`` open even if not attached.
            stdout (bool): Return logs from ``STDOUT`` when ``detach=False``.
                Default: ``True``.
            stderr (bool): Return logs from ``STDERR`` when ``detach=False``.
                Default: ``False``.
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            storage_opt (dict): Storage driver options per container as a
                key-value mapping.
            stream (bool): If true and ``detach`` is false, return a log
                generator instead of a string. Ignored if ``detach`` is true.
                Default: ``False``.
            sysctls (dict): Kernel parameters to set in the container.
            tmpfs (dict): Temporary filesystems to mount, as a dictionary
                mapping a path inside the container to options for that path.
    
                For example:
    
                .. code-block:: python
    
                    {
                        '/mnt/vol2': '',
                        '/mnt/vol1': 'size=3G,uid=1000'
                    }
    
            tty (bool): Allocate a pseudo-TTY.
            ulimits (:py:class:`list`): Ulimits to set inside the container,
                as a list of :py:class:`docker.types.Ulimit` instances.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being built.
            user (str or int): Username or UID to run commands as inside the
                container.
            userns_mode (str): Sets the user namespace mode for the container
                when user namespace remapping option is enabled. Supported
                values are: ``host``
            uts_mode (str): Sets the UTS namespace mode for the container.
                Supported values are: ``host``
            version (str): The version of the API to use. Set to ``auto`` to
                automatically detect the server's version. Default: ``1.35``
            volume_driver (str): The name of a volume driver/plugin.
            volumes (dict or list): A dictionary to configure volumes mounted
                inside the container. The key is either the host path or a
                volume name, and the value is a dictionary with the keys:
    
                - ``bind`` The path to mount the volume inside the container
                - ``mode`` Either ``rw`` to mount the volume read/write, or
                  ``ro`` to mount it read-only.
    
                For example:
    
                .. code-block:: python
    
                    {'/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
                     '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}}
    
                Or a list of strings which each one of its elements specifies a
                mount volume.
    
                For example:
    
                .. code-block:: python
    
                    ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']
    
            volumes_from (:py:class:`list`): List of container names or IDs to
                get volumes from.
            working_dir (str): Path to the working directory.
    
        Returns:
            The container logs, either ``STDOUT``, ``STDERR``, or both,
            depending on the value of the ``stdout`` and ``stderr`` arguments.
    
            ``STDOUT`` and ``STDERR`` may be read only if either ``json-file``
            or ``journald`` logging driver used. Thus, if you are using none of
            these drivers, a ``None`` object is returned instead. See the
            `Engine API documentation
            <https://docs.docker..../engine/api/v1.30/#operation/ContainerLogs/>`_
            for full details.
    
            If ``detach`` is ``True``, a :py:class:`Container` object is
            returned instead.
    
        Raises:
            :py:class:`docker.errors.ContainerError`
                If the container exits with a non-zero exit code and
                ``detach`` is ``False``.
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        stream = kwargs.pop('stream', False)
        detach = kwargs.pop('detach', False)
        platform = kwargs.get('platform', None)
    
        if detach and remove:
            if version_gte(self.client.api._version, '1.25'):
                kwargs["auto_remove"] = True
            else:
                raise RuntimeError("The options 'detach' and 'remove' cannot "
                                   "be used together in api versions < 1.25.")
    
        if kwargs.get('network') and kwargs.get('network_mode'):
            raise RuntimeError(
                'The options "network" and "network_mode" can not be used '
                'together.'
            )
    
        if kwargs.get('networking_config') and not kwargs.get('network'):
            raise RuntimeError(
                'The option "networking_config" can not be used '
                'without "network".'
            )
    
        try:
>           container = self.create(image=image, command=command,
                                    detach=detach, **kwargs)

.venv/lib/python3.13.../docker/models/containers.py:876: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.containers.ContainerCollection object at 0x7fc2ea8df5c0>
image = 'ghcr.io/goauthentik/dev-proxy:gh-cherry-pick-18980-to-version-2025.12'
command = None, kwargs = {}
create_kwargs = {'command': None, 'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'TwDx...st-v9K5Bb591Z7c5FEZg4JVTFC8fD3UtGNfnMZE5qNy', 'PortBindings': {'9000/tcp': [{'HostIp': '', 'HostPort': '9000'}]}}, ...}

    def create(self, image, command=None, **kwargs):
        """
        Create a container without starting it. Similar to ``docker create``.
    
        Takes the same arguments as :py:meth:`run`, except for ``stdout``,
        ``stderr``, and ``remove``.
    
        Returns:
            A :py:class:`Container` object.
    
        Raises:
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        kwargs['image'] = image
        kwargs['command'] = command
        kwargs['version'] = self.client.api._version
        create_kwargs = _create_container_args(kwargs)
>       resp = self.client.api.create_container(**create_kwargs)

.venv/lib/python3.13.../docker/models/containers.py:935: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e88dcb90>
image = 'ghcr.io/goauthentik/dev-proxy:gh-cherry-pick-18980-to-version-2025.12'
command = None, hostname = None, user = None, detach = True, stdin_open = False
tty = False, ports = [('9000', 'tcp')]
environment = ['AUTHENTIK_TOKEN=TwDxtwCWJ7wtr7b4rx036vMfIXWCPMHMcXBi2f32nyGVYo5guJW4TZrAB8Wb', 'AUTHENTIK_HOST=http://10.1.0.43:38703']
volumes = None, network_disabled = False, name = 'ak-test-outpost'
entrypoint = None, working_dir = None, domainname = None
host_config = {'NetworkMode': 'authentik-test-v9K5Bb591Z7c5FEZg4JVTFC8fD3UtGNfnMZE5qNy', 'PortBindings': {'9000/tcp': [{'HostIp': '', 'HostPort': '9000'}]}}
mac_address = None
labels = {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}
stop_signal = None
networking_config = {'authentik-test-v9K5Bb591Z7c5FEZg4JVTFC8fD3UtGNfnMZE5qNy': None}
healthcheck = None, stop_timeout = None, runtime = None, use_config_proxy = True
platform = None

    def create_container(self, image, command=None, hostname=None, user=None,
                         detach=False, stdin_open=False, tty=False, ports=None,
                         environment=None, volumes=None,
                         network_disabled=False, name=None, entrypoint=None,
                         working_dir=None, domainname=None, host_config=None,
                         mac_address=None, labels=None, stop_signal=None,
                         networking_config=None, healthcheck=None,
                         stop_timeout=None, runtime=None,
                         use_config_proxy=True, platform=None):
        """
        Creates a container. Parameters are similar to those for the ``docker
        run`` command except it doesn't support the attach options (``-a``).
    
        The arguments that are passed directly to this function are
        host-independent configuration options. Host-specific configuration
        is passed with the `host_config` argument. You'll normally want to
        use this method in combination with the :py:meth:`create_host_config`
        method to generate ``host_config``.
    
        **Port bindings**
    
        Port binding is done in two parts: first, provide a list of ports to
        open inside the container with the ``ports`` parameter, then declare
        bindings with the ``host_config`` parameter. For example:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', ports=[1111, 2222],
                host_config=client.api.create_host_config(port_bindings={
                    1111: 4567,
                    2222: None
                })
            )
    
    
        You can limit the host address on which the port will be exposed like
        such:
    
        .. code-block:: python
    
            client.api.create_host_config(
                port_bindings={1111: ('127.0.0.1', 4567)}
            )
    
        Or without host port assignment:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={1111: ('127.0.0.1',)})
    
        If you wish to use UDP instead of TCP (default), you need to declare
        ports as such in both the config and host config:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', ports=[(1111, 'udp'), 2222],
                host_config=client.api.create_host_config(port_bindings={
                    '1111/udp': 4567, 2222: None
                })
            )
    
        To bind multiple host ports to a single container port, use the
        following syntax:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={
                1111: [1234, 4567]
            })
    
        You can also bind multiple IPs to a single container port:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={
                1111: [
                    ('192.168.0.100', 1234),
                    ('192.168.0.101', 1234)
                ]
            })
    
        **Using volumes**
    
        Volume declaration is done in two parts. Provide a list of
        paths to use as mountpoints inside the container with the
        ``volumes`` parameter, and declare mappings from paths on the host
        in the ``host_config`` section.
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2'],
                host_config=client.api.create_host_config(binds={
                    '/home/user1/': {
                        'bind': '/mnt/vol2',
                        'mode': 'rw',
                    },
                    '/var/www': {
                        'bind': '/mnt/vol1',
                        'mode': 'ro',
                    },
                    '/autofs/user1': {
                        'bind': '/mnt/vol3',
                        'mode': 'rw',
                        'propagation': 'shared'
                    }
                })
            )
    
        You can alternatively specify binds as a list. This code is equivalent
        to the example above:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2', '/mnt/vol3'],
                host_config=client.api.create_host_config(binds=[
                    '/home/user1/:/mnt/vol2',
                    '/var/www:/mnt/vol1:ro',
                    '/autofs/user1:/mnt/vol3:rw,shared',
                ])
            )
    
        **Networking**
    
        You can specify networks to connect the container to by using the
        ``networking_config`` parameter. At the time of creation, you can
        only connect a container to a single networking, but you
        can create more connections by using
        :py:meth:`~connect_container_to_network`.
    
        For example:
    
        .. code-block:: python
    
            networking_config = client.api.create_networking_config({
                'network1': client.api.create_endpoint_config(
                    ipv4_address='172.28.0.124',
                    aliases=['foo', 'bar'],
                    links=['container2']
                )
            })
    
            ctnr = client.api.create_container(
                img, command, networking_config=networking_config
            )
    
        Args:
            image (str): The image to run
            command (str or list): The command to be run in the container
            hostname (str): Optional hostname for the container
            user (str or int): Username or UID
            detach (bool): Detached mode: run container in the background and
                return container ID
            stdin_open (bool): Keep STDIN open even if not attached
            tty (bool): Allocate a pseudo-TTY
            ports (list of ints): A list of port numbers
            environment (dict or list): A dictionary or a list of strings in
                the following format ``["PASSWORD=xxx"]`` or
                ``{"PASSWORD": "xxx"}``.
            volumes (str or list): List of paths inside the container to use
                as volumes.
            network_disabled (bool): Disable networking
            name (str): A name for the container
            entrypoint (str or list): An entrypoint
            working_dir (str): Path to the working directory
            domainname (str): The domain name to use for the container
            host_config (dict): A dictionary created with
                :py:meth:`create_host_config`.
            mac_address (str): The Mac Address to assign the container
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            stop_timeout (int): Timeout to stop the container, in seconds.
                Default: 10
            networking_config (dict): A networking configuration generated
                by :py:meth:`create_networking_config`.
            runtime (str): Runtime to use with this container.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being created.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
    
        Returns:
            A dictionary with an image 'Id' key and a 'Warnings' key.
    
        Raises:
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(volumes, str):
            volumes = [volumes, ]
    
        if isinstance(environment, dict):
            environment = utils.utils.format_environment(environment)
    
        if use_config_proxy:
            environment = self._proxy_configs.inject_proxy_environment(
                environment
            ) or None
    
        config = self.create_container_config(
            image, command, hostname, user, detach, stdin_open, tty,
            ports, environment, volumes,
            network_disabled, entrypoint, working_dir, domainname,
            host_config, mac_address, labels,
            stop_signal, networking_config, healthcheck,
            stop_timeout, runtime
        )
>       return self.create_container_from_config(config, name, platform)

.venv/lib/python3.13.../docker/api/container.py:440: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e88dcb90>
config = {'Hostname': None, 'Domainname': None, 'ExposedPorts': {'9000/tcp': {}}, 'User': None, 'Tty': False, 'OpenStdin': Fals...RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, 'StopSignal': None, 'Healthcheck': None, 'StopTimeout': None, 'Runtime': None}
name = 'ak-test-outpost', platform = None

    def create_container_from_config(self, config, name=None, platform=None):
        u = self._url("/containers/create")
        params = {
            'name': name
        }
        if platform:
            if utils.version_lt(self._version, '1.41'):
                raise errors.InvalidVersion(
                    'platform is not supported for API version < 1.41'
                )
            params['platform'] = platform
        res = self._post_json(u, data=config, params=params)
>       return self._result(res, True)

.venv/lib/python3.13.../docker/api/container.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e88dcb90>
response = <Response [409]>, json = True, binary = False

    def _result(self, response, json=False, binary=False):
        assert not (json and binary)
>       self._raise_for_status(response)

.venv/lib/python3.13.../docker/api/client.py:281: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e88dcb90>
response = <Response [409]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
>           raise create_api_error_from_http_exception(e) from e

.venv/lib/python3.13.../docker/api/client.py:277: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

e = HTTPError('409 Client Error: Conflict for url: http+docker:.../localhost/v1.48/containers/create?name=ak-test-outpost')

    def create_api_error_from_http_exception(e):
        """
        Create a suitable APIError from requests.exceptions.HTTPError.
        """
        response = e.response
        try:
            explanation = response.json()['message']
        except ValueError:
            explanation = (response.text or '').strip()
        cls = APIError
        if response.status_code == 404:
            explanation_msg = (explanation or '').lower()
            if any(fragment in explanation_msg
                   for fragment in _image_not_found_explanation_fragments):
                cls = ImageNotFound
            else:
                cls = NotFound
>       raise cls(e, response=response, explanation=explanation) from e
E       docker.errors.APIError: 409 Client Error for http+docker:.../localhost/v1.48/containers/create?name=ak-test-outpost: Conflict ("Conflict. The container name "/ak-test-outpost" is already in use by container "c08f4147bc2957fba4657885a87e5c011e424e4c999fbc6a2d88a93d38cadf75". You have to remove (or rename) that container to be able to reuse that name.")

.venv/lib/python3.13.../site-packages/docker/errors.py:39: APIError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_traefik
Stack Traces | 8.29s run time
self = <docker.api.client.APIClient object at 0x7fc2e8ef96d0>
response = <Response [409]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
>           response.raise_for_status()

.venv/lib/python3.13.../docker/api/client.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [409]>

    def raise_for_status(self):
        """Raises :class:`HTTPError`, if one occurred."""
    
        http_error_msg = ""
        if isinstance(self.reason, bytes):
            # We attempt to decode utf-8 first because some servers
            # choose to localize their reason strings. If the string
            # isn't utf-8, we fall back to iso-8859-1 for all other
            # encodings. (See PR #3538)
            try:
                reason = self.reason.decode("utf-8")
            except UnicodeDecodeError:
                reason = self.reason.decode("iso-8859-1")
        else:
            reason = self.reason
    
        if 400 <= self.status_code < 500:
            http_error_msg = (
                f"{self.status_code} Client Error: {reason} for url: {self.url}"
            )
    
        elif 500 <= self.status_code < 600:
            http_error_msg = (
                f"{self.status_code} Server Error: {reason} for url: {self.url}"
            )
    
        if http_error_msg:
>           raise HTTPError(http_error_msg, response=self)
E           requests.exceptions.HTTPError: 409 Client Error: Conflict for url: http+docker:.../localhost/v1.48/containers/create?name=ak-test-outpost

.venv/lib/python3.13.../site-packages/requests/models.py:1026: HTTPError

The above exception was the direct cause of the following exception:

self = <unittest.case._Outcome object at 0x7fc2e8efbb10>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
result = <TestCaseFunction test_traefik>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
method = <bound method TestProviderProxyForward.test_traefik of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>

    @retry()
    def test_traefik(self):
        """Test traefik"""
        local_config_path = (
            Path(__file__).parent / "proxy_forward_auth" / "traefik_single" / "config-static.yaml"
        )
        self.run_container(
            image="docker.io/library/traefik:3.1",
            ports={
                "80": "80",
            },
            volumes={
                local_config_path: {
                    "bind": ".../etc/traefik/traefik.yml",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
outpost = <Outpost: Outpost NAmkLiQzAz38MoObQveIDwutYvWRD7Wio25iHo8Z>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'hlqzJhPbNC5ucC48Bn4zH...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
>       container = self.docker_client.containers.run(**specs)

tests/e2e/utils.py:122: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.containers.ContainerCollection object at 0x7fc2e9f5c6b0>
image = 'ghcr.io/goauthentik/dev-proxy:gh-cherry-pick-18980-to-version-2025.12'
command = None, stdout = True, stderr = False, remove = False
kwargs = {'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'hlqzJhPbNC5ucC48Bn4zHhsBQOWmNztKVVpcE...e0mXUeCXksjXWQ'}, 'name': 'ak-test-outpost', 'network': 'authentik-test-BN3xz0rLajemti6XybaPLjUM0RkyvFBibTyy67DM', ...}
stream = False, detach = True, platform = None

    def run(self, image, command=None, stdout=True, stderr=False,
            remove=False, **kwargs):
        """
        Run a container. By default, it will wait for the container to finish
        and return its logs, similar to ``docker run``.
    
        If the ``detach`` argument is ``True``, it will start the container
        and immediately return a :py:class:`Container` object, similar to
        ``docker run -d``.
    
        Example:
            Run a container and get its output:
    
            >>> import docker
            >>> client = docker.from_env()
            >>> client.containers.run('alpine', 'echo hello world')
            b'hello world\\n'
    
            Run a container and detach:
    
            >>> container = client.containers.run('bfirsh/reticulate-splines',
                                                  detach=True)
            >>> container.logs()
            'Reticulating spline 1...\\nReticulating spline 2...\\n'
    
        Args:
            image (str): The image to run.
            command (str or list): The command to run in the container.
            auto_remove (bool): enable auto-removal of the container on daemon
                side when the container's process exits.
            blkio_weight_device: Block IO weight (relative device weight) in
                the form of: ``[{"Path": "device_path", "Weight": weight}]``.
            blkio_weight: Block IO weight (relative weight), accepts a weight
                value between 10 and 1000.
            cap_add (list of str): Add kernel capabilities. For example,
                ``["SYS_ADMIN", "MKNOD"]``.
            cap_drop (list of str): Drop kernel capabilities.
            cgroup_parent (str): Override the default parent cgroup.
            cgroupns (str): Override the default cgroup namespace mode for the
                container. One of:
                - ``private`` the container runs in its own private cgroup
                  namespace.
                - ``host`` use the host system's cgroup namespace.
            cpu_count (int): Number of usable CPUs (Windows only).
            cpu_percent (int): Usable percentage of the available CPUs
                (Windows only).
            cpu_period (int): The length of a CPU period in microseconds.
            cpu_quota (int): Microseconds of CPU time that the container can
                get in a CPU period.
            cpu_rt_period (int): Limit CPU real-time period in microseconds.
            cpu_rt_runtime (int): Limit CPU real-time runtime in microseconds.
            cpu_shares (int): CPU shares (relative weight).
            cpuset_cpus (str): CPUs in which to allow execution (``0-3``,
                ``0,1``).
            cpuset_mems (str): Memory nodes (MEMs) in which to allow execution
                (``0-3``, ``0,1``). Only effective on NUMA systems.
            detach (bool): Run container in the background and return a
                :py:class:`Container` object.
            device_cgroup_rules (:py:class:`list`): A list of cgroup rules to
                apply to the container.
            device_read_bps: Limit read rate (bytes per second) from a device
                in the form of: `[{"Path": "device_path", "Rate": rate}]`
            device_read_iops: Limit read rate (IO per second) from a device.
            device_write_bps: Limit write rate (bytes per second) from a
                device.
            device_write_iops: Limit write rate (IO per second) from a device.
            devices (:py:class:`list`): Expose host devices to the container,
                as a list of strings in the form
                ``<path_on_host>:<path_in_container>:<cgroup_permissions>``.
    
                For example, ``/dev/sda:/dev/xvda:rwm`` allows the container
                to have read-write access to the host's ``/dev/sda`` via a
                node named ``/dev/xvda`` inside the container.
            device_requests (:py:class:`list`): Expose host resources such as
                GPUs to the container, as a list of
                :py:class:`docker.types.DeviceRequest` instances.
            dns (:py:class:`list`): Set custom DNS servers.
            dns_opt (:py:class:`list`): Additional options to be added to the
                container's ``resolv.conf`` file.
            dns_search (:py:class:`list`): DNS search domains.
            domainname (str or list): Set custom DNS search domains.
            entrypoint (str or list): The entrypoint for the container.
            environment (dict or list): Environment variables to set inside
                the container, as a dictionary or a list of strings in the
                format ``["SOMEVARIABLE=xxx"]``.
            extra_hosts (dict): Additional hostnames to resolve inside the
                container, as a mapping of hostname to IP address.
            group_add (:py:class:`list`): List of additional group names and/or
                IDs that the container process will run as.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy. The dict takes the following keys:
    
                - test (:py:class:`list` or str): Test to perform to determine
                    container health. Possible values:
    
                    - Empty list: Inherit healthcheck from parent image
                    - ``["NONE"]``: Disable healthcheck
                    - ``["CMD", args...]``: exec arguments directly.
                    - ``["CMD-SHELL", command]``: Run command in the system's
                      default shell.
    
                    If a string is provided, it will be used as a ``CMD-SHELL``
                    command.
                - interval (int): The time to wait between checks in
                  nanoseconds. It should be 0 or at least 1000000 (1 ms).
                - timeout (int): The time to wait before considering the check
                  to have hung. It should be 0 or at least 1000000 (1 ms).
                - retries (int): The number of consecutive failures needed to
                    consider a container as unhealthy.
                - start_period (int): Start period for the container to
                    initialize before starting health-retries countdown in
                    nanoseconds. It should be 0 or at least 1000000 (1 ms).
            hostname (str): Optional hostname for the container.
            init (bool): Run an init inside the container that forwards
                signals and reaps processes
            init_path (str): Path to the docker-init binary
            ipc_mode (str): Set the IPC mode for the container.
            isolation (str): Isolation technology to use. Default: `None`.
            kernel_memory (int or str): Kernel memory limit
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            links (dict): Mapping of links using the
                ``{'container': 'alias'}`` format. The alias is optional.
                Containers declared in this dict will be linked to the new
                container using the provided alias. Default: ``None``.
            log_config (LogConfig): Logging configuration.
            lxc_conf (dict): LXC config.
            mac_address (str): MAC address to assign to the container.
            mem_limit (int or str): Memory limit. Accepts float values
                (which represent the memory limit of the created container in
                bytes) or a string with a units identification char
                (``100000b``, ``1000k``, ``128m``, ``1g``). If a string is
                specified without a units character, bytes are assumed as an
                intended unit.
            mem_reservation (int or str): Memory soft limit.
            mem_swappiness (int): Tune a container's memory swappiness
                behavior. Accepts number between 0 and 100.
            memswap_limit (str or int): Maximum amount of memory + swap a
                container is allowed to consume.
            mounts (:py:class:`list`): Specification for mounts to be added to
                the container. More powerful alternative to ``volumes``. Each
                item in the list is expected to be a
                :py:class:`docker.types.Mount` object.
            name (str): The name for this container.
            nano_cpus (int):  CPU quota in units of 1e-9 CPUs.
            network (str): Name of the network this container will be connected
                to at creation time. You can connect to additional networks
                using :py:meth:`Network.connect`. Incompatible with
                ``network_mode``.
            network_disabled (bool): Disable networking.
            network_mode (str): One of:
    
                - ``bridge`` Create a new network stack for the container on
                  the bridge network.
                - ``none`` No networking for this container.
                - ``container:<name|id>`` Reuse another container's network
                  stack.
                - ``host`` Use the host network stack.
                  This mode is incompatible with ``ports``.
    
                Incompatible with ``network``.
            networking_config (Dict[str, EndpointConfig]):
                Dictionary of EndpointConfig objects for each container network.
                The key is the name of the network.
                Defaults to ``None``.
    
                Used in conjuction with ``network``.
    
                Incompatible with ``network_mode``.
            oom_kill_disable (bool): Whether to disable OOM killer.
            oom_score_adj (int): An integer value containing the score given
                to the container in order to tune OOM killer preferences.
            pid_mode (str): If set to ``host``, use the host PID namespace
                inside the container.
            pids_limit (int): Tune a container's pids limit. Set ``-1`` for
                unlimited.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
                Only used if the method needs to pull the requested image.
            ports (dict): Ports to bind inside the container.
    
                The keys of the dictionary are the ports to bind inside the
                container, either as an integer or a string in the form
                ``port/protocol``, where the protocol is either ``tcp``,
                ``udp``, or ``sctp``.
    
                The values of the dictionary are the corresponding ports to
                open on the host, which can be either:
    
                - The port number, as an integer. For example,
                  ``{'2222/tcp': 3333}`` will expose port 2222 inside the
                  container as port 3333 on the host.
                - ``None``, to assign a random host port. For example,
                  ``{'2222/tcp': None}``.
                - A tuple of ``(address, port)`` if you want to specify the
                  host interface. For example,
                  ``{'1111/tcp': ('127.0.0.1', 1111)}``.
                - A list of integers, if you want to bind multiple host ports
                  to a single container port. For example,
                  ``{'1111/tcp': [1234, 4567]}``.
    
                Incompatible with ``host`` network mode.
            privileged (bool): Give extended privileges to this container.
            publish_all_ports (bool): Publish all ports to the host.
            read_only (bool): Mount the container's root filesystem as read
                only.
            remove (bool): Remove the container when it has finished running.
                Default: ``False``.
            restart_policy (dict): Restart the container when it exits.
                Configured as a dictionary with keys:
    
                - ``Name`` One of ``on-failure``, or ``always``.
                - ``MaximumRetryCount`` Number of times to restart the
                  container on failure.
    
                For example:
                ``{"Name": "on-failure", "MaximumRetryCount": 5}``
    
            runtime (str): Runtime to use with this container.
            security_opt (:py:class:`list`): A list of string values to
                customize labels for MLS systems, such as SELinux.
            shm_size (str or int): Size of /dev/shm (e.g. ``1G``).
            stdin_open (bool): Keep ``STDIN`` open even if not attached.
            stdout (bool): Return logs from ``STDOUT`` when ``detach=False``.
                Default: ``True``.
            stderr (bool): Return logs from ``STDERR`` when ``detach=False``.
                Default: ``False``.
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            storage_opt (dict): Storage driver options per container as a
                key-value mapping.
            stream (bool): If true and ``detach`` is false, return a log
                generator instead of a string. Ignored if ``detach`` is true.
                Default: ``False``.
            sysctls (dict): Kernel parameters to set in the container.
            tmpfs (dict): Temporary filesystems to mount, as a dictionary
                mapping a path inside the container to options for that path.
    
                For example:
    
                .. code-block:: python
    
                    {
                        '/mnt/vol2': '',
                        '/mnt/vol1': 'size=3G,uid=1000'
                    }
    
            tty (bool): Allocate a pseudo-TTY.
            ulimits (:py:class:`list`): Ulimits to set inside the container,
                as a list of :py:class:`docker.types.Ulimit` instances.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being built.
            user (str or int): Username or UID to run commands as inside the
                container.
            userns_mode (str): Sets the user namespace mode for the container
                when user namespace remapping option is enabled. Supported
                values are: ``host``
            uts_mode (str): Sets the UTS namespace mode for the container.
                Supported values are: ``host``
            version (str): The version of the API to use. Set to ``auto`` to
                automatically detect the server's version. Default: ``1.35``
            volume_driver (str): The name of a volume driver/plugin.
            volumes (dict or list): A dictionary to configure volumes mounted
                inside the container. The key is either the host path or a
                volume name, and the value is a dictionary with the keys:
    
                - ``bind`` The path to mount the volume inside the container
                - ``mode`` Either ``rw`` to mount the volume read/write, or
                  ``ro`` to mount it read-only.
    
                For example:
    
                .. code-block:: python
    
                    {'/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
                     '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}}
    
                Or a list of strings which each one of its elements specifies a
                mount volume.
    
                For example:
    
                .. code-block:: python
    
                    ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']
    
            volumes_from (:py:class:`list`): List of container names or IDs to
                get volumes from.
            working_dir (str): Path to the working directory.
    
        Returns:
            The container logs, either ``STDOUT``, ``STDERR``, or both,
            depending on the value of the ``stdout`` and ``stderr`` arguments.
    
            ``STDOUT`` and ``STDERR`` may be read only if either ``json-file``
            or ``journald`` logging driver used. Thus, if you are using none of
            these drivers, a ``None`` object is returned instead. See the
            `Engine API documentation
            <https://docs.docker..../engine/api/v1.30/#operation/ContainerLogs/>`_
            for full details.
    
            If ``detach`` is ``True``, a :py:class:`Container` object is
            returned instead.
    
        Raises:
            :py:class:`docker.errors.ContainerError`
                If the container exits with a non-zero exit code and
                ``detach`` is ``False``.
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        stream = kwargs.pop('stream', False)
        detach = kwargs.pop('detach', False)
        platform = kwargs.get('platform', None)
    
        if detach and remove:
            if version_gte(self.client.api._version, '1.25'):
                kwargs["auto_remove"] = True
            else:
                raise RuntimeError("The options 'detach' and 'remove' cannot "
                                   "be used together in api versions < 1.25.")
    
        if kwargs.get('network') and kwargs.get('network_mode'):
            raise RuntimeError(
                'The options "network" and "network_mode" can not be used '
                'together.'
            )
    
        if kwargs.get('networking_config') and not kwargs.get('network'):
            raise RuntimeError(
                'The option "networking_config" can not be used '
                'without "network".'
            )
    
        try:
>           container = self.create(image=image, command=command,
                                    detach=detach, **kwargs)

.venv/lib/python3.13.../docker/models/containers.py:876: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.containers.ContainerCollection object at 0x7fc2e9f5c6b0>
image = 'ghcr.io/goauthentik/dev-proxy:gh-cherry-pick-18980-to-version-2025.12'
command = None, kwargs = {}
create_kwargs = {'command': None, 'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'hlqz...st-BN3xz0rLajemti6XybaPLjUM0RkyvFBibTyy67DM', 'PortBindings': {'9000/tcp': [{'HostIp': '', 'HostPort': '9000'}]}}, ...}

    def create(self, image, command=None, **kwargs):
        """
        Create a container without starting it. Similar to ``docker create``.
    
        Takes the same arguments as :py:meth:`run`, except for ``stdout``,
        ``stderr``, and ``remove``.
    
        Returns:
            A :py:class:`Container` object.
    
        Raises:
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        kwargs['image'] = image
        kwargs['command'] = command
        kwargs['version'] = self.client.api._version
        create_kwargs = _create_container_args(kwargs)
>       resp = self.client.api.create_container(**create_kwargs)

.venv/lib/python3.13.../docker/models/containers.py:935: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e8ef96d0>
image = 'ghcr.io/goauthentik/dev-proxy:gh-cherry-pick-18980-to-version-2025.12'
command = None, hostname = None, user = None, detach = True, stdin_open = False
tty = False, ports = [('9000', 'tcp')]
environment = ['AUTHENTIK_TOKEN=hlqzJhPbNC5ucC48Bn4zHhsBQOWmNztKVVpcEh9bGgoYuBkJ8zHpD6mIVFVQ', 'AUTHENTIK_HOST=http://10.1.0.43:38703']
volumes = None, network_disabled = False, name = 'ak-test-outpost'
entrypoint = None, working_dir = None, domainname = None
host_config = {'NetworkMode': 'authentik-test-BN3xz0rLajemti6XybaPLjUM0RkyvFBibTyy67DM', 'PortBindings': {'9000/tcp': [{'HostIp': '', 'HostPort': '9000'}]}}
mac_address = None
labels = {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}
stop_signal = None
networking_config = {'authentik-test-BN3xz0rLajemti6XybaPLjUM0RkyvFBibTyy67DM': None}
healthcheck = None, stop_timeout = None, runtime = None, use_config_proxy = True
platform = None

    def create_container(self, image, command=None, hostname=None, user=None,
                         detach=False, stdin_open=False, tty=False, ports=None,
                         environment=None, volumes=None,
                         network_disabled=False, name=None, entrypoint=None,
                         working_dir=None, domainname=None, host_config=None,
                         mac_address=None, labels=None, stop_signal=None,
                         networking_config=None, healthcheck=None,
                         stop_timeout=None, runtime=None,
                         use_config_proxy=True, platform=None):
        """
        Creates a container. Parameters are similar to those for the ``docker
        run`` command except it doesn't support the attach options (``-a``).
    
        The arguments that are passed directly to this function are
        host-independent configuration options. Host-specific configuration
        is passed with the `host_config` argument. You'll normally want to
        use this method in combination with the :py:meth:`create_host_config`
        method to generate ``host_config``.
    
        **Port bindings**
    
        Port binding is done in two parts: first, provide a list of ports to
        open inside the container with the ``ports`` parameter, then declare
        bindings with the ``host_config`` parameter. For example:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', ports=[1111, 2222],
                host_config=client.api.create_host_config(port_bindings={
                    1111: 4567,
                    2222: None
                })
            )
    
    
        You can limit the host address on which the port will be exposed like
        such:
    
        .. code-block:: python
    
            client.api.create_host_config(
                port_bindings={1111: ('127.0.0.1', 4567)}
            )
    
        Or without host port assignment:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={1111: ('127.0.0.1',)})
    
        If you wish to use UDP instead of TCP (default), you need to declare
        ports as such in both the config and host config:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', ports=[(1111, 'udp'), 2222],
                host_config=client.api.create_host_config(port_bindings={
                    '1111/udp': 4567, 2222: None
                })
            )
    
        To bind multiple host ports to a single container port, use the
        following syntax:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={
                1111: [1234, 4567]
            })
    
        You can also bind multiple IPs to a single container port:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={
                1111: [
                    ('192.168.0.100', 1234),
                    ('192.168.0.101', 1234)
                ]
            })
    
        **Using volumes**
    
        Volume declaration is done in two parts. Provide a list of
        paths to use as mountpoints inside the container with the
        ``volumes`` parameter, and declare mappings from paths on the host
        in the ``host_config`` section.
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2'],
                host_config=client.api.create_host_config(binds={
                    '/home/user1/': {
                        'bind': '/mnt/vol2',
                        'mode': 'rw',
                    },
                    '/var/www': {
                        'bind': '/mnt/vol1',
                        'mode': 'ro',
                    },
                    '/autofs/user1': {
                        'bind': '/mnt/vol3',
                        'mode': 'rw',
                        'propagation': 'shared'
                    }
                })
            )
    
        You can alternatively specify binds as a list. This code is equivalent
        to the example above:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2', '/mnt/vol3'],
                host_config=client.api.create_host_config(binds=[
                    '/home/user1/:/mnt/vol2',
                    '/var/www:/mnt/vol1:ro',
                    '/autofs/user1:/mnt/vol3:rw,shared',
                ])
            )
    
        **Networking**
    
        You can specify networks to connect the container to by using the
        ``networking_config`` parameter. At the time of creation, you can
        only connect a container to a single networking, but you
        can create more connections by using
        :py:meth:`~connect_container_to_network`.
    
        For example:
    
        .. code-block:: python
    
            networking_config = client.api.create_networking_config({
                'network1': client.api.create_endpoint_config(
                    ipv4_address='172.28.0.124',
                    aliases=['foo', 'bar'],
                    links=['container2']
                )
            })
    
            ctnr = client.api.create_container(
                img, command, networking_config=networking_config
            )
    
        Args:
            image (str): The image to run
            command (str or list): The command to be run in the container
            hostname (str): Optional hostname for the container
            user (str or int): Username or UID
            detach (bool): Detached mode: run container in the background and
                return container ID
            stdin_open (bool): Keep STDIN open even if not attached
            tty (bool): Allocate a pseudo-TTY
            ports (list of ints): A list of port numbers
            environment (dict or list): A dictionary or a list of strings in
                the following format ``["PASSWORD=xxx"]`` or
                ``{"PASSWORD": "xxx"}``.
            volumes (str or list): List of paths inside the container to use
                as volumes.
            network_disabled (bool): Disable networking
            name (str): A name for the container
            entrypoint (str or list): An entrypoint
            working_dir (str): Path to the working directory
            domainname (str): The domain name to use for the container
            host_config (dict): A dictionary created with
                :py:meth:`create_host_config`.
            mac_address (str): The Mac Address to assign the container
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            stop_timeout (int): Timeout to stop the container, in seconds.
                Default: 10
            networking_config (dict): A networking configuration generated
                by :py:meth:`create_networking_config`.
            runtime (str): Runtime to use with this container.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being created.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
    
        Returns:
            A dictionary with an image 'Id' key and a 'Warnings' key.
    
        Raises:
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(volumes, str):
            volumes = [volumes, ]
    
        if isinstance(environment, dict):
            environment = utils.utils.format_environment(environment)
    
        if use_config_proxy:
            environment = self._proxy_configs.inject_proxy_environment(
                environment
            ) or None
    
        config = self.create_container_config(
            image, command, hostname, user, detach, stdin_open, tty,
            ports, environment, volumes,
            network_disabled, entrypoint, working_dir, domainname,
            host_config, mac_address, labels,
            stop_signal, networking_config, healthcheck,
            stop_timeout, runtime
        )
>       return self.create_container_from_config(config, name, platform)

.venv/lib/python3.13.../docker/api/container.py:440: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e8ef96d0>
config = {'Hostname': None, 'Domainname': None, 'ExposedPorts': {'9000/tcp': {}}, 'User': None, 'Tty': False, 'OpenStdin': Fals...RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, 'StopSignal': None, 'Healthcheck': None, 'StopTimeout': None, 'Runtime': None}
name = 'ak-test-outpost', platform = None

    def create_container_from_config(self, config, name=None, platform=None):
        u = self._url("/containers/create")
        params = {
            'name': name
        }
        if platform:
            if utils.version_lt(self._version, '1.41'):
                raise errors.InvalidVersion(
                    'platform is not supported for API version < 1.41'
                )
            params['platform'] = platform
        res = self._post_json(u, data=config, params=params)
>       return self._result(res, True)

.venv/lib/python3.13.../docker/api/container.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e8ef96d0>
response = <Response [409]>, json = True, binary = False

    def _result(self, response, json=False, binary=False):
        assert not (json and binary)
>       self._raise_for_status(response)

.venv/lib/python3.13.../docker/api/client.py:281: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc2e8ef96d0>
response = <Response [409]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
>           raise create_api_error_from_http_exception(e) from e

.venv/lib/python3.13.../docker/api/client.py:277: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

e = HTTPError('409 Client Error: Conflict for url: http+docker:.../localhost/v1.48/containers/create?name=ak-test-outpost')

    def create_api_error_from_http_exception(e):
        """
        Create a suitable APIError from requests.exceptions.HTTPError.
        """
        response = e.response
        try:
            explanation = response.json()['message']
        except ValueError:
            explanation = (response.text or '').strip()
        cls = APIError
        if response.status_code == 404:
            explanation_msg = (explanation or '').lower()
            if any(fragment in explanation_msg
                   for fragment in _image_not_found_explanation_fragments):
                cls = ImageNotFound
            else:
                cls = NotFound
>       raise cls(e, response=response, explanation=explanation) from e
E       docker.errors.APIError: 409 Client Error for http+docker:.../localhost/v1.48/containers/create?name=ak-test-outpost: Conflict ("Conflict. The container name "/ak-test-outpost" is already in use by container "c08f4147bc2957fba4657885a87e5c011e424e4c999fbc6a2d88a93d38cadf75". You have to remove (or rename) that container to be able to reuse that name.")

.venv/lib/python3.13.../site-packages/docker/errors.py:39: APIError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_ssl
Stack Traces | 50s run time
self = <unittest.case._Outcome object at 0x7fccb364a780>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
result = <TestCaseFunction test_ldap_bind_success_ssl>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
method = <bound method TestProviderLDAP.test_ldap_bind_success_ssl of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_ssl(self):
        """Test simple bind with ssl"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:93: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
outpost = <Outpost: Outpost dKzRG4PhGmyx3QuuWQ73hdSJb6Uh0ar3XP7aRfOP>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.251:39839', 'AUTHENTIK_TOKEN': 'VJBdML0Wrb3JIzSng4Xb...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': '8TO1sLngWbkTggEhDe0QG7KEMv8fxWcg1cNYKBSs'}, ...}
container = <Container: b3f65039c96f>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
container = <Container: b3f65039c96f>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError
tests.e2e.test_provider_proxy.TestProviderProxy::test_proxy_basic_auth
Stack Traces | 50.6s run time
self = <unittest.case._Outcome object at 0x7fc2eaa209d0>
test_case = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
result = <TestCaseFunction test_proxy_basic_auth>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
method = <bound method TestProviderProxy.test_proxy_basic_auth of <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_proxy_basic_auth(self):
        """Test simple outpost setup with single provider"""
        cred = generate_id()
        attr = "basic-password"  # nosec
        self.user.attributes["basic-username"] = cred
        self.user.attributes[attr] = cred
        self.user.save()
    
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost:9000",
            basic_auth_enabled=True,
            basic_auth_user_attribute="basic-username",
            basic_auth_password_attribute=attr,
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_proxy(outpost)

tests/e2e/test_provider_proxy.py:177: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
outpost = <Outpost: Outpost yEbLpJK9GLDnvwVfcygfMoRqPFIQghYlhDfFwZhQ>

    def start_proxy(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_proxy.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:59341', 'AUTHENTIK_TOKEN': 'vN0znSRwOcse1jXOtudUl...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, ...}
container = <Container: 1db5e7aa914a>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
container = <Container: 1db5e7aa914a>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError
tests.e2e.test_provider_proxy.TestProviderProxy::test_proxy_simple
Stack Traces | 51.7s run time
self = <unittest.case._Outcome object at 0x7fc2ea1e6ea0>
test_case = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
result = <TestCaseFunction test_proxy_simple>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
method = <bound method TestProviderProxy.test_proxy_simple of <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_proxy_simple(self):
        """Test simple outpost setup with single provider"""
        # set additionalHeaders to test later
        self.user.attributes["additionalHeaders"] = {"X-Foo": "bar"}
        self.user.save()
    
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost:9000",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_proxy(outpost)

tests/e2e/test_provider_proxy.py:89: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
outpost = <Outpost: Outpost 8nDYwU4NESMReL1iRS33p7kbZdxQ7ASzJ2idA2fw>

    def start_proxy(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_proxy.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:59341', 'AUTHENTIK_TOKEN': 'xcgqSGLHhtY0Q0a5Ifp9a...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, ...}
container = <Container: 817b3c844506>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
container = <Container: 817b3c844506>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_schema
Stack Traces | 51.8s run time
self = <unittest.case._Outcome object at 0x7fccaf694b00>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
result = <TestCaseFunction test_ldap_schema>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
method = <bound method TestProviderLDAP.test_ldap_schema of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_schema(self):
        """Test LDAP Schema"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:416: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
outpost = <Outpost: Outpost jcNv95P64VCM1X5kf88oWgWidBoUQzdgT3AbwnjH>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.251:39839', 'AUTHENTIK_TOKEN': 'RDBPnPztn5JY8F8iUo9l...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': '8TO1sLngWbkTggEhDe0QG7KEMv8fxWcg1cNYKBSs'}, ...}
container = <Container: 547ac3edc694>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
container = <Container: 547ac3edc694>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_fail
Stack Traces | 54.8s run time
self = <unittest.case._Outcome object at 0x7fc2a4dcf4d0>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
result = <TestCaseFunction test_radius_bind_fail>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
method = <bound method TestProviderRadius.test_radius_bind_fail of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_fail(self):
        """Test simple bind (failed)"""
>       self._prepare()

tests/e2e/test_provider_radius.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        radius: RadiusProvider = RadiusProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            shared_secret=self.shared_secret,
        )
        # we need to create an application to actually access radius
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=radius)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.RADIUS,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(radius)
    
>       self.start_radius(outpost)

tests/e2e/test_provider_radius.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
outpost = <Outpost: Outpost jniA5BYH4JIt5INbgrup1bTkCO7BelgYzr4oW1uD>

    def start_radius(self, outpost: Outpost):
        """Start radius container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
            ports={"1812/udp": 1812},
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_radius.py:28: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.12:36259', 'AUTHENTIK_TOKEN': '3037jaUlGDFDJRMmUhM88...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'RDXAQaGNo8TR6CiHJeojCgDs2hCcsglZNFBaAwg1'}, ...}
container = <Container: 93831a938477>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
container = <Container: 93831a938477>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError
tests.e2e.test_provider_saml.TestProviderSAML::test_sp_initiated_implicit_post_buffer
Stack Traces | 99.2s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>
sql = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7fb612fef890>
args = ('TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authe...roviders_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7fb612fef890>
query = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.DeadlockDetected: deadlock detected
E           DETAIL:  Process 310 waits for AccessExclusiveLock on relation 16569 of database 16389; blocked by process 381.
E           Process 381 waits for AccessShareLock on relation 18413 of database 16389; blocked by process 310.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: DeadlockDetected

The above exception was the direct cause of the following exception:

self = <django.core.management.commands.flush.Command object at 0x7fb6137bba80>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
>                   connection.ops.execute_sql_flush(sql_list)

.venv/lib/python3.13.../management/commands/flush.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psqlextra.backend.operations.PostgresOperations object at 0x7fb62022f770>
sql_list = ['TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authe...providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";']

    def execute_sql_flush(self, sql_list):
        """Execute a list of SQL statements to flush the database."""
        with transaction.atomic(
            using=self.connection.alias,
            savepoint=self.connection.features.can_rollback_ddl,
        ):
            with self.connection.cursor() as cursor:
                for sql in sql_list:
>                   cursor.execute(sql)

.venv/lib/python3.13.../backends/base/operations.py:473: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>, 'TRUNCATE "authentik_sources_oauth_oauthsourceprop...providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";')
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13.../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>
sql = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>
sql = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>
sql = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None, many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>
sql = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7fb617784ec0>
exc_type = <class 'psycopg.errors.DeadlockDetected'>
exc_value = DeadlockDetected('deadlock detected\nDETAIL:  Process 310 waits for AccessExclusiveLock on relation 16569 of database ...ccessShareLock on relation 18413 of database 16389; blocked by process 310.\nHINT:  See server log for query details.')
traceback = <traceback object at 0x7fb612312d40>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>
sql = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7fb6123faf30>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7fb612fef890>
args = ('TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authe...roviders_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7fb612fef890>
query = 'TRUNCATE "authentik_sources_oauth_oauthsourcepropertymapping", "authentik_events_notificationwebhookmapping", "authen..._providers_ldap_ldapprovider", "django_channels_postgres_message", "authentik_stages_authenticator_email_emaildevice";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.OperationalError: deadlock detected
E           DETAIL:  Process 310 waits for AccessExclusiveLock on relation 16569 of database 16389; blocked by process 381.
E           Process 381 waits for AccessShareLock on relation 18413 of database 16389; blocked by process 310.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: OperationalError

The above exception was the direct cause of the following exception:

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>
result = <TestCaseFunction test_sp_initiated_implicit_post_buffer>
debug = False

    def _setup_and_call(self, result, debug=False):
        """
        Perform the following in order: pre-setup, run test, post-teardown,
        skipping pre/post hooks if test is set to be skipped.
    
        If debug=True, reraise any errors in setup and use super().debug()
        instead of __call__() to run the test.
        """
        testMethod = getattr(self, self._testMethodName)
        skipped = getattr(self.__class__, "__unittest_skip__", False) or getattr(
            testMethod, "__unittest_skip__", False
        )
    
        # Convert async test methods.
        if iscoroutinefunction(testMethod):
            setattr(self, self._testMethodName, async_to_sync(testMethod))
    
        if not skipped:
            try:
                if self.__class__._pre_setup_ran_eagerly:
                    self.__class__._pre_setup_ran_eagerly = False
                else:
                    self._pre_setup()
            except Exception:
                if debug:
                    raise
                result.addError(self, sys.exc_info())
                return
        if debug:
            super().debug()
        else:
            super().__call__(result)
        if not skipped:
            try:
>               self._post_teardown()

.venv/lib/python3.13.../django/test/testcases.py:379: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>

    def _post_teardown(self):
        """
        Perform post-test things:
        * Flush the contents of the database to leave a clean slate. If the
          class has an 'available_apps' attribute, don't fire post_migrate.
        * Force-close the connection so the next test gets a clean cursor.
        """
        try:
>           self._fixture_teardown()

.venv/lib/python3.13.../django/test/testcases.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>

    def _fixture_teardown(self):
        # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal
        # when flushing only a subset of the apps
        for db_name in self._databases_names(include_mirrors=False):
            # Flush the database
            inhibit_post_migrate = (
                self.available_apps is not None
                or (  # Inhibit the post_migrate signal when using serialized
                    # rollback to avoid trying to recreate the serialized data.
                    self.serialized_rollback
                    and hasattr(connections[db_name], "_test_serialized_contents")
                )
            )
>           call_command(
                "flush",
                verbosity=0,
                interactive=False,
                database=db_name,
                reset_sequences=False,
                allow_cascade=self.available_apps is not None,
                inhibit_post_migrate=inhibit_post_migrate,
            )

.venv/lib/python3.13.../django/test/testcases.py:1266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

command_name = 'flush', args = ()
options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
command = <django.core.management.commands.flush.Command object at 0x7fb6137bba80>
app_name = 'django.core'
parser = CommandParser(prog=' flush', usage=None, description='Removes ALL DATA from the database, including data added during ....', formatter_class=<class 'django.core.management.base.DjangoHelpFormatter'>, conflict_handler='error', add_help=True)
opt_mapping = {'database': 'database', 'force_color': 'force_color', 'help': 'help', 'no_color': 'no_color', ...}
arg_options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
parse_args = []

    def call_command(command_name, *args, **options):
        """
        Call the given command, with the given options and args/kwargs.
    
        This is the primary API you should use for calling specific commands.
    
        `command_name` may be a string or a command object. Using a string is
        preferred unless the command object is required for further processing or
        testing.
    
        Some examples:
            call_command('migrate')
            call_command('shell', plain=True)
            call_command('sqlmigrate', 'myapp')
    
            from django.core.management.commands import flush
            cmd = flush.Command()
            call_command(cmd, verbosity=0, interactive=False)
            # Do something with cmd ...
        """
        if isinstance(command_name, BaseCommand):
            # Command object passed in.
            command = command_name
            command_name = command.__class__.__module__.split(".")[-1]
        else:
            # Load the command object by name.
            try:
                app_name = get_commands()[command_name]
            except KeyError:
                raise CommandError("Unknown command: %r" % command_name)
    
            if isinstance(app_name, BaseCommand):
                # If the command is already loaded, use it directly.
                command = app_name
            else:
                command = load_command_class(app_name, command_name)
    
        # Simulate argument parsing to get the option defaults (see #10080 for details).
        parser = command.create_parser("", command_name)
        # Use the `dest` option name from the parser option
        opt_mapping = {
            min(s_opt.option_strings).lstrip("-").replace("-", "_"): s_opt.dest
            for s_opt in parser._actions
            if s_opt.option_strings
        }
        arg_options = {opt_mapping.get(key, key): value for key, value in options.items()}
        parse_args = []
        for arg in args:
            if isinstance(arg, (list, tuple)):
                parse_args += map(str, arg)
            else:
                parse_args.append(str(arg))
    
        def get_actions(parser):
            # Parser actions and actions from sub-parser choices.
            for opt in parser._actions:
                if isinstance(opt, _SubParsersAction):
                    for sub_opt in opt.choices.values():
                        yield from get_actions(sub_opt)
                else:
                    yield opt
    
        parser_actions = list(get_actions(parser))
        mutually_exclusive_required_options = {
            opt
            for group in parser._mutually_exclusive_groups
            for opt in group._group_actions
            if group.required
        }
        # Any required arguments which are passed in via **options must be passed
        # to parse_args().
        for opt in parser_actions:
            if opt.dest in options and (
                opt.required or opt in mutually_exclusive_required_options
            ):
                opt_dest_count = sum(v == opt.dest for v in opt_mapping.values())
                if opt_dest_count > 1:
                    raise TypeError(
                        f"Cannot pass the dest {opt.dest!r} that matches multiple "
                        f"arguments via **options."
                    )
                parse_args.append(min(opt.option_strings))
                if isinstance(opt, (_AppendConstAction, _CountAction, _StoreConstAction)):
                    continue
                value = arg_options[opt.dest]
                if isinstance(value, (list, tuple)):
                    parse_args += map(str, value)
                else:
                    parse_args.append(str(value))
        defaults = parser.parse_args(args=parse_args)
        defaults = dict(defaults._get_kwargs(), **arg_options)
        # Raise an error if any unknown options were passed.
        stealth_options = set(command.base_stealth_options + command.stealth_options)
        dest_parameters = {action.dest for action in parser_actions}
        valid_options = (dest_parameters | stealth_options).union(opt_mapping)
        unknown_options = set(options) - valid_options
        if unknown_options:
            raise TypeError(
                "Unknown option(s) for %s command: %s. "
                "Valid options are: %s."
                % (
                    command_name,
                    ", ".join(sorted(unknown_options)),
                    ", ".join(sorted(valid_options)),
                )
            )
        # Move positional args out of options to mimic legacy optparse
        args = defaults.pop("args", ())
        if "skip_checks" not in options:
            defaults["skip_checks"] = True
    
>       return command.execute(*args, **defaults)

.venv/lib/python3.13.../core/management/__init__.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7fb6137bba80>
args = ()
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}

    def execute(self, *args, **options):
        """
        Try to execute this command, performing system checks if needed (as
        controlled by the ``requires_system_checks`` attribute, except if
        force-skipped).
        """
        if options["force_color"] and options["no_color"]:
            raise CommandError(
                "The --no-color and --force-color options can't be used together."
            )
        if options["force_color"]:
            self.style = color_style(force_color=True)
        elif options["no_color"]:
            self.style = no_style()
            self.stderr.style_func = None
        if options.get("stdout"):
            self.stdout = OutputWrapper(options["stdout"])
        if options.get("stderr"):
            self.stderr = OutputWrapper(options["stderr"])
    
        if self.requires_system_checks and not options["skip_checks"]:
            check_kwargs = self.get_check_kwargs(options)
            self.check(**check_kwargs)
        if self.requires_migrations_checks:
            self.check_migrations()
>       output = self.handle(*args, **options)

.venv/lib/python3.13.../core/management/base.py:460: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7fb6137bba80>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
                    connection.ops.execute_sql_flush(sql_list)
                except Exception as exc:
>                   raise CommandError(
                        "Database %s couldn't be flushed. Possible reasons:\n"
                        "  * The database isn't running or isn't configured correctly.\n"
                        "  * At least one of the expected database tables doesn't exist.\n"
                        "  * The SQL was invalid.\n"
                        "Hint: Look at the output of 'django-admin sqlflush'. "
                        "That's the SQL this command wasn't able to run."
                        % (connection.settings_dict["NAME"],)
                    ) from exc
E                   django.core.management.base.CommandError: Database test_authentik couldn't be flushed. Possible reasons:
E                     * The database isn't running or isn't configured correctly.
E                     * At least one of the expected database tables doesn't exist.
E                     * The SQL was invalid.
E                   Hint: Look at the output of 'django-admin sqlflush'. That's the SQL this command wasn't able to run.

.venv/lib/python3.13.../management/commands/flush.py:76: CommandError
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_success
Stack Traces | 143s run time
self = <unittest.case._Outcome object at 0x7fc2a5c423c0>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
result = <TestCaseFunction test_radius_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
method = <bound method TestProviderRadius.test_radius_bind_success of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_success(self):
        """Test simple bind"""
>       self._prepare()

tests/e2e/test_provider_radius.py:64: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        radius: RadiusProvider = RadiusProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            shared_secret=self.shared_secret,
        )
        # we need to create an application to actually access radius
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=radius)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.RADIUS,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(radius)
    
>       self.start_radius(outpost)

tests/e2e/test_provider_radius.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
outpost = <Outpost: Outpost lyuifMhidtLY0agmTSaYAGxiyuNnbqyMpOhLdXrU>

    def start_radius(self, outpost: Outpost):
        """Start radius container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
            ports={"1812/udp": 1812},
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_radius.py:28: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.12:36259', 'AUTHENTIK_TOKEN': 'p7eJFNMcikQcJtFafFvq6...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'RDXAQaGNo8TR6CiHJeojCgDs2hCcsglZNFBaAwg1'}, ...}
container = <Container: 05136549ebbd>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
container = <Container: 05136549ebbd>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_envoy
Stack Traces | 154s run time
self = <unittest.case._Outcome object at 0x7fc2ea7fde80>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
result = <TestCaseFunction test_envoy>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
method = <bound method TestProviderProxyForward.test_envoy of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>

    @retry()
    def test_envoy(self):
        """Test envoy"""
        self.run_container(
            image="docker.io/envoyproxy/envoy:v1.25-latest",
            ports={
                "10000": "80",
            },
            volumes={
                f"{Path(__file__).parent / "proxy_forward_auth" / "envoy_single" / "envoy.yaml"}": {
                    "bind": ".../etc/envoy/envoy.yaml",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:176: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
outpost = <Outpost: Outpost 2FEsYlbSmxW1V4wsIu2m0SKLMfkLr30ECsoHd6qa>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.43:38703', 'AUTHENTIK_TOKEN': 'rAmYtIdCvGv9YC2yvAxlo...rry-pick-18980-to-version-2025.12', 'labels': {'io.goauthentik.test': 'yL1RkvpOKuEc2Vau5KRHjWB7jje0mXUeCXksjXWQ'}, ...}
container = <Container: c08f4147bc29>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
container = <Container: c08f4147bc29>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:101: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

* this has been broken for a while but no one noticed...? cc @rissson

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* send WS broadcast for new notifications

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* better layout

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix e2e tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
@BeryJu BeryJu force-pushed the cherry-pick/18980-to-version-2025.12 branch from cf550c7 to db3244a Compare December 24, 2025 12:55
@BeryJu BeryJu merged commit dfd11ce into version-2025.12 Dec 24, 2025
78 of 85 checks passed
@BeryJu BeryJu deleted the cherry-pick/18980-to-version-2025.12 branch December 24, 2025 13:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant