Skip to content

providers/oauth2: use compare_digest for client_secret comparison (cherry-pick #19979 to version-2025.10)#19982

Merged
BeryJu merged 1 commit intoversion-2025.10from
cherry-pick/19979-to-version-2025.10
Feb 3, 2026
Merged

providers/oauth2: use compare_digest for client_secret comparison (cherry-pick #19979 to version-2025.10)#19982
BeryJu merged 1 commit intoversion-2025.10from
cherry-pick/19979-to-version-2025.10

Conversation

@authentik-automation
Copy link
Contributor

Cherry-pick of #19979 to version-2025.10 branch.

Original PR: #19979
Original Author: @kolega-ai-dev
Cherry-picked commit: b32f33b

…9979)

* security: use constant-time comparison for client secrets

Replace insecure '!=' comparisons with hmac.compare_digest() to prevent
timing attacks on client secret validation. This matches the existing
security pattern used elsewhere in the codebase.

* format

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Co-authored-by: kolega.dev <faizan@kolega.ai>
Co-authored-by: Jens Langhammer <jens@goauthentik.io>
@authentik-automation authentik-automation bot requested a review from a team as a code owner February 3, 2026 16:39
@netlify
Copy link

netlify bot commented Feb 3, 2026

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit 580e254
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/698224c6c8b8f600082c272b
😎 Deploy Preview https://deploy-preview-19982--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Feb 3, 2026

❌ 7 Tests Failed:

Tests completed Failed Passed Skipped
2205 7 2198 2
View the top 3 failed test(s) by shortest run time
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search
Stack Traces | 11.7s run time
self = <unittest.case._Outcome object at 0x7f62f2965e10>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
result = <TestCaseFunction test_ldap_bind_search>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
method = <bound method TestProviderLDAP.test_ldap_bind_search of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search(self):
        """Test simple bind + search"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
        outpost = self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:191: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f4ccfce0>
message_id = 12

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f4ccfce0>
message_id = 12, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success
Stack Traces | 12.1s run time
self = <unittest.case._Outcome object at 0x7f62f47d5350>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
result = <TestCaseFunction test_ldap_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
method = <bound method TestProviderLDAP.test_ldap_bind_success of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success(self):
        """Test simple bind"""
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3c48d60>
message_id = 9

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3c48d60>
message_id = 9, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_search_attrs_filter
Stack Traces | 12.6s run time
self = <unittest.case._Outcome object at 0x7f62f3de0690>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
result = <TestCaseFunction test_ldap_search_attrs_filter>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
method = <bound method TestProviderLDAP.test_ldap_search_attrs_filter of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_search_attrs_filter(self):
        """Test search with attributes filtering"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
        outpost = self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:451: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3779bd0>
message_id = 8

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3779bd0>
message_id = 8, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_ssl
Stack Traces | 12.6s run time
self = <unittest.case._Outcome object at 0x7f62f3f1c290>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
result = <TestCaseFunction test_ldap_bind_success_ssl>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
method = <bound method TestProviderLDAP.test_ldap_bind_success_ssl of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_ssl(self):
        """Test simple bind with ssl"""
        self._prepare()
        server = Server("ldaps://localhost:6636", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:102: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=6636, use_ssl=True, allowed_referral_hosts=[('*', True)], tls=Tls(vali...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3378b90>
message_id = 11

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3378b90>
message_id = 11, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_starttls
Stack Traces | 12.6s run time
self = <unittest.case._Outcome object at 0x7f62f3de1a90>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
result = <TestCaseFunction test_ldap_bind_success_starttls>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
method = <bound method TestProviderLDAP.test_ldap_bind_success_starttls of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_starttls(self):
        """Test simple bind with ssl"""
        self._prepare()
        server = Server("ldap://localhost:3389")
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
            password=self.user.username,
        )
        _connection.start_tls()
>       _connection.bind()

tests/e2e/test_provider_ldap.py:130: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], tls=Tls(val...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3740b90>
message_id = 7

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f3740b90>
message_id = 7, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search_no_perms
Stack Traces | 13s run time
self = <unittest.case._Outcome object at 0x7f6300347ce0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
result = <TestCaseFunction test_ldap_bind_search_no_perms>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
method = <bound method TestProviderLDAP.test_ldap_bind_search_no_perms of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search_no_perms(self):
        """Test simple bind + search"""
        user = create_test_user()
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f2b52060>
message_id = 10

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f2b52060>
message_id = 10, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_schema
Stack Traces | 92.2s run time
self = <unittest.case._Outcome object at 0x7f62f4a21be0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
result = <TestCaseFunction test_ldap_schema>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
method = <bound method TestProviderLDAP.test_ldap_schema of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_schema(self):
        """Test LDAP Schema"""
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
>       _connection.bind()

tests/e2e/test_provider_ldap.py:425: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], get_info='A...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
>                       response = self.post_send_single_response(self.send('bindRequest', request, controls))

.venv/lib/python3.13.../ldap3/core/connection.py:607: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f4964d70>
message_id = 1

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)

.venv/lib/python3.13.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7f62f4964d70>
message_id = 1, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPOperationsErrorResult: LDAPOperationsErrorResult - 1 - operationsError - None - None - bindResponse - None

.venv/lib/python3.13.../ldap3/strategy/base.py:403: LDAPOperationsErrorResult

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@BeryJu BeryJu merged commit 13bb8e3 into version-2025.10 Feb 3, 2026
76 of 87 checks passed
@BeryJu BeryJu deleted the cherry-pick/19979-to-version-2025.10 branch February 3, 2026 18:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants