Skip to content

website/docs: fix Transifex link in translation guide (cherry-pick #19735 to version-2025.12)#19771

Merged
BeryJu merged 2 commits intoversion-2025.12from
cherry-pick/19735-to-version-2025.12
Jan 27, 2026
Merged

website/docs: fix Transifex link in translation guide (cherry-pick #19735 to version-2025.12)#19771
BeryJu merged 2 commits intoversion-2025.12from
cherry-pick/19735-to-version-2025.12

Conversation

@authentik-automation
Copy link
Contributor

Cherry-pick of #19735 to version-2025.12 branch.

Original PR: #19735
Original Author: @dominic-r
Cherry-picked commit: 6653ea3

@netlify
Copy link

netlify bot commented Jan 26, 2026

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit da08a7b
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/6977c02d5162c000086e46d2
😎 Deploy Preview https://deploy-preview-19771--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Jan 26, 2026

❌ 15 Tests Failed:

Tests completed Failed Passed Skipped
2869 15 2854 4
View the top 3 failed test(s) by shortest run time
tests.e2e.test_provider_oauth2_grafana.TestProviderOAuth2OAuth::test_authorization_logout
Stack Traces | 19.4s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>
sql = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f34ca1841d0>
args = ('TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oau..."authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f34ca1841d0>
query = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.DeadlockDetected: deadlock detected
E           DETAIL:  Process 239 waits for AccessExclusiveLock on relation 18272 of database 16389; blocked by process 257.
E           Process 257 waits for AccessShareLock on relation 19465 of database 16389; blocked by process 239.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: DeadlockDetected

The above exception was the direct cause of the following exception:

self = <django.core.management.commands.flush.Command object at 0x7f34c8843130>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
>                   connection.ops.execute_sql_flush(sql_list)

.venv/lib/python3.13.../management/commands/flush.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psqlextra.backend.operations.PostgresOperations object at 0x7f34d4407b60>
sql_list = ['TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oau... "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";']

    def execute_sql_flush(self, sql_list):
        """Execute a list of SQL statements to flush the database."""
        with transaction.atomic(
            using=self.connection.alias,
            savepoint=self.connection.features.can_rollback_ddl,
        ):
            with self.connection.cursor() as cursor:
                for sql in sql_list:
>                   cursor.execute(sql)

.venv/lib/python3.13.../backends/base/operations.py:473: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>, 'TRUNCATE "authentik_providers_saml_samlsession", ... "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";')
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13.../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>
sql = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>
sql = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>
sql = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None, many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>
sql = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7f34cbc34ec0>
exc_type = <class 'psycopg.errors.DeadlockDetected'>
exc_value = DeadlockDetected('deadlock detected\nDETAIL:  Process 239 waits for AccessExclusiveLock on relation 18272 of database ...ccessShareLock on relation 19465 of database 16389; blocked by process 239.\nHINT:  See server log for query details.')
traceback = <traceback object at 0x7f34cf6f3240>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>
sql = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f34c9e312b0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f34ca1841d0>
args = ('TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oau..."authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f34ca1841d0>
query = 'TRUNCATE "authentik_providers_saml_samlsession", "authentik_providers_rac_connectiontoken", "authentik_providers_oaut..., "authentik_providers_google_workspace_googleworkspaceprovide8e50", "authentik_policies_reputation_reputationpolicy";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.OperationalError: deadlock detected
E           DETAIL:  Process 239 waits for AccessExclusiveLock on relation 18272 of database 16389; blocked by process 257.
E           Process 257 waits for AccessShareLock on relation 19465 of database 16389; blocked by process 239.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: OperationalError

The above exception was the direct cause of the following exception:

self = <tests.e2e.test_provider_oauth2_grafana.TestProviderOAuth2OAuth testMethod=test_authorization_logout>
result = <TestCaseFunction test_authorization_logout>, debug = False

    def _setup_and_call(self, result, debug=False):
        """
        Perform the following in order: pre-setup, run test, post-teardown,
        skipping pre/post hooks if test is set to be skipped.
    
        If debug=True, reraise any errors in setup and use super().debug()
        instead of __call__() to run the test.
        """
        testMethod = getattr(self, self._testMethodName)
        skipped = getattr(self.__class__, "__unittest_skip__", False) or getattr(
            testMethod, "__unittest_skip__", False
        )
    
        # Convert async test methods.
        if iscoroutinefunction(testMethod):
            setattr(self, self._testMethodName, async_to_sync(testMethod))
    
        if not skipped:
            try:
                if self.__class__._pre_setup_ran_eagerly:
                    self.__class__._pre_setup_ran_eagerly = False
                else:
                    self._pre_setup()
            except Exception:
                if debug:
                    raise
                result.addError(self, sys.exc_info())
                return
        if debug:
            super().debug()
        else:
            super().__call__(result)
        if not skipped:
            try:
>               self._post_teardown()

.venv/lib/python3.13.../django/test/testcases.py:379: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oauth2_grafana.TestProviderOAuth2OAuth testMethod=test_authorization_logout>

    def _post_teardown(self):
        broker = get_broker()
        broker.flush_all()
        broker.close()
>       return super()._post_teardown()

tests/e2e/utils.py:113: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oauth2_grafana.TestProviderOAuth2OAuth testMethod=test_authorization_logout>

    def _post_teardown(self):
        """
        Perform post-test things:
        * Flush the contents of the database to leave a clean slate. If the
          class has an 'available_apps' attribute, don't fire post_migrate.
        * Force-close the connection so the next test gets a clean cursor.
        """
        try:
>           self._fixture_teardown()

.venv/lib/python3.13.../django/test/testcases.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oauth2_grafana.TestProviderOAuth2OAuth testMethod=test_authorization_logout>

    def _fixture_teardown(self):
        # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal
        # when flushing only a subset of the apps
        for db_name in self._databases_names(include_mirrors=False):
            # Flush the database
            inhibit_post_migrate = (
                self.available_apps is not None
                or (  # Inhibit the post_migrate signal when using serialized
                    # rollback to avoid trying to recreate the serialized data.
                    self.serialized_rollback
                    and hasattr(connections[db_name], "_test_serialized_contents")
                )
            )
>           call_command(
                "flush",
                verbosity=0,
                interactive=False,
                database=db_name,
                reset_sequences=False,
                allow_cascade=self.available_apps is not None,
                inhibit_post_migrate=inhibit_post_migrate,
            )

.venv/lib/python3.13.../django/test/testcases.py:1266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

command_name = 'flush', args = ()
options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
command = <django.core.management.commands.flush.Command object at 0x7f34c8843130>
app_name = 'django.core'
parser = CommandParser(prog=' flush', usage=None, description='Removes ALL DATA from the database, including data added during ....', formatter_class=<class 'django.core.management.base.DjangoHelpFormatter'>, conflict_handler='error', add_help=True)
opt_mapping = {'database': 'database', 'force_color': 'force_color', 'help': 'help', 'no_color': 'no_color', ...}
arg_options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
parse_args = []

    def call_command(command_name, *args, **options):
        """
        Call the given command, with the given options and args/kwargs.
    
        This is the primary API you should use for calling specific commands.
    
        `command_name` may be a string or a command object. Using a string is
        preferred unless the command object is required for further processing or
        testing.
    
        Some examples:
            call_command('migrate')
            call_command('shell', plain=True)
            call_command('sqlmigrate', 'myapp')
    
            from django.core.management.commands import flush
            cmd = flush.Command()
            call_command(cmd, verbosity=0, interactive=False)
            # Do something with cmd ...
        """
        if isinstance(command_name, BaseCommand):
            # Command object passed in.
            command = command_name
            command_name = command.__class__.__module__.split(".")[-1]
        else:
            # Load the command object by name.
            try:
                app_name = get_commands()[command_name]
            except KeyError:
                raise CommandError("Unknown command: %r" % command_name)
    
            if isinstance(app_name, BaseCommand):
                # If the command is already loaded, use it directly.
                command = app_name
            else:
                command = load_command_class(app_name, command_name)
    
        # Simulate argument parsing to get the option defaults (see #10080 for details).
        parser = command.create_parser("", command_name)
        # Use the `dest` option name from the parser option
        opt_mapping = {
            min(s_opt.option_strings).lstrip("-").replace("-", "_"): s_opt.dest
            for s_opt in parser._actions
            if s_opt.option_strings
        }
        arg_options = {opt_mapping.get(key, key): value for key, value in options.items()}
        parse_args = []
        for arg in args:
            if isinstance(arg, (list, tuple)):
                parse_args += map(str, arg)
            else:
                parse_args.append(str(arg))
    
        def get_actions(parser):
            # Parser actions and actions from sub-parser choices.
            for opt in parser._actions:
                if isinstance(opt, _SubParsersAction):
                    for sub_opt in opt.choices.values():
                        yield from get_actions(sub_opt)
                else:
                    yield opt
    
        parser_actions = list(get_actions(parser))
        mutually_exclusive_required_options = {
            opt
            for group in parser._mutually_exclusive_groups
            for opt in group._group_actions
            if group.required
        }
        # Any required arguments which are passed in via **options must be passed
        # to parse_args().
        for opt in parser_actions:
            if opt.dest in options and (
                opt.required or opt in mutually_exclusive_required_options
            ):
                opt_dest_count = sum(v == opt.dest for v in opt_mapping.values())
                if opt_dest_count > 1:
                    raise TypeError(
                        f"Cannot pass the dest {opt.dest!r} that matches multiple "
                        f"arguments via **options."
                    )
                parse_args.append(min(opt.option_strings))
                if isinstance(opt, (_AppendConstAction, _CountAction, _StoreConstAction)):
                    continue
                value = arg_options[opt.dest]
                if isinstance(value, (list, tuple)):
                    parse_args += map(str, value)
                else:
                    parse_args.append(str(value))
        defaults = parser.parse_args(args=parse_args)
        defaults = dict(defaults._get_kwargs(), **arg_options)
        # Raise an error if any unknown options were passed.
        stealth_options = set(command.base_stealth_options + command.stealth_options)
        dest_parameters = {action.dest for action in parser_actions}
        valid_options = (dest_parameters | stealth_options).union(opt_mapping)
        unknown_options = set(options) - valid_options
        if unknown_options:
            raise TypeError(
                "Unknown option(s) for %s command: %s. "
                "Valid options are: %s."
                % (
                    command_name,
                    ", ".join(sorted(unknown_options)),
                    ", ".join(sorted(valid_options)),
                )
            )
        # Move positional args out of options to mimic legacy optparse
        args = defaults.pop("args", ())
        if "skip_checks" not in options:
            defaults["skip_checks"] = True
    
>       return command.execute(*args, **defaults)

.venv/lib/python3.13.../core/management/__init__.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f34c8843130>
args = ()
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}

    def execute(self, *args, **options):
        """
        Try to execute this command, performing system checks if needed (as
        controlled by the ``requires_system_checks`` attribute, except if
        force-skipped).
        """
        if options["force_color"] and options["no_color"]:
            raise CommandError(
                "The --no-color and --force-color options can't be used together."
            )
        if options["force_color"]:
            self.style = color_style(force_color=True)
        elif options["no_color"]:
            self.style = no_style()
            self.stderr.style_func = None
        if options.get("stdout"):
            self.stdout = OutputWrapper(options["stdout"])
        if options.get("stderr"):
            self.stderr = OutputWrapper(options["stderr"])
    
        if self.requires_system_checks and not options["skip_checks"]:
            check_kwargs = self.get_check_kwargs(options)
            self.check(**check_kwargs)
        if self.requires_migrations_checks:
            self.check_migrations()
>       output = self.handle(*args, **options)

.venv/lib/python3.13.../core/management/base.py:464: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f34c8843130>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
                    connection.ops.execute_sql_flush(sql_list)
                except Exception as exc:
>                   raise CommandError(
                        "Database %s couldn't be flushed. Possible reasons:\n"
                        "  * The database isn't running or isn't configured correctly.\n"
                        "  * At least one of the expected database tables doesn't exist.\n"
                        "  * The SQL was invalid.\n"
                        "Hint: Look at the output of 'django-admin sqlflush'. "
                        "That's the SQL this command wasn't able to run."
                        % (connection.settings_dict["NAME"],)
                    ) from exc
E                   django.core.management.base.CommandError: Database test_authentik couldn't be flushed. Possible reasons:
E                     * The database isn't running or isn't configured correctly.
E                     * At least one of the expected database tables doesn't exist.
E                     * The SQL was invalid.
E                   Hint: Look at the output of 'django-admin sqlflush'. That's the SQL this command wasn't able to run.

.venv/lib/python3.13.../management/commands/flush.py:76: CommandError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success
Stack Traces | 50.5s run time
self = <unittest.case._Outcome object at 0x7f32cb8c7450>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
result = <TestCaseFunction test_ldap_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
method = <bound method TestProviderLDAP.test_ldap_bind_success of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success(self):
        """Test simple bind"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:66: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
outpost = <Outpost: Outpost Oo3UyPw9k9HKoH9KqHkghfNASqchAD6UlJiqfGwu>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': '06G4AVWb2Vogw9JpzScA...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: 5be62fbcfc1a>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
container = <Container: 5be62fbcfc1a>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_starttls
Stack Traces | 50.6s run time
self = <unittest.case._Outcome object at 0x7f32caa356a0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
result = <TestCaseFunction test_ldap_bind_success_starttls>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
method = <bound method TestProviderLDAP.test_ldap_bind_success_starttls of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_starttls(self):
        """Test simple bind with ssl"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:120: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
outpost = <Outpost: Outpost Q4t8yWuSLQwhNdJpk9lBzO6d9mhhwSqEHBqPXBUM>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': '9PxVJNHaqK5FtfBcRrkE...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: c2d2d0ae3dca>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
container = <Container: c2d2d0ae3dca>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_schema
Stack Traces | 51s run time
self = <unittest.case._Outcome object at 0x7f32caf61eb0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
result = <TestCaseFunction test_ldap_schema>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
method = <bound method TestProviderLDAP.test_ldap_schema of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_schema(self):
        """Test LDAP Schema"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:416: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
outpost = <Outpost: Outpost ybpg2qtqHbqwHZdvu2krsLAHnImX17zDhTUnJr2U>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': 'Wlcf2gXeYqJsfKOcACpC...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: a85e488f58d5>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
container = <Container: a85e488f58d5>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_search_attrs_filter
Stack Traces | 51s run time
self = <unittest.case._Outcome object at 0x7f32cb8c4c50>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
result = <TestCaseFunction test_ldap_search_attrs_filter>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
method = <bound method TestProviderLDAP.test_ldap_search_attrs_filter of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_search_attrs_filter(self):
        """Test search with attributes filtering"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
>       outpost = self._prepare()

tests/e2e/test_provider_ldap.py:442: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
outpost = <Outpost: Outpost FB4rfE1KYbKmXSABdbOv6jwNQFKensEdP55GGMPU>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': 'gzxHkkKiWOLcGAcEsKFa...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: 3ac106f001c6>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
container = <Container: 3ac106f001c6>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_fail
Stack Traces | 51.1s run time
self = <unittest.case._Outcome object at 0x7f32c9033b60>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
result = <TestCaseFunction test_ldap_bind_fail>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
method = <bound method TestProviderLDAP.test_ldap_bind_fail of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_fail(self):
        """Test simple bind (failed)"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:148: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
outpost = <Outpost: Outpost QkMCEzQnLUVMed91QT2Fc0Q6e3TE2e9VZGZbP1GF>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': 'C8XOMyewGzHAeh9aC2pG...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: 8186fe960303>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
container = <Container: 8186fe960303>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search
Stack Traces | 51.2s run time
self = <unittest.case._Outcome object at 0x7f32cb30b9b0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
result = <TestCaseFunction test_ldap_bind_search>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
method = <bound method TestProviderLDAP.test_ldap_bind_search of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search(self):
        """Test simple bind + search"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
>       outpost = self._prepare()

tests/e2e/test_provider_ldap.py:182: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
outpost = <Outpost: Outpost dT4yBMhKl5PZ4qowWiL8SWTUJRihqdI6Xwo6HN4G>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': 'Zr3Mm7DCjEwr5sU5VXQC...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: 50ea62ab16e9>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
container = <Container: 50ea62ab16e9>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search_no_perms
Stack Traces | 51.4s run time
self = <unittest.case._Outcome object at 0x7f32c9093a80>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
result = <TestCaseFunction test_ldap_bind_search_no_perms>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
method = <bound method TestProviderLDAP.test_ldap_bind_search_no_perms of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search_no_perms(self):
        """Test simple bind + search"""
        user = create_test_user()
>       self._prepare()

tests/e2e/test_provider_ldap.py:328: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
outpost = <Outpost: Outpost nDIBN70OOyQCmQee4NJu9Ogm5MkPTFzssO9ts1cN>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': 'E7987dEPIC8FiYpfX9aH...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: 976e8a89dff2>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
container = <Container: 976e8a89dff2>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_success
Stack Traces | 51.5s run time
self = <unittest.case._Outcome object at 0x7fc4985a20d0>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
result = <TestCaseFunction test_radius_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
method = <bound method TestProviderRadius.test_radius_bind_success of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_success(self):
        """Test simple bind"""
>       self._prepare()

tests/e2e/test_provider_radius.py:64: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        radius: RadiusProvider = RadiusProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            shared_secret=self.shared_secret,
        )
        # we need to create an application to actually access radius
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=radius)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.RADIUS,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(radius)
    
>       self.start_radius(outpost)

tests/e2e/test_provider_radius.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
outpost = <Outpost: Outpost 6wddg1uOb9OB2MzPrk9CXL6a8WTuLtqtC4l0bKrv>

    def start_radius(self, outpost: Outpost):
        """Start radius container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
            ports={"1812/udp": 1812},
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_radius.py:28: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.149:54631', 'AUTHENTIK_TOKEN': 'xFciNxIYCNaTH8UU1S3E...io/goauthentik/dev-radius:gh-main', 'labels': {'io.goauthentik.test': 'qGi1bl8PFMH64Xr6MZAPkgjhG8UCyQ3B1Kkj1P9K'}, ...}
container = <Container: 06f077ece6c2>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
container = <Container: 06f077ece6c2>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_caddy
Stack Traces | 52.2s run time
self = <unittest.case._Outcome object at 0x7f12221fb360>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
result = <TestCaseFunction test_caddy>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
method = <bound method TestProviderProxyForward.test_caddy of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>

    @retry()
    def test_caddy(self):
        """Test caddy"""
        local_config_path = (
            Path(__file__).parent / "proxy_forward_auth" / "caddy_single" / "Caddyfile"
        )
        self.run_container(
            image="docker.io/library/caddy:2.8",
            ports={
                "80": "80",
            },
            volumes={
                local_config_path: {
                    "bind": ".../etc/caddy/Caddyfile",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:217: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
outpost = <Outpost: Outpost z3bR6yhmfBljHNY64BU3pfzCaefcPLMpABnfCmha>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.71:47765', 'AUTHENTIK_TOKEN': 'vGg1VOUrv4UBd5YQmzdl9....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'UJ8BAIzQTcuLL7IrLvDfriurgXvpz2lGbYwY4cUn'}, ...}
container = <Container: c53835cb34df>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
container = <Container: c53835cb34df>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_proxy.TestProviderProxy::test_proxy_basic_auth
Stack Traces | 52.3s run time
self = <unittest.case._Outcome object at 0x7f1221c21e50>
test_case = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
result = <TestCaseFunction test_proxy_basic_auth>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
method = <bound method TestProviderProxy.test_proxy_basic_auth of <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_proxy_basic_auth(self):
        """Test simple outpost setup with single provider"""
        cred = generate_id()
        attr = "basic-password"  # nosec
        self.user.attributes["basic-username"] = cred
        self.user.attributes[attr] = cred
        self.user.save()
    
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost:9000",
            basic_auth_enabled=True,
            basic_auth_user_attribute="basic-username",
            basic_auth_password_attribute=attr,
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_proxy(outpost)

tests/e2e/test_provider_proxy.py:177: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
outpost = <Outpost: Outpost ZxFYtyJ6odKW2xQMQY7KqWNFSLNPl5npqY2ERTix>

    def start_proxy(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_proxy.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.71:45991', 'AUTHENTIK_TOKEN': 'PkhcMeBFEEBwOF4TSwtjd....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'UJ8BAIzQTcuLL7IrLvDfriurgXvpz2lGbYwY4cUn'}, ...}
container = <Container: 2146a5a5385a>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
container = <Container: 2146a5a5385a>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_ssl
Stack Traces | 53.1s run time
self = <unittest.case._Outcome object at 0x7f32ca57c770>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
result = <TestCaseFunction test_ldap_bind_success_ssl>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
method = <bound method TestProviderLDAP.test_ldap_bind_success_ssl of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_ssl(self):
        """Test simple bind with ssl"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:93: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        self.user.assign_perms_to_managed_role("search_full_directory", ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
outpost = <Outpost: Outpost l2I2BUn1UQ7291AiuyROW4QjsWKMi1av6GFBctDK>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.238:51533', 'AUTHENTIK_TOKEN': 'KhpfApiB3nZziu47cvww...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'd2xPN5ygYmpYNm3zpEzDcqnSQo8TqQridlEF1mZ7'}, ...}
container = <Container: e415a66f35c6>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
container = <Container: e415a66f35c6>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_traefik
Stack Traces | 53.6s run time
self = <unittest.case._Outcome object at 0x7f1222054c30>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
result = <TestCaseFunction test_traefik>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
method = <bound method TestProviderProxyForward.test_traefik of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>

    @retry()
    def test_traefik(self):
        """Test traefik"""
        local_config_path = (
            Path(__file__).parent / "proxy_forward_auth" / "traefik_single" / "config-static.yaml"
        )
        self.run_container(
            image="docker.io/library/traefik:3.1",
            ports={
                "80": "80",
            },
            volumes={
                local_config_path: {
                    "bind": ".../etc/traefik/traefik.yml",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
outpost = <Outpost: Outpost yuTp05e5SCnJ9la70UnxBIiGibsNd5VeQwZoIpZk>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.71:47765', 'AUTHENTIK_TOKEN': 'CyvfLcLM5eb1EFT4dIbiA....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'UJ8BAIzQTcuLL7IrLvDfriurgXvpz2lGbYwY4cUn'}, ...}
container = <Container: 5a175bf8a31d>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
container = <Container: 5a175bf8a31d>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_envoy
Stack Traces | 53.9s run time
self = <unittest.case._Outcome object at 0x7f1221f18550>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
result = <TestCaseFunction test_envoy>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
method = <bound method TestProviderProxyForward.test_envoy of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>

    @retry()
    def test_envoy(self):
        """Test envoy"""
        self.run_container(
            image="docker.io/envoyproxy/envoy:v1.25-latest",
            ports={
                "10000": "80",
            },
            volumes={
                f"{Path(__file__).parent / "proxy_forward_auth" / "envoy_single" / "envoy.yaml"}": {
                    "bind": ".../etc/envoy/envoy.yaml",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:176: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
outpost = <Outpost: Outpost lvU73KMJRWzMcm1ghUtyOhcrFokIcmIXYlG48hob>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.71:47765', 'AUTHENTIK_TOKEN': 'iKYtUgz71tZgfmCIvhYqQ....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'UJ8BAIzQTcuLL7IrLvDfriurgXvpz2lGbYwY4cUn'}, ...}
container = <Container: 970697c4ca0a>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
container = <Container: 970697c4ca0a>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_proxy.TestProviderProxy::test_proxy_simple
Stack Traces | 54.8s run time
self = <unittest.case._Outcome object at 0x7f12226d9550>
test_case = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
result = <TestCaseFunction test_proxy_simple>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
method = <bound method TestProviderProxy.test_proxy_simple of <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_proxy_simple(self):
        """Test simple outpost setup with single provider"""
        # set additionalHeaders to test later
        self.user.attributes["additionalHeaders"] = {"X-Foo": "bar"}
        self.user.save()
    
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost:9000",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_proxy(outpost)

tests/e2e/test_provider_proxy.py:89: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
outpost = <Outpost: Outpost lIE8Ixv8hWNJXUV0u4DFuyknw7eREvbJsm61NaWn>

    def start_proxy(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_proxy.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.71:45991', 'AUTHENTIK_TOKEN': 'buStl4rqqxpmGytLtrrzR....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'UJ8BAIzQTcuLL7IrLvDfriurgXvpz2lGbYwY4cUn'}, ...}
container = <Container: c051ffcc623a>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
container = <Container: c051ffcc623a>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_fail
Stack Traces | 149s run time
self = <unittest.case._Outcome object at 0x7fc498fa6f90>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
result = <TestCaseFunction test_radius_bind_fail>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
method = <bound method TestProviderRadius.test_radius_bind_fail of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_fail(self):
        """Test simple bind (failed)"""
>       self._prepare()

tests/e2e/test_provider_radius.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        radius: RadiusProvider = RadiusProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            shared_secret=self.shared_secret,
        )
        # we need to create an application to actually access radius
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=radius)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.RADIUS,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(radius)
    
>       self.start_radius(outpost)

tests/e2e/test_provider_radius.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
outpost = <Outpost: Outpost cAJnk0qsrkeC087D2bigFqgJVDCX2DnzYDKSj2H9>

    def start_radius(self, outpost: Outpost):
        """Start radius container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
            ports={"1812/udp": 1812},
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_radius.py:28: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.149:54631', 'AUTHENTIK_TOKEN': 'ou3uV6BTujtbNs6TxoUR...io/goauthentik/dev-radius:gh-main', 'labels': {'io.goauthentik.test': 'qGi1bl8PFMH64Xr6MZAPkgjhG8UCyQ3B1Kkj1P9K'}, ...}
container = <Container: cc7108b5cb92>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container: Container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/docker.py:83: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
container = <Container: cc7108b5cb92>

    def wait_for_container(self, container: Container) -> Container:
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
                self.output_container_logs(container)
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/docker.py:57: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@dewi-tik dewi-tik enabled auto-merge (squash) January 27, 2026 09:30
@BeryJu BeryJu disabled auto-merge January 27, 2026 16:45
@BeryJu BeryJu merged commit 73ac3f6 into version-2025.12 Jan 27, 2026
134 of 155 checks passed
@BeryJu BeryJu deleted the cherry-pick/19735-to-version-2025.12 branch January 27, 2026 16:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants