Skip to content

ci: fix binary outpost build on release (cherry-pick #20248 to version-2025.10)#20281

Merged
rissson merged 1 commit intoversion-2025.10from
cherry-pick-20248-version-2025.10
Feb 13, 2026
Merged

ci: fix binary outpost build on release (cherry-pick #20248 to version-2025.10)#20281
rissson merged 1 commit intoversion-2025.10from
cherry-pick-20248-version-2025.10

Conversation

@rissson
Copy link
Member

@rissson rissson commented Feb 13, 2026

Details

REPLACE ME


Checklist

  • Local tests pass (ak test authentik/)
  • The code has been formatted (make lint-fix)

If an API change has been made

  • The API schema has been updated (make gen-build)

If changes to the frontend have been made

  • The code has been formatted (make web)

If applicable

  • The documentation has been updated
  • The documentation has been formatted (make docs)

@rissson rissson self-assigned this Feb 13, 2026
@rissson rissson requested a review from a team as a code owner February 13, 2026 12:37
@rissson rissson merged commit 06b3d71 into version-2025.10 Feb 13, 2026
50 of 55 checks passed
@rissson rissson deleted the cherry-pick-20248-version-2025.10 branch February 13, 2026 12:38
@netlify
Copy link

netlify bot commented Feb 13, 2026

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit ccdd3e6
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/698f1b01cbc5c800087ceb7c
😎 Deploy Preview https://deploy-preview-20281--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@netlify
Copy link

netlify bot commented Feb 13, 2026

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit ccdd3e6
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/698f1b01922c28000848c079
😎 Deploy Preview https://deploy-preview-20281--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Feb 13, 2026

❌ 11 Tests Failed:

Tests completed Failed Passed Skipped
1748 11 1737 2
View the top 3 failed test(s) by shortest run time
tests.integration.test_proxy_docker.TestProxyDocker::test_docker_static
Stack Traces | 8.74s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>
sql = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)

.venv/lib/python3.13.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f507b1a4a10>
args = ('INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow...RNING "authentik_core_provider"."id"', ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f507b1a4a10>
query = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.UniqueViolation: duplicate key value violates unique constraint "authentik_core_provider_name_66fc4ef4_uniq"
E           DETAIL:  Key (name)=(test) already exists.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: UniqueViolation

The above exception was the direct cause of the following exception:

self = <unittest.case._Outcome object at 0x7f507c07ca50>
test_case = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_static>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_static>
result = <TestCaseFunction test_docker_static>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
>                   self._callSetUp()

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:647: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_static>

    def _callSetUp(self):
>       self.setUp()

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:603: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_static>

    def setUp(self):
        super().setUp()
        self.ssl_folder = mkdtemp()
        self.run_container(
            image="docker.io/library/docker:28.5.2-dind-alpine3.22",
            network_mode="host",
            privileged=True,
            healthcheck=Healthcheck(
                test=["CMD", "docker", "info"],
                interval=5 * 1_000 * 1_000_000,
                start_period=5 * 1_000 * 1_000_000,
            ),
            environment={"DOCKER_TLS_CERTDIR": "/ssl"},
            volumes={
                f"{self.ssl_folder}/": {
                    "bind": "/ssl",
                }
            },
        )
        # Ensure that local connection have been created
        outpost_connection_discovery.send()
>       self.provider: ProxyProvider = ProxyProvider.objects.create(
            name="test",
            internal_host="http://localhost",
            external_host="http://localhost",
            authorization_flow=create_test_flow(),
        )

tests/integration/test_proxy_docker.py:50: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <model_utils.managers.InheritanceManager object at 0x7f5081ab43d0>
args = ()
kwargs = {'authorization_flow': <Flow: Flow edq5foxn7s (edq5foxn7s)>, 'external_host': 'http://localhost', 'internal_host': 'http://localhost', 'name': 'test'}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)

.venv/lib/python3.13.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <InheritanceQuerySet []>
kwargs = {'authorization_flow': <Flow: Flow edq5foxn7s (edq5foxn7s)>, 'external_host': 'http://localhost', 'internal_host': 'http://localhost', 'name': 'test'}
reverse_one_to_one_fields = frozenset()
obj = <ProxyProvider: Proxy Provider test>

    def create(self, **kwargs):
        """
        Create a new object with the given kwargs, saving it to the database
        and returning the created object.
        """
        reverse_one_to_one_fields = frozenset(kwargs).intersection(
            self.model._meta._reverse_one_to_one_field_names
        )
        if reverse_one_to_one_fields:
            raise ValueError(
                "The following fields do not exist in this model: %s"
                % ", ".join(reverse_one_to_one_fields)
            )
    
        obj = self.model(**kwargs)
        self._for_write = True
>       obj.save(force_insert=True, using=self.db)

.venv/lib/python3.13.../db/models/query.py:665: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ProxyProvider: Proxy Provider test>, force_insert = True
force_update = False, using = 'default', update_fields = None

    def save(
        self,
        *args,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.
    
        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        # RemovedInDjango60Warning.
        if args:
            force_insert, force_update, using, update_fields = self._parse_save_params(
                *args,
                method_name="save",
                force_insert=force_insert,
                force_update=force_update,
                using=using,
                update_fields=update_fields,
            )
    
        self._prepare_related_fields_for_save(operation_name="save")
    
        using = using or router.db_for_write(self.__class__, instance=self)
        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")
    
        deferred_non_generated_fields = {
            f.attname
            for f in self._meta.concrete_fields
            if f.attname not in self.__dict__ and f.generated is False
        }
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return
    
            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            not_updatable_fields = update_fields.difference(field_names)
    
            if not_updatable_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, primary keys, or are non-concrete fields: %s"
                    % ", ".join(not_updatable_fields)
                )
    
        # If saving to the same database, and this model is deferred, then
        # automatically do an "update_fields" save on the loaded fields.
        elif (
            not force_insert
            and deferred_non_generated_fields
            and using == self._state.db
        ):
            field_names = set()
            pk_fields = self._meta.pk_fields
            for field in self._meta.concrete_fields:
                if field not in pk_fields and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_non_generated_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)
    
>       self.save_base(
            using=using,
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

.venv/lib/python3.13.../db/models/base.py:902: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ProxyProvider: Proxy Provider test>, raw = False
force_insert = (<class 'authentik.providers.proxy.models.ProxyProvider'>,)
force_update = False, using = 'default', update_fields = None

    def save_base(
        self,
        raw=False,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.
    
        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = origin = self.__class__
        # Skip proxies, but keep the origin as the proxy model.
        if cls._meta.proxy:
            cls = cls._meta.concrete_model
        meta = cls._meta
        if not meta.auto_created:
            pre_save.send(
                sender=origin,
                instance=self,
                raw=raw,
                using=using,
                update_fields=update_fields,
            )
        # A transaction isn't needed if one query is issued.
        if meta.parents:
            context_manager = transaction.atomic(using=using, savepoint=False)
        else:
            context_manager = transaction.mark_for_rollback_on_error(using=using)
        with context_manager:
            parent_inserted = False
            if not raw:
                # Validate force insert only when parents are inserted.
                force_insert = self._validate_force_insert(force_insert)
>               parent_inserted = self._save_parents(
                    cls, using, update_fields, force_insert
                )

.venv/lib/python3.13.../db/models/base.py:1005: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ProxyProvider: Proxy Provider test>
cls = <class 'authentik.providers.proxy.models.ProxyProvider'>
using = 'default', update_fields = None
force_insert = (<class 'authentik.providers.proxy.models.ProxyProvider'>,)
updated_parents = {}

    def _save_parents(
        self, cls, using, update_fields, force_insert, updated_parents=None
    ):
        """Save all the parents of cls using values from self."""
        meta = cls._meta
        inserted = False
        if updated_parents is None:
            updated_parents = {}
        for parent, field in meta.parents.items():
            # Make sure the link fields are synced between parent and self.
            if (
                field
                and getattr(self, parent._meta.pk.attname) is None
                and getattr(self, field.attname) is not None
            ):
                setattr(self, parent._meta.pk.attname, getattr(self, field.attname))
            if (parent_updated := updated_parents.get(parent)) is None:
>               parent_inserted = self._save_parents(
                    cls=parent,
                    using=using,
                    update_fields=update_fields,
                    force_insert=force_insert,
                    updated_parents=updated_parents,
                )

.venv/lib/python3.13.../db/models/base.py:1051: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ProxyProvider: Proxy Provider test>
cls = <class 'authentik.providers.oauth2.models.OAuth2Provider'>
using = 'default', update_fields = None
force_insert = (<class 'authentik.providers.proxy.models.ProxyProvider'>,)
updated_parents = {}

    def _save_parents(
        self, cls, using, update_fields, force_insert, updated_parents=None
    ):
        """Save all the parents of cls using values from self."""
        meta = cls._meta
        inserted = False
        if updated_parents is None:
            updated_parents = {}
        for parent, field in meta.parents.items():
            # Make sure the link fields are synced between parent and self.
            if (
                field
                and getattr(self, parent._meta.pk.attname) is None
                and getattr(self, field.attname) is not None
            ):
                setattr(self, parent._meta.pk.attname, getattr(self, field.attname))
            if (parent_updated := updated_parents.get(parent)) is None:
                parent_inserted = self._save_parents(
                    cls=parent,
                    using=using,
                    update_fields=update_fields,
                    force_insert=force_insert,
                    updated_parents=updated_parents,
                )
>               updated = self._save_table(
                    cls=parent,
                    using=using,
                    update_fields=update_fields,
                    force_insert=parent_inserted or issubclass(parent, force_insert),
                )

.venv/lib/python3.13.../db/models/base.py:1058: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ProxyProvider: Proxy Provider test>, raw = False
cls = <class 'authentik.core.models.Provider'>, force_insert = False
force_update = False, using = 'default', update_fields = None

    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._meta
        pk_fields = meta.pk_fields
        non_pks_non_generated = [
            f
            for f in meta.local_concrete_fields
            if f not in pk_fields and not f.generated
        ]
    
        if update_fields:
            non_pks_non_generated = [
                f
                for f in non_pks_non_generated
                if f.name in update_fields or f.attname in update_fields
            ]
    
        if not self._is_pk_set(meta):
            pk_val = meta.pk.get_pk_value_on_save(self)
            setattr(self, meta.pk.attname, pk_val)
        pk_set = self._is_pk_set(meta)
        if not pk_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and not force_update
            and self._state.adding
            and all(f.has_default() or f.has_db_default() for f in meta.pk_fields)
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if pk_set and not force_insert:
            base_qs = cls._base_manager.using(using)
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks_non_generated
            ]
            forced_update = update_fields or force_update
            pk_val = self._get_pk_val(meta)
            updated = self._do_update(
                base_qs, using, pk_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            if meta.order_with_respect_to:
                # If this is a model with an order_with_respect_to
                # autopopulate the _order field
                field = meta.order_with_respect_to
                filter_args = field.get_filter_kwargs_for_object(self)
                self._order = (
                    cls._base_manager.using(using)
                    .filter(**filter_args)
                    .aggregate(
                        _order__max=Coalesce(
                            ExpressionWrapper(
                                Max("_order") + Value(1), output_field=IntegerField()
                            ),
                            Value(0),
                        ),
                    )["_order__max"]
                )
            fields = [
                f
                for f in meta.local_concrete_fields
                if not f.generated and (pk_set or f is not meta.auto_field)
            ]
            returning_fields = meta.db_returning_fields
>           results = self._do_insert(
                cls._base_manager, using, fields, returning_fields, raw
            )

.venv/lib/python3.13.../db/models/base.py:1169: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ProxyProvider: Proxy Provider test>
manager = <django.db.models.manager.Manager object at 0x7f507bfe37d0>
using = 'default'
fields = [<django.db.models.fields.TextField: name>, <django.db.models.fields.related.ForeignKey: authentication_flow>, <django....db.models.fields.related.ForeignKey: backchannel_application>, <django.db.models.fields.BooleanField: is_backchannel>]
returning_fields = [<django.db.models.fields.AutoField: id>], raw = False

    def _do_insert(self, manager, using, fields, returning_fields, raw):
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
>       return manager._insert(
            [self],
            fields=fields,
            returning_fields=returning_fields,
            using=using,
            raw=raw,
        )

.venv/lib/python3.13.../db/models/base.py:1210: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.models.manager.Manager object at 0x7f507bfe37d0>
args = ([<ProxyProvider: Proxy Provider test>],)
kwargs = {'fields': [<django.db.models.fields.TextField: name>, <django.db.models.fields.related.ForeignKey: authentication_flo...eld: is_backchannel>], 'raw': False, 'returning_fields': [<django.db.models.fields.AutoField: id>], 'using': 'default'}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)

.venv/lib/python3.13.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>, objs = [<ProxyProvider: Proxy Provider test>]
fields = [<django.db.models.fields.TextField: name>, <django.db.models.fields.related.ForeignKey: authentication_flow>, <django....db.models.fields.related.ForeignKey: backchannel_application>, <django.db.models.fields.BooleanField: is_backchannel>]
returning_fields = [<django.db.models.fields.AutoField: id>], raw = False
using = 'default', on_conflict = None, update_fields = None
unique_fields = None

    def _insert(
        self,
        objs,
        fields,
        returning_fields=None,
        raw=False,
        using=None,
        on_conflict=None,
        update_fields=None,
        unique_fields=None,
    ):
        """
        Insert a new record for the given model. This provides an interface to
        the InsertQuery class and is how Model.save() is implemented.
        """
        self._for_write = True
        if using is None:
            using = self.db
        query = sql.InsertQuery(
            self.model,
            on_conflict=on_conflict,
            update_fields=update_fields,
            unique_fields=unique_fields,
        )
        query.insert_values(fields, objs, raw=raw)
>       return query.get_compiler(using=using).execute_sql(returning_fields)

.venv/lib/python3.13.../db/models/query.py:1873: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <SQLInsertCompiler model=Provider connection=<DatabaseWrapper vendor='postgresql' alias='default'> using='default'>
returning_fields = [<django.db.models.fields.AutoField: id>]

    def execute_sql(self, returning_fields=None):
        assert not (
            returning_fields
            and len(self.query.objs) != 1
            and not self.connection.features.can_return_rows_from_bulk_insert
        )
        opts = self.query.get_meta()
        self.returning_fields = returning_fields
        cols = []
        with self.connection.cursor() as cursor:
            for sql, params in self.as_sql():
>               cursor.execute(sql, params)

.venv/lib/python3.13.../models/sql/compiler.py:1882: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>, 'INSERT INTO "authentik_core_provider" ("name", "a...RNING "authentik_core_provider"."id"', ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False))
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13.../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>
sql = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>
sql = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>
sql = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)
many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>
sql = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7f5080592e40>
exc_type = <class 'psycopg.errors.UniqueViolation'>
exc_value = UniqueViolation('duplicate key value violates unique constraint "authentik_core_provider_name_66fc4ef4_uniq"\nDETAIL:  Key (name)=(test) already exists.')
traceback = <traceback object at 0x7f507ad94780>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>
sql = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f507bb14d10>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)

.venv/lib/python3.13.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f507b1a4a10>
args = ('INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow...RNING "authentik_core_provider"."id"', ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f507b1a4a10>
query = 'INSERT INTO "authentik_core_provider" ("name", "authentication_flow_id", "authorization_flow_id", "invalidation_flow_...ackchannel_application_id", "is_backchannel") VALUES (%s, %s, %s, %s, %s, %s) RETURNING "authentik_core_provider"."id"'
params = ('test', None, UUID('f89ce6cf-106d-4558-b475-835209cf5a3e'), None, None, False)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.IntegrityError: duplicate key value violates unique constraint "authentik_core_provider_name_66fc4ef4_uniq"
E           DETAIL:  Key (name)=(test) already exists.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: IntegrityError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_starttls
Stack Traces | 13.5s run time
self = <unittest.case._Outcome object at 0x7f5f3b79df30>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
result = <TestCaseFunction test_ldap_bind_success_starttls>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
method = <bound method TestProviderLDAP.test_ldap_bind_success_starttls of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_starttls(self):
        """Test simple bind with ssl"""
        self._prepare()
        server = Server("ldap://localhost:3389")
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
            password=self.user.username,
        )
        _connection.start_tls()
>       _connection.bind()

tests/e2e/test_provider_ldap.py:130: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], tls=Tls(val...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3b529940>
message_id = 9

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3b529940>
message_id = 9, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_ssl
Stack Traces | 13.5s run time
self = <unittest.case._Outcome object at 0x7f5f3c46aea0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
result = <TestCaseFunction test_ldap_bind_success_ssl>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
method = <bound method TestProviderLDAP.test_ldap_bind_success_ssl of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_ssl(self):
        """Test simple bind with ssl"""
        self._prepare()
        server = Server("ldaps://localhost:6636", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:102: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=6636, use_ssl=True, allowed_referral_hosts=[('*', True)], tls=Tls(vali...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f4415f6f0>
message_id = 10

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f4415f6f0>
message_id = 10, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success
Stack Traces | 13.5s run time
self = <unittest.case._Outcome object at 0x7f5f3cfb60d0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
result = <TestCaseFunction test_ldap_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
method = <bound method TestProviderLDAP.test_ldap_bind_success of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success(self):
        """Test simple bind"""
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3c248190>
message_id = 3

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3c248190>
message_id = 3, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search
Stack Traces | 13.9s run time
self = <unittest.case._Outcome object at 0x7f5f3d8be490>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
result = <TestCaseFunction test_ldap_bind_search>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
method = <bound method TestProviderLDAP.test_ldap_bind_search of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search(self):
        """Test simple bind + search"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
        outpost = self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:191: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3c646490>
message_id = 2

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3c646490>
message_id = 2, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_schema
Stack Traces | 14s run time
self = <unittest.case._Outcome object at 0x7f5f3829c7c0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
result = <TestCaseFunction test_ldap_schema>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
method = <bound method TestProviderLDAP.test_ldap_schema of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_schema(self):
        """Test LDAP Schema"""
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:425: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3829caf0>
message_id = 12

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3829caf0>
message_id = 12, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_search_attrs_filter
Stack Traces | 15.7s run time
self = <unittest.case._Outcome object at 0x7f5f441a87c0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
result = <TestCaseFunction test_ldap_search_attrs_filter>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
method = <bound method TestProviderLDAP.test_ldap_search_attrs_filter of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_search_attrs_filter(self):
        """Test search with attributes filtering"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
        outpost = self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:451: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3829d7b0>
message_id = 13

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f3829d7b0>
message_id = 13, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.integration.test_outpost_docker.OutpostDockerTests::test_docker_controller
Stack Traces | 47.6s run time
self = <unittest.case._Outcome object at 0x7f507cd03c50>
test_case = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_controller>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_controller>
result = <TestCaseFunction test_docker_controller>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
>                   self._callSetUp()

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:647: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_controller>

    def _callSetUp(self):
>       self.setUp()

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:603: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_controller>

    def setUp(self):
        super().setUp()
        self.ssl_folder = mkdtemp()
>       self.run_container(
            image="docker.io/library/docker:28.5.2-dind-alpine3.22",
            network_mode="host",
            privileged=True,
            healthcheck=Healthcheck(
                test=["CMD", "docker", "info"],
                interval=5 * 1_000 * 1_000_000,
                start_period=5 * 1_000 * 1_000_000,
            ),
            environment={"DOCKER_TLS_CERTDIR": "/ssl"},
            volumes={
                f"{self.ssl_folder}/": {
                    "bind": "/ssl",
                }
            },
        )

tests/integration/test_outpost_docker.py:32: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_controller>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://localhost:45601', 'DOCKER_TLS_CERTDIR': '/ssl'}, 'healthche...t': None, 'Retries': None, 'StartPeriod': 5000000000}, 'image': 'docker.io/library/docker:28.5.2-dind-alpine3.22', ...}
container = <Container: de2c30d30596>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_controller>
container = <Container: de2c30d30596>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.integration.test_outpost_docker.OutpostDockerTests::test_docker_static
Stack Traces | 49.6s run time
self = <unittest.case._Outcome object at 0x7f507b473550>
test_case = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_static>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_static>
result = <TestCaseFunction test_docker_static>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
>                   self._callSetUp()

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:647: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_static>

    def _callSetUp(self):
>       self.setUp()

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:603: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_static>

    def setUp(self):
        super().setUp()
        self.ssl_folder = mkdtemp()
>       self.run_container(
            image="docker.io/library/docker:28.5.2-dind-alpine3.22",
            network_mode="host",
            privileged=True,
            healthcheck=Healthcheck(
                test=["CMD", "docker", "info"],
                interval=5 * 1_000 * 1_000_000,
                start_period=5 * 1_000 * 1_000_000,
            ),
            environment={"DOCKER_TLS_CERTDIR": "/ssl"},
            volumes={
                f"{self.ssl_folder}/": {
                    "bind": "/ssl",
                }
            },
        )

tests/integration/test_outpost_docker.py:32: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_static>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://localhost:45601', 'DOCKER_TLS_CERTDIR': '/ssl'}, 'healthche...t': None, 'Retries': None, 'StartPeriod': 5000000000}, 'image': 'docker.io/library/docker:28.5.2-dind-alpine3.22', ...}
container = <Container: e33391cfdbaa>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_outpost_docker.OutpostDockerTests testMethod=test_docker_static>
container = <Container: e33391cfdbaa>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search_no_perms
Stack Traces | 100s run time
self = <unittest.case._Outcome object at 0x7f5f3d4bfcb0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
result = <TestCaseFunction test_ldap_bind_search_no_perms>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
method = <bound method TestProviderLDAP.test_ldap_bind_search_no_perms of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.12........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search_no_perms(self):
        """Test simple bind + search"""
        user = create_test_user()
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f441986e0>
message_id = 1

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f5f441986e0>
message_id = 1, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.integration.test_proxy_docker.TestProxyDocker::test_docker_controller
Stack Traces | 120s run time
cls = <class '_pytest.runner.CallInfo'>
func = <function call_and_report.<locals>.<lambda> at 0x7f507b7c4ae0>
when = 'call'
reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)

    @classmethod
    def from_call(
        cls,
        func: Callable[[], TResult],
        when: Literal["collect", "setup", "call", "teardown"],
        reraise: type[BaseException] | tuple[type[BaseException], ...] | None = None,
    ) -> CallInfo[TResult]:
        """Call func, wrapping the result in a CallInfo.
    
        :param func:
            The function to call. Called without arguments.
        :type func: Callable[[], _pytest.runner.TResult]
        :param when:
            The phase in which the function is called.
        :param reraise:
            Exception or exceptions that shall propagate if raised by the
            function, instead of being wrapped in the CallInfo.
        """
        excinfo = None
        start = timing.time()
        precise_start = timing.perf_counter()
        try:
>           result: TResult | None = func()

.venv/lib/python3.13........./site-packages/_pytest/runner.py:341: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>       lambda: runtest_hook(item=item, **kwds), when=when, reraise=reraise
    )

.venv/lib/python3.13........./site-packages/_pytest/runner.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <HookCaller 'pytest_runtest_call'>
kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def __call__(self, **kwargs: object) -> Any:
        """Call the hook.
    
        Only accepts keyword arguments, which should match the hook
        specification.
    
        Returns the result(s) of calling all registered plugins, see
        :ref:`calling`.
        """
        assert not self.is_historic(), (
            "Cannot directly call a historic hook - use call_historic instead."
        )
        self._verify_all_args_are_provided(kwargs)
        firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
        # Copy because plugins may register other plugins during iteration (#438).
>       return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)

.venv/lib/python3.13.../site-packages/pluggy/_hooks.py:512: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_pytest.config.PytestPluginManager object at 0x7f50805938c0>
hook_name = 'pytest_runtest_call'
methods = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _hookexec(
        self,
        hook_name: str,
        methods: Sequence[HookImpl],
        kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
>       return self._inner_hookexec(hook_name, methods, kwargs, firstresult)

.venv/lib/python3.13.../site-packages/pluggy/_manager.py:120: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
                            teardown.throw(exception)
                        except RuntimeError as re:
                            # StopIteration from generator causes RuntimeError
                            # even for coroutine usage - see #544
                            if (
                                isinstance(exception, StopIteration)
                                and re.__cause__ is exception
                            ):
                                teardown.close()
                                continue
                            else:
                                raise
                    else:
                        teardown.send(result)
                    # Following is unreachable for a well behaved hook wrapper.
                    # Try to force finalizers otherwise postponed till GC action.
                    # Note: close() may raise if generator handles GeneratorExit.
                    teardown.close()
                except StopIteration as si:
                    result = si.value
                    exception = None
                    continue
                except BaseException as e:
                    exception = e
                    continue
                _raise_wrapfail(teardown, "has second yield")
    
        if exception is not None:
>           raise exception

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:167: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
>                           teardown.throw(exception)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:139: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @pytest.hookimpl(wrapper=True, tryfirst=True)
    def pytest_runtest_call() -> Generator[None]:
>       yield from thread_exception_runtest_hook()

.venv/lib/python3.13....../site-packages/_pytest/threadexception.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def thread_exception_runtest_hook() -> Generator[None]:
        with catch_threading_exception() as cm:
            try:
>               yield

.venv/lib/python3.13....../site-packages/_pytest/threadexception.py:68: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
>                           teardown.throw(exception)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:139: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @pytest.hookimpl(wrapper=True, tryfirst=True)
    def pytest_runtest_call() -> Generator[None]:
>       yield from unraisable_exception_runtest_hook()

.venv/lib/python3.13....../site-packages/_pytest/unraisableexception.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def unraisable_exception_runtest_hook() -> Generator[None]:
        with catch_unraisable_exception() as cm:
            try:
>               yield

.venv/lib/python3.13....../site-packages/_pytest/unraisableexception.py:70: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
>                           teardown.throw(exception)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:139: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_pytest.logging.LoggingPlugin object at 0x7f507d236120>
item = <TestCaseFunction test_docker_controller>

    @hookimpl(wrapper=True)
    def pytest_runtest_call(self, item: nodes.Item) -> Generator[None]:
        self.log_cli_handler.set_when("call")
    
>       yield from self._runtest_for(item, "call")

.venv/lib/python3.13....../site-packages/_pytest/logging.py:846: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_pytest.logging.LoggingPlugin object at 0x7f507d236120>
item = <TestCaseFunction test_docker_controller>, when = 'call'

    def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None]:
        """Implement the internals of the pytest_runtest_xxx() hooks."""
        with catching_logs(
            self.caplog_handler,
            level=self.log_level,
        ) as caplog_handler, catching_logs(
            self.report_handler,
            level=self.log_level,
        ) as report_handler:
            caplog_handler.reset()
            report_handler.reset()
            item.stash[caplog_records_key][when] = caplog_handler.records
            item.stash[caplog_handler_key] = caplog_handler
    
            try:
>               yield

.venv/lib/python3.13....../site-packages/_pytest/logging.py:829: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
>                           teardown.throw(exception)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:139: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <CaptureManager _method='fd' _global_capturing=<MultiCapture out=<FDCapture 1 oldfd=6 _state='suspended' tmpfile=<Enco...xtIOWrapper name='/dev/null' mode='r' encoding='utf-8'>> _state='suspended' _in_suspended=False> _capture_fixture=None>
item = <TestCaseFunction test_docker_controller>

    @hookimpl(wrapper=True)
    def pytest_runtest_call(self, item: Item) -> Generator[None]:
        with self.item_capture("call", item):
>           return (yield)

.venv/lib/python3.13.../site-packages/_pytest/capture.py:898: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
>                           teardown.throw(exception)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:139: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_impl = <HookImpl plugin_name='timeout', plugin=<module 'pytest_timeout' from '.../authentik/authentik/.venv/lib/python3.13/site-packages/pytest_timeout.py'>>
hook_name = 'pytest_runtest_call'
args = [<TestCaseFunction test_docker_controller>]

    def run_old_style_hookwrapper(
        hook_impl: HookImpl, hook_name: str, args: Sequence[object]
    ) -> Teardown:
        """
        backward compatibility wrapper to run a old style hookwrapper as a wrapper
        """
    
        teardown: Teardown = cast(Teardown, hook_impl.function(*args))
        try:
            next(teardown)
        except StopIteration:
            _raise_wrapfail(teardown, "did not yield")
        try:
            res = yield
            result = Result(res, None)
        except BaseException as exc:
            result = Result(None, exc)
        try:
            teardown.send(result)
        except StopIteration:
            pass
        except BaseException as e:
            _warn_teardown_exception(hook_name, hook_impl, e)
            raise
        else:
            _raise_wrapfail(teardown, "has second yield")
        finally:
            teardown.close()
>       return result.get_result()

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:53: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pluggy._result.Result object at 0x7f507bb24640>

    def get_result(self) -> ResultType:
        """Get the result(s) for this hook call.
    
        If the hook was marked as a ``firstresult`` only a single value
        will be returned, otherwise a list of results.
        """
        __tracebackhide__ = True
        exc = self._exception
        tb = self._traceback
        if exc is None:
            return cast(ResultType, self._result)
        else:
>           raise exc.with_traceback(tb)

.venv/lib/python3.13.../site-packages/pluggy/_result.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_impl = <HookImpl plugin_name='timeout', plugin=<module 'pytest_timeout' from '.../authentik/authentik/.venv/lib/python3.13/site-packages/pytest_timeout.py'>>
hook_name = 'pytest_runtest_call'
args = [<TestCaseFunction test_docker_controller>]

    def run_old_style_hookwrapper(
        hook_impl: HookImpl, hook_name: str, args: Sequence[object]
    ) -> Teardown:
        """
        backward compatibility wrapper to run a old style hookwrapper as a wrapper
        """
    
        teardown: Teardown = cast(Teardown, hook_impl.function(*args))
        try:
            next(teardown)
        except StopIteration:
            _raise_wrapfail(teardown, "did not yield")
        try:
>           res = yield

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
                        res = hook_impl.function(*args)
                        if res is not None:
                            results.append(res)
                            if firstresult:  # halt further impl calls
                                break
            except BaseException as exc:
                exception = exc
        finally:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results
    
            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        try:
>                           teardown.throw(exception)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:139: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

item = <TestCaseFunction test_docker_controller>

    @hookimpl(wrapper=True)
    def pytest_runtest_call(item: Item) -> Generator[None]:
        xfailed = item.stash.get(xfailed_key, None)
        if xfailed is None:
            item.stash[xfailed_key] = xfailed = evaluate_xfail_marks(item)
    
        if xfailed and not item.config.option.runxfail and not xfailed.run:
            xfail("[NOTRUN] " + xfailed.reason)
    
        try:
>           return (yield)

.venv/lib/python3.13.../site-packages/_pytest/skipping.py:257: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '.../authentik/authentik/.venv/lib...=None>>, <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x7f507d236120>>, ...]
caller_kwargs = {'item': <TestCaseFunction test_docker_controller>}
firstresult = False

    def _multicall(
        hook_name: str,
        hook_impls: Sequence[HookImpl],
        caller_kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a call into multiple python functions/methods and return the
        result(s).
    
        ``caller_kwargs`` comes from HookCaller.__call__().
        """
        __tracebackhide__ = True
        results: list[object] = []
        exception = None
        try:  # run impl and wrapper setup functions in a loop
            teardowns: list[Teardown] = []
            try:
                for hook_impl in reversed(hook_impls):
                    try:
                        args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                    except KeyError as e:
                        # coverage bug - this is tested
                        for argname in hook_impl.argnames:  # pragma: no cover
                            if argname not in caller_kwargs:
                                raise HookCallError(
                                    f"hook call must provide argument {argname!r}"
                                ) from e
    
                    if hook_impl.hookwrapper:
                        function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args)
    
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
    
                    elif hook_impl.wrapper:
                        try:
                            # If this cast is not valid, a type error is raised below,
                            # which is the desired response.
                            res = hook_impl.function(*args)
                            function_gen = cast(Generator[None, object, object], res)
                            next(function_gen)  # first yield
                            teardowns.append(function_gen)
                        except StopIteration:
                            _raise_wrapfail(function_gen, "did not yield")
                    else:
>                       res = hook_impl.function(*args)

.venv/lib/python3.13............................../site-packages/pluggy/_callers.py:121: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

item = <TestCaseFunction test_docker_controller>

    def pytest_runtest_call(item: Item) -> None:
        _update_current_test_var(item, "call")
        try:
            del sys.last_type
            del sys.last_value
            del sys.last_traceback
            if sys.version_info >= (3, 12, 0):
                del sys.last_exc  # type:ignore[attr-defined]
        except AttributeError:
            pass
        try:
>           item.runtest()

.venv/lib/python3.13........./site-packages/_pytest/runner.py:174: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TestCaseFunction test_docker_controller>

    def non_debugging_runtest(self) -> None:
>       self._testcase(result=self)

.venv/lib/python3.13.../site-packages/pytest_django/plugin.py:573: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_controller>
result = <TestCaseFunction test_docker_controller>

    def __call__(self, result=None):
        """
        Wrapper around default __call__ method to perform common Django test
        set up. This means that user-defined TestCases aren't required to
        include a call to super().setUp().
        """
>       self._setup_and_call(result)

.venv/lib/python3.13.../django/test/testcases.py:321: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_controller>
result = <TestCaseFunction test_docker_controller>, debug = False

    def _setup_and_call(self, result, debug=False):
        """
        Perform the following in order: pre-setup, run test, post-teardown,
        skipping pre/post hooks if test is set to be skipped.
    
        If debug=True, reraise any errors in setup and use super().debug()
        instead of __call__() to run the test.
        """
        testMethod = getattr(self, self._testMethodName)
        skipped = getattr(self.__class__, "__unittest_skip__", False) or getattr(
            testMethod, "__unittest_skip__", False
        )
    
        # Convert async test methods.
        if iscoroutinefunction(testMethod):
            setattr(self, self._testMethodName, async_to_sync(testMethod))
    
        if not skipped:
            try:
                if self.__class__._pre_setup_ran_eagerly:
                    self.__class__._pre_setup_ran_eagerly = False
                else:
                    self._pre_setup()
            except Exception:
                if debug:
                    raise
                result.addError(self, sys.exc_info())
                return
        if debug:
            super().debug()
        else:
            super().__call__(result)
        if not skipped:
            try:
>               self._post_teardown()

.venv/lib/python3.13.../django/test/testcases.py:379: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_controller>

    def _post_teardown(self):
        """
        Perform post-test things:
        * Flush the contents of the database to leave a clean slate. If the
          class has an 'available_apps' attribute, don't fire post_migrate.
        * Force-close the connection so the next test gets a clean cursor.
        """
        try:
>           self._fixture_teardown()

.venv/lib/python3.13.../django/test/testcases.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.integration.test_proxy_docker.TestProxyDocker testMethod=test_docker_controller>

    def _fixture_teardown(self):
        # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal
        # when flushing only a subset of the apps
        for db_name in self._databases_names(include_mirrors=False):
            # Flush the database
            inhibit_post_migrate = (
                self.available_apps is not None
                or (  # Inhibit the post_migrate signal when using serialized
                    # rollback to avoid trying to recreate the serialized data.
                    self.serialized_rollback
                    and hasattr(connections[db_name], "_test_serialized_contents")
                )
            )
>           call_command(
                "flush",
                verbosity=0,
                interactive=False,
                database=db_name,
                reset_sequences=False,
                allow_cascade=self.available_apps is not None,
                inhibit_post_migrate=inhibit_post_migrate,
            )

.venv/lib/python3.13.../django/test/testcases.py:1266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

command_name = 'flush', args = ()
options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
command = <django.core.management.commands.flush.Command object at 0x7f507cdf8c20>
app_name = 'django.core'
parser = CommandParser(prog=' flush', usage=None, description='Removes ALL DATA from the database, including data added during ....', formatter_class=<class 'django.core.management.base.DjangoHelpFormatter'>, conflict_handler='error', add_help=True)
opt_mapping = {'database': 'database', 'force_color': 'force_color', 'help': 'help', 'no_color': 'no_color', ...}
arg_options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
parse_args = []

    def call_command(command_name, *args, **options):
        """
        Call the given command, with the given options and args/kwargs.
    
        This is the primary API you should use for calling specific commands.
    
        `command_name` may be a string or a command object. Using a string is
        preferred unless the command object is required for further processing or
        testing.
    
        Some examples:
            call_command('migrate')
            call_command('shell', plain=True)
            call_command('sqlmigrate', 'myapp')
    
            from django.core.management.commands import flush
            cmd = flush.Command()
            call_command(cmd, verbosity=0, interactive=False)
            # Do something with cmd ...
        """
        if isinstance(command_name, BaseCommand):
            # Command object passed in.
            command = command_name
            command_name = command.__class__.__module__.split(".")[-1]
        else:
            # Load the command object by name.
            try:
                app_name = get_commands()[command_name]
            except KeyError:
                raise CommandError("Unknown command: %r" % command_name)
    
            if isinstance(app_name, BaseCommand):
                # If the command is already loaded, use it directly.
                command = app_name
            else:
                command = load_command_class(app_name, command_name)
    
        # Simulate argument parsing to get the option defaults (see #10080 for details).
        parser = command.create_parser("", command_name)
        # Use the `dest` option name from the parser option
        opt_mapping = {
            min(s_opt.option_strings).lstrip("-").replace("-", "_"): s_opt.dest
            for s_opt in parser._actions
            if s_opt.option_strings
        }
        arg_options = {opt_mapping.get(key, key): value for key, value in options.items()}
        parse_args = []
        for arg in args:
            if isinstance(arg, (list, tuple)):
                parse_args += map(str, arg)
            else:
                parse_args.append(str(arg))
    
        def get_actions(parser):
            # Parser actions and actions from sub-parser choices.
            for opt in parser._actions:
                if isinstance(opt, _SubParsersAction):
                    for sub_opt in opt.choices.values():
                        yield from get_actions(sub_opt)
                else:
                    yield opt
    
        parser_actions = list(get_actions(parser))
        mutually_exclusive_required_options = {
            opt
            for group in parser._mutually_exclusive_groups
            for opt in group._group_actions
            if group.required
        }
        # Any required arguments which are passed in via **options must be passed
        # to parse_args().
        for opt in parser_actions:
            if opt.dest in options and (
                opt.required or opt in mutually_exclusive_required_options
            ):
                opt_dest_count = sum(v == opt.dest for v in opt_mapping.values())
                if opt_dest_count > 1:
                    raise TypeError(
                        f"Cannot pass the dest {opt.dest!r} that matches multiple "
                        f"arguments via **options."
                    )
                parse_args.append(min(opt.option_strings))
                if isinstance(opt, (_AppendConstAction, _CountAction, _StoreConstAction)):
                    continue
                value = arg_options[opt.dest]
                if isinstance(value, (list, tuple)):
                    parse_args += map(str, value)
                else:
                    parse_args.append(str(value))
        defaults = parser.parse_args(args=parse_args)
        defaults = dict(defaults._get_kwargs(), **arg_options)
        # Raise an error if any unknown options were passed.
        stealth_options = set(command.base_stealth_options + command.stealth_options)
        dest_parameters = {action.dest for action in parser_actions}
        valid_options = (dest_parameters | stealth_options).union(opt_mapping)
        unknown_options = set(options) - valid_options
        if unknown_options:
            raise TypeError(
                "Unknown option(s) for %s command: %s. "
                "Valid options are: %s."
                % (
                    command_name,
                    ", ".join(sorted(unknown_options)),
                    ", ".join(sorted(valid_options)),
                )
            )
        # Move positional args out of options to mimic legacy optparse
        args = defaults.pop("args", ())
        if "skip_checks" not in options:
            defaults["skip_checks"] = True
    
>       return command.execute(*args, **defaults)

.venv/lib/python3.13.../core/management/__init__.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f507cdf8c20>
args = ()
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}

    def execute(self, *args, **options):
        """
        Try to execute this command, performing system checks if needed (as
        controlled by the ``requires_system_checks`` attribute, except if
        force-skipped).
        """
        if options["force_color"] and options["no_color"]:
            raise CommandError(
                "The --no-color and --force-color options can't be used together."
            )
        if options["force_color"]:
            self.style = color_style(force_color=True)
        elif options["no_color"]:
            self.style = no_style()
            self.stderr.style_func = None
        if options.get("stdout"):
            self.stdout = OutputWrapper(options["stdout"])
        if options.get("stderr"):
            self.stderr = OutputWrapper(options["stderr"])
    
        if self.requires_system_checks and not options["skip_checks"]:
            check_kwargs = self.get_check_kwargs(options)
            self.check(**check_kwargs)
        if self.requires_migrations_checks:
            self.check_migrations()
>       output = self.handle(*args, **options)

.venv/lib/python3.13.../core/management/base.py:464: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f507cdf8c20>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
>                   connection.ops.execute_sql_flush(sql_list)

.venv/lib/python3.13.../management/commands/flush.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psqlextra.backend.operations.PostgresOperations object at 0x7f5088714050>
sql_list = ['TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authen...dergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";']

    def execute_sql_flush(self, sql_list):
        """Execute a list of SQL statements to flush the database."""
        with transaction.atomic(
            using=self.connection.alias,
            savepoint=self.connection.features.can_rollback_ddl,
        ):
            with self.connection.cursor() as cursor:
                for sql in sql_list:
>                   cursor.execute(sql)

.venv/lib/python3.13.../backends/base/operations.py:473: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>, 'TRUNCATE "authentik_stages_authenticator_duo_auth...dergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";')
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13.../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>
sql = 'TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authent...idergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";'
params = None

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>
sql = 'TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authent...idergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";'
params = None

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>
sql = 'TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authent...idergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";'
params = None, many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>
sql = 'TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authent...idergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f507c63cb30>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f507c262750>
args = ('TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authen...ergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f507c262750>
query = 'TRUNCATE "authentik_stages_authenticator_duo_authenticatorduostage", "authentik_providers_saml_samlsession", "authent...idergroup", "authentik_brands_brand_client_certificates", "authentik_providers_ssf_stream", "authentik_core_provider";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
>               self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )

.venv/lib/python3.13.../site-packages/psycopg/cursor.py:93: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.Connection [BAD] at 0x7f507d0ad810>
gen = <generator object BaseCursor._execute_gen at 0x7f507cd02e40>
interval = 0.1

    def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
        """
        Consume a generator operating on the connection.
    
        The function must be used on generators that don't change connection
        fd (i.e. not on connect and reset).
        """
        try:
>           return waiting.wait(gen, self.pgconn.socket, interval=interval)

.venv/lib/python3.13....../site-packages/psycopg/connection.py:414: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

psycopg_c/_psycopg/waiting.pyx:201: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

signum = 14
frame = <frame at 0x7f507c0734c0, file '.../authentik/authentik/.venv/lib/python3.13....../site-packages/psycopg/connection.py', line 415, code wait>

    def handler(signum, frame):
        __tracebackhide__ = True
>       timeout_sigalrm(item, settings)

.venv/lib/python3.13/site-packages/pytest_timeout.py:317: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

item = <TestCaseFunction test_docker_controller>
settings = Settings(timeout=120.0, method='signal', func_only=False, disable_debugger_detection=False)

    def timeout_sigalrm(item, settings):
        """Dump stack of threads and raise an exception.
    
        This will output the stacks of any threads other then the
        current to stderr and then raise an AssertionError, thus
        terminating the test.
        """
        if not settings.disable_debugger_detection and is_debugging():
            return
        __tracebackhide__ = True
        nthreads = len(threading.enumerate())
        terminal = item.config.get_terminal_writer()
        if nthreads > 1:
            terminal.sep("+", title="Timeout")
        dump_stacks(terminal)
        if nthreads > 1:
            terminal.sep("+", title="Timeout")
>       pytest.fail(PYTEST_FAILURE_MESSAGE % settings.timeout)

.venv/lib/python3.13/site-packages/pytest_timeout.py:502: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

reason = 'Timeout (>120.0s) from pytest-timeout.', pytrace = True

    @_with_exception(Failed)
    def fail(reason: str = "", pytrace: bool = True) -> NoReturn:
        """Explicitly fail an executing test with the given message.
    
        :param reason:
            The message to show the user as reason for the failure.
    
        :param pytrace:
            If False, msg represents the full failure information and no
            python traceback will be reported.
    
        :raises pytest.fail.Exception:
            The exception that is raised.
        """
        __tracebackhide__ = True
>       raise Failed(msg=reason, pytrace=pytrace)
E       Failed: Timeout (>120.0s) from pytest-timeout.

.venv/lib/python3.13.../site-packages/_pytest/outcomes.py:178: Failed

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

rissson added a commit that referenced this pull request Feb 16, 2026
…n-2025.10) (#20281)

fix binary outpost build on release (#20248)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant