Skip to content

website/docs: fix Transifex link in translation guide (cherry-pick #19735 to version-2025.10)#19770

Merged
BeryJu merged 1 commit intoversion-2025.10from
cherry-pick/19735-to-version-2025.10
Jan 27, 2026
Merged

website/docs: fix Transifex link in translation guide (cherry-pick #19735 to version-2025.10)#19770
BeryJu merged 1 commit intoversion-2025.10from
cherry-pick/19735-to-version-2025.10

Conversation

@authentik-automation
Copy link
Contributor

Cherry-pick of #19735 to version-2025.10 branch.

Original PR: #19735
Original Author: @dominic-r
Cherry-picked commit: 6653ea3

@netlify
Copy link

netlify bot commented Jan 26, 2026

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit c415761
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/6977c02589b7af0008f3527b
😎 Deploy Preview https://deploy-preview-19770--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@netlify
Copy link

netlify bot commented Jan 26, 2026

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit c415761
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/6977c02547593200084f9bb2
😎 Deploy Preview https://deploy-preview-19770--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Jan 26, 2026

❌ 15 Tests Failed:

Tests completed Failed Passed Skipped
2205 15 2190 2
View the top 3 failed test(s) by shortest run time
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_ssl
Stack Traces | 50.4s run time
self = <unittest.case._Outcome object at 0x7f8e036cb770>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
result = <TestCaseFunction test_ldap_bind_success_ssl>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
method = <bound method TestProviderLDAP.test_ldap_bind_success_ssl of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_ssl(self):
        """Test simple bind with ssl"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:94: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
outpost = <Outpost: Outpost 2sWu6T1uFldjvzSxNzqbb7EgXQlQMxMr21ZdYViK>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'JCokcf2WUFF2nwsgxRaOy...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: b5ed5c0be12f>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_ssl>
container = <Container: b5ed5c0be12f>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success
Stack Traces | 50.5s run time
self = <unittest.case._Outcome object at 0x7f8e04937890>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
result = <TestCaseFunction test_ldap_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
method = <bound method TestProviderLDAP.test_ldap_bind_success of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success(self):
        """Test simple bind"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:67: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
outpost = <Outpost: Outpost 0vRyJxKyZfz1NuKosfkvZEtydgqsD7iaJwOQiOCX>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'kaTPG1b6IJHPw2sgEpGWZ...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: 930f062b2c47>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success>
container = <Container: 930f062b2c47>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_starttls
Stack Traces | 50.5s run time
self = <unittest.case._Outcome object at 0x7f8e053daad0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
result = <TestCaseFunction test_ldap_bind_success_starttls>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
method = <bound method TestProviderLDAP.test_ldap_bind_success_starttls of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_starttls(self):
        """Test simple bind with ssl"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:121: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
outpost = <Outpost: Outpost dtQ6yDcPdUndfCRjqP4uaHqGEt2PNbStPjDggwoh>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'XKI7lRDjYI9TLpBQK3FvW...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: d2abaa8a3796>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls>
container = <Container: d2abaa8a3796>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_fail
Stack Traces | 50.6s run time
self = <unittest.case._Outcome object at 0x7f8e04602850>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
result = <TestCaseFunction test_ldap_bind_fail>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
method = <bound method TestProviderLDAP.test_ldap_bind_fail of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_fail(self):
        """Test simple bind (failed)"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:149: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
outpost = <Outpost: Outpost CZ96rE6PvU65NntVu2cryBBOZYm9h5hrzJpgbRxo>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'vbFKJ0EhiMrtoFPaYUMMD...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: c93a060b5aad>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_fail>
container = <Container: c93a060b5aad>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_fail
Stack Traces | 50.6s run time
self = <unittest.case._Outcome object at 0x7f7584f216d0>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
result = <TestCaseFunction test_radius_bind_fail>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
method = <bound method TestProviderRadius.test_radius_bind_fail of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_fail(self):
        """Test simple bind (failed)"""
>       self._prepare()

tests/e2e/test_provider_radius.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        radius: RadiusProvider = RadiusProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            shared_secret=self.shared_secret,
        )
        # we need to create an application to actually access radius
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=radius)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.RADIUS,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(radius)
    
>       self.start_radius(outpost)

tests/e2e/test_provider_radius.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
outpost = <Outpost: Outpost FOcwmIuynYw9CGfoxN8yHHR5x0KvDDp3AlHSBwfL>

    def start_radius(self, outpost: Outpost):
        """Start radius container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
            ports={"1812/udp": 1812},
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_radius.py:28: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.150:41543', 'AUTHENTIK_TOKEN': 'qm8hBtd1nZM5VpNM3Kmy...io/goauthentik/dev-radius:gh-main', 'labels': {'io.goauthentik.test': 'FKbQpXsFmusWltkF3vZLXKN1Tsp8jD0GEQRws2R3'}, ...}
container = <Container: 3546573dfc1a>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
container = <Container: 3546573dfc1a>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_search_attrs_filter
Stack Traces | 50.9s run time
self = <unittest.case._Outcome object at 0x7f8e045d35c0>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
result = <TestCaseFunction test_ldap_search_attrs_filter>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
method = <bound method TestProviderLDAP.test_ldap_search_attrs_filter of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_search_attrs_filter(self):
        """Test search with attributes filtering"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
>       outpost = self._prepare()

tests/e2e/test_provider_ldap.py:443: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
outpost = <Outpost: Outpost 3oLCt7ZxPz8I2traLCHK2jjlAMS06w1lAvOYlSXo>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'rb9Q2ffvvpmhbGoGhsWcY...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: a611ae276770>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_search_attrs_filter>
container = <Container: a611ae276770>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_schema
Stack Traces | 51s run time
self = <unittest.case._Outcome object at 0x7f8e053d9f20>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
result = <TestCaseFunction test_ldap_schema>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
method = <bound method TestProviderLDAP.test_ldap_schema of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_schema(self):
        """Test LDAP Schema"""
>       self._prepare()

tests/e2e/test_provider_ldap.py:417: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
outpost = <Outpost: Outpost c9obBJAuvxBiKgjjSk22Np5vig6vH83SqzrgDsah>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'WRltW042WY7CWJPvqWgBw...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: f34f5e0093b6>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_schema>
container = <Container: f34f5e0093b6>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_proxy.TestProviderProxy::test_proxy_basic_auth
Stack Traces | 51.1s run time
self = <unittest.case._Outcome object at 0x7f35050043e0>
test_case = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
result = <TestCaseFunction test_proxy_basic_auth>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
method = <bound method TestProviderProxy.test_proxy_basic_auth of <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_proxy_basic_auth(self):
        """Test simple outpost setup with single provider"""
        cred = generate_id()
        attr = "basic-password"  # nosec
        self.user.attributes["basic-username"] = cred
        self.user.attributes[attr] = cred
        self.user.save()
    
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost:9000",
            basic_auth_enabled=True,
            basic_auth_user_attribute="basic-username",
            basic_auth_password_attribute=attr,
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_proxy(outpost)

tests/e2e/test_provider_proxy.py:162: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
outpost = <Outpost: Outpost MOTPPiugHyk5xwlSSHUilUuvxTbkhLzSJBdNpB9Z>

    def start_proxy(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_proxy.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.141:37617', 'AUTHENTIK_TOKEN': 'gtF642An12vLKjhMO7IY....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'O4IKbDKwLlcrabll9X82dju5gLdPHqHOUgqmNn01'}, ...}
container = <Container: da96b3e3d4cc>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_basic_auth>
container = <Container: da96b3e3d4cc>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_proxy.TestProviderProxy::test_proxy_simple
Stack Traces | 51.3s run time
self = <unittest.case._Outcome object at 0x7f350720a520>
test_case = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
result = <TestCaseFunction test_proxy_simple>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
method = <bound method TestProviderProxy.test_proxy_simple of <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_proxy_simple(self):
        """Test simple outpost setup with single provider"""
        # set additionalHeaders to test later
        self.user.attributes["additionalHeaders"] = {"X-Foo": "bar"}
        self.user.save()
    
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost:9000",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_proxy(outpost)

tests/e2e/test_provider_proxy.py:89: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
outpost = <Outpost: Outpost TAf4JhztMB77ZjHHopaLaF3bIfibtIWZWxUVTJvF>

    def start_proxy(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_proxy.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.141:37617', 'AUTHENTIK_TOKEN': 'yrOZpzOXplmKzje3E7ih....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'O4IKbDKwLlcrabll9X82dju5gLdPHqHOUgqmNn01'}, ...}
container = <Container: a071f619e90e>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy.TestProviderProxy testMethod=test_proxy_simple>
container = <Container: a071f619e90e>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search_no_perms
Stack Traces | 51.3s run time
self = <unittest.case._Outcome object at 0x7f8e0337f950>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
result = <TestCaseFunction test_ldap_bind_search_no_perms>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
method = <bound method TestProviderLDAP.test_ldap_bind_search_no_perms of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search_no_perms(self):
        """Test simple bind + search"""
        user = create_test_user()
>       self._prepare()

tests/e2e/test_provider_ldap.py:329: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
outpost = <Outpost: Outpost MtfZB94xCvXLWmmtPbulMOL8qiGBwWDcOGnF03eh>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'xd2d9hgRsfgIpU32e1U29...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: ec0774dadd68>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search_no_perms>
container = <Container: ec0774dadd68>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_traefik
Stack Traces | 53.1s run time
self = <unittest.case._Outcome object at 0x7f350672b110>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
result = <TestCaseFunction test_traefik>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
method = <bound method TestProviderProxyForward.test_traefik of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>

    @retry()
    def test_traefik(self):
        """Test traefik"""
        local_config_path = (
            Path(__file__).parent / "proxy_forward_auth" / "traefik_single" / "config-static.yaml"
        )
        self.run_container(
            image="docker.io/library/traefik:3.1",
            ports={
                "80": "80",
            },
            volumes={
                local_config_path: {
                    "bind": ".../etc/traefik/traefik.yml",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
outpost = <Outpost: Outpost n2Mj9kmPH7z74tpwpoXmC4aMwhqSP07jbAcS3hKs>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.141:53693', 'AUTHENTIK_TOKEN': 'VpuTRRPtnfOeIt3klplN....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'O4IKbDKwLlcrabll9X82dju5gLdPHqHOUgqmNn01'}, ...}
container = <Container: f1f4ed52b10d>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_traefik>
container = <Container: f1f4ed52b10d>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_envoy
Stack Traces | 53.6s run time
self = <unittest.case._Outcome object at 0x7f3505389f90>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
result = <TestCaseFunction test_envoy>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
method = <bound method TestProviderProxyForward.test_envoy of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>

    @retry()
    def test_envoy(self):
        """Test envoy"""
        self.run_container(
            image="docker.io/envoyproxy/envoy:v1.25-latest",
            ports={
                "10000": "80",
            },
            volumes={
                f"{Path(__file__).parent / "proxy_forward_auth" / "envoy_single" / "envoy.yaml"}": {
                    "bind": ".../etc/envoy/envoy.yaml",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:168: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
outpost = <Outpost: Outpost UqmCxqAru9utva1gmp3IXi02ol2Uflj6d5KISIXy>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.141:53693', 'AUTHENTIK_TOKEN': '0LVwuLsqeUL34DJGBl2T....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'O4IKbDKwLlcrabll9X82dju5gLdPHqHOUgqmNn01'}, ...}
container = <Container: 2741dea90053>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_envoy>
container = <Container: 2741dea90053>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_saml.TestProviderSAML::test_sp_initiated_implicit_post_buffer
Stack Traces | 103s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>
sql = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f1169eaf4d0>
args = ('TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser...e", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f1169eaf4d0>
query = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.DeadlockDetected: deadlock detected
E           DETAIL:  Process 745 waits for AccessExclusiveLock on relation 18141 of database 16389; blocked by process 875.
E           Process 875 waits for AccessShareLock on relation 20226 of database 16389; blocked by process 745.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: DeadlockDetected

The above exception was the direct cause of the following exception:

self = <django.core.management.commands.flush.Command object at 0x7f1171c93890>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
>                   connection.ops.execute_sql_flush(sql_list)

.venv/lib/python3.13.../management/commands/flush.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psqlextra.backend.operations.PostgresOperations object at 0x7f1171f1f4d0>
sql_list = ['TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser...ce", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";']

    def execute_sql_flush(self, sql_list):
        """Execute a list of SQL statements to flush the database."""
        with transaction.atomic(
            using=self.connection.alias,
            savepoint=self.connection.features.can_rollback_ddl,
        ):
            with self.connection.cursor() as cursor:
                for sql in sql_list:
>                   cursor.execute(sql)

.venv/lib/python3.13.../backends/base/operations.py:473: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>, 'TRUNCATE "authentik_policies_reputation_reputatio...ce", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";')
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13.../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>
sql = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>
sql = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>
sql = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None, many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>
sql = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7f116e5b63c0>
exc_type = <class 'psycopg.errors.DeadlockDetected'>
exc_value = DeadlockDetected('deadlock detected\nDETAIL:  Process 745 waits for AccessExclusiveLock on relation 18141 of database ...ccessShareLock on relation 20226 of database 16389; blocked by process 745.\nHINT:  See server log for query details.')
traceback = <traceback object at 0x7f116706c080>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>
sql = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f11670f7590>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f1169eaf4d0>
args = ('TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser...e", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f1169eaf4d0>
query = 'TRUNCATE "authentik_policies_reputation_reputation", "authentik_providers_microsoft_entra_microsoftentraprovideruser"...ice", "authentik_enterprise_licenseusage", "authentik_core_user_ak_groups", "authentik_providers_oauth2_refreshtoken";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.OperationalError: deadlock detected
E           DETAIL:  Process 745 waits for AccessExclusiveLock on relation 18141 of database 16389; blocked by process 875.
E           Process 875 waits for AccessShareLock on relation 20226 of database 16389; blocked by process 745.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: OperationalError

The above exception was the direct cause of the following exception:

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>
result = <TestCaseFunction test_sp_initiated_implicit_post_buffer>
debug = False

    def _setup_and_call(self, result, debug=False):
        """
        Perform the following in order: pre-setup, run test, post-teardown,
        skipping pre/post hooks if test is set to be skipped.
    
        If debug=True, reraise any errors in setup and use super().debug()
        instead of __call__() to run the test.
        """
        testMethod = getattr(self, self._testMethodName)
        skipped = getattr(self.__class__, "__unittest_skip__", False) or getattr(
            testMethod, "__unittest_skip__", False
        )
    
        # Convert async test methods.
        if iscoroutinefunction(testMethod):
            setattr(self, self._testMethodName, async_to_sync(testMethod))
    
        if not skipped:
            try:
                if self.__class__._pre_setup_ran_eagerly:
                    self.__class__._pre_setup_ran_eagerly = False
                else:
                    self._pre_setup()
            except Exception:
                if debug:
                    raise
                result.addError(self, sys.exc_info())
                return
        if debug:
            super().debug()
        else:
            super().__call__(result)
        if not skipped:
            try:
>               self._post_teardown()

.venv/lib/python3.13.../django/test/testcases.py:379: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>

    def _post_teardown(self):
        """
        Perform post-test things:
        * Flush the contents of the database to leave a clean slate. If the
          class has an 'available_apps' attribute, don't fire post_migrate.
        * Force-close the connection so the next test gets a clean cursor.
        """
        try:
>           self._fixture_teardown()

.venv/lib/python3.13.../django/test/testcases.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>

    def _fixture_teardown(self):
        # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal
        # when flushing only a subset of the apps
        for db_name in self._databases_names(include_mirrors=False):
            # Flush the database
            inhibit_post_migrate = (
                self.available_apps is not None
                or (  # Inhibit the post_migrate signal when using serialized
                    # rollback to avoid trying to recreate the serialized data.
                    self.serialized_rollback
                    and hasattr(connections[db_name], "_test_serialized_contents")
                )
            )
>           call_command(
                "flush",
                verbosity=0,
                interactive=False,
                database=db_name,
                reset_sequences=False,
                allow_cascade=self.available_apps is not None,
                inhibit_post_migrate=inhibit_post_migrate,
            )

.venv/lib/python3.13.../django/test/testcases.py:1266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

command_name = 'flush', args = ()
options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
command = <django.core.management.commands.flush.Command object at 0x7f1171c93890>
app_name = 'django.core'
parser = CommandParser(prog=' flush', usage=None, description='Removes ALL DATA from the database, including data added during ....', formatter_class=<class 'django.core.management.base.DjangoHelpFormatter'>, conflict_handler='error', add_help=True)
opt_mapping = {'database': 'database', 'force_color': 'force_color', 'help': 'help', 'no_color': 'no_color', ...}
arg_options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
parse_args = []

    def call_command(command_name, *args, **options):
        """
        Call the given command, with the given options and args/kwargs.
    
        This is the primary API you should use for calling specific commands.
    
        `command_name` may be a string or a command object. Using a string is
        preferred unless the command object is required for further processing or
        testing.
    
        Some examples:
            call_command('migrate')
            call_command('shell', plain=True)
            call_command('sqlmigrate', 'myapp')
    
            from django.core.management.commands import flush
            cmd = flush.Command()
            call_command(cmd, verbosity=0, interactive=False)
            # Do something with cmd ...
        """
        if isinstance(command_name, BaseCommand):
            # Command object passed in.
            command = command_name
            command_name = command.__class__.__module__.split(".")[-1]
        else:
            # Load the command object by name.
            try:
                app_name = get_commands()[command_name]
            except KeyError:
                raise CommandError("Unknown command: %r" % command_name)
    
            if isinstance(app_name, BaseCommand):
                # If the command is already loaded, use it directly.
                command = app_name
            else:
                command = load_command_class(app_name, command_name)
    
        # Simulate argument parsing to get the option defaults (see #10080 for details).
        parser = command.create_parser("", command_name)
        # Use the `dest` option name from the parser option
        opt_mapping = {
            min(s_opt.option_strings).lstrip("-").replace("-", "_"): s_opt.dest
            for s_opt in parser._actions
            if s_opt.option_strings
        }
        arg_options = {opt_mapping.get(key, key): value for key, value in options.items()}
        parse_args = []
        for arg in args:
            if isinstance(arg, (list, tuple)):
                parse_args += map(str, arg)
            else:
                parse_args.append(str(arg))
    
        def get_actions(parser):
            # Parser actions and actions from sub-parser choices.
            for opt in parser._actions:
                if isinstance(opt, _SubParsersAction):
                    for sub_opt in opt.choices.values():
                        yield from get_actions(sub_opt)
                else:
                    yield opt
    
        parser_actions = list(get_actions(parser))
        mutually_exclusive_required_options = {
            opt
            for group in parser._mutually_exclusive_groups
            for opt in group._group_actions
            if group.required
        }
        # Any required arguments which are passed in via **options must be passed
        # to parse_args().
        for opt in parser_actions:
            if opt.dest in options and (
                opt.required or opt in mutually_exclusive_required_options
            ):
                opt_dest_count = sum(v == opt.dest for v in opt_mapping.values())
                if opt_dest_count > 1:
                    raise TypeError(
                        f"Cannot pass the dest {opt.dest!r} that matches multiple "
                        f"arguments via **options."
                    )
                parse_args.append(min(opt.option_strings))
                if isinstance(opt, (_AppendConstAction, _CountAction, _StoreConstAction)):
                    continue
                value = arg_options[opt.dest]
                if isinstance(value, (list, tuple)):
                    parse_args += map(str, value)
                else:
                    parse_args.append(str(value))
        defaults = parser.parse_args(args=parse_args)
        defaults = dict(defaults._get_kwargs(), **arg_options)
        # Raise an error if any unknown options were passed.
        stealth_options = set(command.base_stealth_options + command.stealth_options)
        dest_parameters = {action.dest for action in parser_actions}
        valid_options = (dest_parameters | stealth_options).union(opt_mapping)
        unknown_options = set(options) - valid_options
        if unknown_options:
            raise TypeError(
                "Unknown option(s) for %s command: %s. "
                "Valid options are: %s."
                % (
                    command_name,
                    ", ".join(sorted(unknown_options)),
                    ", ".join(sorted(valid_options)),
                )
            )
        # Move positional args out of options to mimic legacy optparse
        args = defaults.pop("args", ())
        if "skip_checks" not in options:
            defaults["skip_checks"] = True
    
>       return command.execute(*args, **defaults)

.venv/lib/python3.13.../core/management/__init__.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f1171c93890>
args = ()
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}

    def execute(self, *args, **options):
        """
        Try to execute this command, performing system checks if needed (as
        controlled by the ``requires_system_checks`` attribute, except if
        force-skipped).
        """
        if options["force_color"] and options["no_color"]:
            raise CommandError(
                "The --no-color and --force-color options can't be used together."
            )
        if options["force_color"]:
            self.style = color_style(force_color=True)
        elif options["no_color"]:
            self.style = no_style()
            self.stderr.style_func = None
        if options.get("stdout"):
            self.stdout = OutputWrapper(options["stdout"])
        if options.get("stderr"):
            self.stderr = OutputWrapper(options["stderr"])
    
        if self.requires_system_checks and not options["skip_checks"]:
            check_kwargs = self.get_check_kwargs(options)
            self.check(**check_kwargs)
        if self.requires_migrations_checks:
            self.check_migrations()
>       output = self.handle(*args, **options)

.venv/lib/python3.13.../core/management/base.py:460: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f1171c93890>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
                    connection.ops.execute_sql_flush(sql_list)
                except Exception as exc:
>                   raise CommandError(
                        "Database %s couldn't be flushed. Possible reasons:\n"
                        "  * The database isn't running or isn't configured correctly.\n"
                        "  * At least one of the expected database tables doesn't exist.\n"
                        "  * The SQL was invalid.\n"
                        "Hint: Look at the output of 'django-admin sqlflush'. "
                        "That's the SQL this command wasn't able to run."
                        % (connection.settings_dict["NAME"],)
                    ) from exc
E                   django.core.management.base.CommandError: Database test_authentik couldn't be flushed. Possible reasons:
E                     * The database isn't running or isn't configured correctly.
E                     * At least one of the expected database tables doesn't exist.
E                     * The SQL was invalid.
E                   Hint: Look at the output of 'django-admin sqlflush'. That's the SQL this command wasn't able to run.

.venv/lib/python3.13.../management/commands/flush.py:76: CommandError
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_success
Stack Traces | 131s run time
self = <unittest.case._Outcome object at 0x7f7586ba30e0>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
result = <TestCaseFunction test_radius_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
method = <bound method TestProviderRadius.test_radius_bind_success of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_success(self):
        """Test simple bind"""
>       self._prepare()

tests/e2e/test_provider_radius.py:64: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        radius: RadiusProvider = RadiusProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            shared_secret=self.shared_secret,
        )
        # we need to create an application to actually access radius
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=radius)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.RADIUS,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(radius)
    
>       self.start_radius(outpost)

tests/e2e/test_provider_radius.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
outpost = <Outpost: Outpost iXP4sAPmw64xus5QT1G4kR14nLCt8cVAuXsGfidc>

    def start_radius(self, outpost: Outpost):
        """Start radius container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
            ports={"1812/udp": 1812},
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_radius.py:28: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.150:41543', 'AUTHENTIK_TOKEN': '4MyOFVAYtul49xMqvham...io/goauthentik/dev-radius:gh-main', 'labels': {'io.goauthentik.test': 'FKbQpXsFmusWltkF3vZLXKN1Tsp8jD0GEQRws2R3'}, ...}
container = <Container: bbe70a333a19>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
container = <Container: bbe70a333a19>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_search
Stack Traces | 133s run time
self = <unittest.case._Outcome object at 0x7f8e04f39e80>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
result = <TestCaseFunction test_ldap_bind_search>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
method = <bound method TestProviderLDAP.test_ldap_bind_search of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_search(self):
        """Test simple bind + search"""
        # Remove akadmin to ensure list is correct
        # Remove user before starting container so it's not cached
        User.objects.filter(username="akadmin").delete()
    
>       outpost = self._prepare()

tests/e2e/test_provider_ldap.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>

    def _prepare(self) -> User:
        """prepare user, provider, app and container"""
        self.user.attributes["extraAttribute"] = "bar"
        self.user.save()
    
        ldap: LDAPProvider = LDAPProvider.objects.create(
            name=generate_id(),
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
            search_mode=APIAccessMode.CACHED,
        )
        assign_perm("search_full_directory", self.user, ldap)
        # we need to create an application to actually access the ldap
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.LDAP,
            _config=asdict(OutpostConfig(log_level="debug")),
        )
        outpost.providers.add(ldap)
    
>       self.start_ldap(outpost)

tests/e2e/test_provider_ldap.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
outpost = <Outpost: Outpost LOhZHUzkxAeXahcoKk2LQYcQ156kXzMtH9mhrC4E>

    def start_ldap(self, outpost: Outpost):
        """Start ldap container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
            ports={
                "3389": "3389",
                "6636": "6636",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
        )

tests/e2e/test_provider_ldap.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.20:54051', 'AUTHENTIK_TOKEN': 'wBc3e6uXwg3PvdGB3oCdu...r.io/goauthentik/dev-ldap:gh-main', 'labels': {'io.goauthentik.test': 'LE3LhswMqkI17BpS2aP9TpMii7Xgod8TGJSFPBbr'}, ...}
container = <Container: 3faffa413aa4>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_search>
container = <Container: 3faffa413aa4>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError
tests.e2e.test_provider_proxy_forward.TestProviderProxyForward::test_caddy
Stack Traces | 133s run time
self = <unittest.case._Outcome object at 0x7f3506f7e7b0>
test_case = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
result = <TestCaseFunction test_caddy>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
method = <bound method TestProviderProxyForward.test_caddy of <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>

    @retry()
    def test_caddy(self):
        """Test caddy"""
        local_config_path = (
            Path(__file__).parent / "proxy_forward_auth" / "caddy_single" / "Caddyfile"
        )
        self.run_container(
            image="docker.io/library/caddy:2.8",
            ports={
                "80": "80",
            },
            volumes={
                local_config_path: {
                    "bind": ".../etc/caddy/Caddyfile",
                }
            },
        )
    
>       self.prepare()

tests/e2e/test_provider_proxy_forward.py:205: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, file = 'system/providers-proxy.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - Proxy Provider - Sco...ser.group_attributes(request),\n                "is_superuser": request.user.is_superuser,\n            }\n        }\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>

    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-oauth2.yaml",
        "system/providers-proxy.yaml",
    )
    @reconcile_app("authentik_crypto")
    def prepare(self):
        proxy: ProxyProvider = ProxyProvider.objects.create(
            name=generate_id(),
            mode=ProxyMode.FORWARD_SINGLE,
            authorization_flow=Flow.objects.get(
                slug="default-provider-authorization-implicit-consent"
            ),
            invalidation_flow=Flow.objects.get(slug="default-provider-invalidation-flow"),
            internal_host=f"http://{self.host}",
            external_host="http://localhost",
        )
        # Ensure OAuth2 Params are set
        proxy.set_oauth_defaults()
        proxy.save()
        # we need to create an application to actually access the proxy
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=proxy)
        outpost: Outpost = Outpost.objects.create(
            name=generate_id(),
            type=OutpostType.PROXY,
        )
        outpost.providers.add(proxy)
        outpost.build_user_permissions(outpost.user)
    
>       self.start_outpost(outpost)

tests/e2e/test_provider_proxy_forward.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
outpost = <Outpost: Outpost 2lD0Hiu1FqyApI0kBRthL8diPl7lAIJgBn3gkPn1>

    def start_outpost(self, outpost: Outpost):
        """Start proxy container based on outpost created"""
>       self.run_container(
            image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
            ports={
                "9000": "9000",
            },
            environment={
                "AUTHENTIK_TOKEN": outpost.token.key,
            },
            name="ak-test-outpost",
        )

tests/e2e/test_provider_proxy_forward.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.141:53693', 'AUTHENTIK_TOKEN': 'obqUIpChHWm9r3nhEyCK....io/goauthentik/dev-proxy:gh-main', 'labels': {'io.goauthentik.test': 'O4IKbDKwLlcrabll9X82dju5gLdPHqHOUgqmNn01'}, ...}
container = <Container: a3f59d6e6607>
state = {'Dead': False, 'Error': '', 'ExitCode': 0, 'FinishedAt': '0001-01-01T00:00:00Z', ...}

    def run_container(self, **specs: dict[str, Any]) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
        container = self.docker_client.containers.run(**specs)
        container.reload()
        state = container.attrs.get("State", {})
        if "Health" not in state:
            return container
>       self.wait_for_container(container)

tests/e2e/utils.py:117: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_proxy_forward.TestProviderProxyForward testMethod=test_caddy>
container = <Container: a3f59d6e6607>

    def wait_for_container(self, container: Container):
        """Check that container is health"""
        attempt = 0
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            sleep(1)
            attempt += 1
            if attempt >= self.max_healthcheck_attempts:
>               raise self.failureException("Container failed to start")
E               AssertionError: Container failed to start

tests/e2e/utils.py:91: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@dominic-r dominic-r enabled auto-merge (squash) January 26, 2026 19:54
@BeryJu BeryJu disabled auto-merge January 27, 2026 16:46
@BeryJu BeryJu merged commit aafb7cb into version-2025.10 Jan 27, 2026
140 of 157 checks passed
@BeryJu BeryJu deleted the cherry-pick/19735-to-version-2025.10 branch January 27, 2026 16:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants