Skip to content

endpoints/devices: cleanup (cherry-pick #19047 to version-2025.12)#19057

Merged
BeryJu merged 2 commits intoversion-2025.12from
cherry-pick/19047-to-version-2025.12
Dec 26, 2025
Merged

endpoints/devices: cleanup (cherry-pick #19047 to version-2025.12)#19057
BeryJu merged 2 commits intoversion-2025.12from
cherry-pick/19047-to-version-2025.12

Conversation

@authentik-automation
Copy link
Contributor

Cherry-pick of #19047 to version-2025.12 branch.

Original PR: #19047
Original Author: @BeryJu
Cherry-picked commit: 448c8f8

* endpoints: make device token internally managed

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix text and defaults for agent

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* re-org some code

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
@netlify
Copy link

netlify bot commented Dec 26, 2025

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit 2d54e87
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/694e8bbd6d871000084eeaba
😎 Deploy Preview https://deploy-preview-19057--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Dec 26, 2025

❌ 3 Tests Failed:

Tests completed Failed Passed Skipped
2859 3 2856 2
View the top 3 failed test(s) by shortest run time
authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks::test_valid_disabled
Stack Traces | 2.44s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)

.venv/lib/python3.13.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c25dd0d0>
args = ('INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "...onzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c25dd0d0>
query = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.ForeignKeyViolation: insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_task_task_id_a82f0835_fk_authentik"
E           DETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".

.venv/lib/python3.13............/site-packages/psycopg/cursor.py:97: ForeignKeyViolation

The above exception was the direct cause of the following exception:

instance_pk = UUID('04a75ace-2a66-421a-9eaf-425976ac0219')

    @actor(description=_("Apply single blueprint."))
    def apply_blueprint(instance_pk: UUID):
        try:
            self = CurrentTask.get_task()
        except CurrentTaskNotFound:
            self = Task()
        self.set_uid(str(instance_pk))
        instance: BlueprintInstance | None = None
        try:
            instance: BlueprintInstance = BlueprintInstance.objects.filter(pk=instance_pk).first()
            if not instance:
                self.warning(f"Could not find blueprint {instance_pk}, skipping")
                return
            self.set_uid(slugify(instance.name))
            if not instance.enabled:
>               self.info(f"Blueprint {instance.name} is disabled, skipping")

.../blueprints/v1/tasks.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task: be14f74d-edd1-41b5-aee6-6a4f21e6ff0a>
message = 'Blueprint RgVtDHgQPGlONZGJruPxQMOypAYoND2lsKUwZJdk is disabled, skipping'
attributes = {}

    def info(self, message: str | Exception, **attributes) -> None:
>       self.log(self.uid, TaskStatus.INFO, message, **attributes)

authentik/tasks/models.py:138: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task: be14f74d-edd1-41b5-aee6-6a4f21e6ff0a>
logger = 'authentik.events.tasks.event_trigger_dispatch:rgvtdhgqpglonzgjrupxqmoypayond2lskuwzjdk'
log_level = TaskStatus.INFO
message = 'Blueprint RgVtDHgQPGlONZGJruPxQMOypAYoND2lsKUwZJdk is disabled, skipping'
attributes = {}

    def log(
        self,
        logger: str,
        log_level: TaskStatus,
        message: str | Exception,
        **attributes,
    ) -> None:
>       TaskLog.create_from_log_event(
            self,
            self._make_log(
                logger,
                log_level,
                message,
                **attributes,
            ),
        )

authentik/tasks/models.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cls = <class 'authentik.tasks.models.TaskLog'>
task = <Task: be14f74d-edd1-41b5-aee6-6a4f21e6ff0a>
log_event = LogEvent(event='Blueprint RgVtDHgQPGlONZGJruPxQMOypAYoND2lsKUwZJdk is disabled, skipping', log_level='info', logger='a...2lskuwzjdk', timestamp=datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), attributes={})

    @classmethod
    def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None:
        if not task.message:
            return None
>       return cls.objects.create(
            task=task,
            event=log_event.event,
            log_level=log_event.log_level,
            logger=log_event.logger,
            timestamp=log_event.timestamp,
            attributes=sanitize_item(log_event.attributes),
        )

authentik/tasks/models.py:172: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.models.manager.Manager object at 0x7f17c57c60d0>, args = ()
kwargs = {'attributes': {}, 'event': 'Blueprint RgVtDHgQPGlONZGJruPxQMOypAYoND2lsKUwZJdk is disabled, skipping', 'log_level': 'info', 'logger': 'authentik.events.tasks.event_trigger_dispatch:rgvtdhgqpglonzgjrupxqmoypayond2lskuwzjdk', ...}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)

.venv/lib/python3.13.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>
kwargs = {'attributes': {}, 'event': 'Blueprint RgVtDHgQPGlONZGJruPxQMOypAYoND2lsKUwZJdk is disabled, skipping', 'log_level': 'info', 'logger': 'authentik.events.tasks.event_trigger_dispatch:rgvtdhgqpglonzgjrupxqmoypayond2lskuwzjdk', ...}
reverse_one_to_one_fields = frozenset()
obj = <TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>

    def create(self, **kwargs):
        """
        Create a new object with the given kwargs, saving it to the database
        and returning the created object.
        """
        reverse_one_to_one_fields = frozenset(kwargs).intersection(
            self.model._meta._reverse_one_to_one_field_names
        )
        if reverse_one_to_one_fields:
            raise ValueError(
                "The following fields do not exist in this model: %s"
                % ", ".join(reverse_one_to_one_fields)
            )
    
        obj = self.model(**kwargs)
        self._for_write = True
>       obj.save(force_insert=True, using=self.db)

.venv/lib/python3.13.../db/models/query.py:665: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>, force_insert = True
force_update = False, using = 'default', update_fields = None

    def save(
        self,
        *args,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.
    
        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        # RemovedInDjango60Warning.
        if args:
            force_insert, force_update, using, update_fields = self._parse_save_params(
                *args,
                method_name="save",
                force_insert=force_insert,
                force_update=force_update,
                using=using,
                update_fields=update_fields,
            )
    
        self._prepare_related_fields_for_save(operation_name="save")
    
        using = using or router.db_for_write(self.__class__, instance=self)
        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")
    
        deferred_non_generated_fields = {
            f.attname
            for f in self._meta.concrete_fields
            if f.attname not in self.__dict__ and f.generated is False
        }
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return
    
            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            not_updatable_fields = update_fields.difference(field_names)
    
            if not_updatable_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, primary keys, or are non-concrete fields: %s"
                    % ", ".join(not_updatable_fields)
                )
    
        # If saving to the same database, and this model is deferred, then
        # automatically do an "update_fields" save on the loaded fields.
        elif (
            not force_insert
            and deferred_non_generated_fields
            and using == self._state.db
        ):
            field_names = set()
            pk_fields = self._meta.pk_fields
            for field in self._meta.concrete_fields:
                if field not in pk_fields and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_non_generated_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)
    
>       self.save_base(
            using=using,
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

.venv/lib/python3.13.../db/models/base.py:902: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>, raw = False
force_insert = (<class 'authentik.tasks.models.TaskLog'>,), force_update = False
using = 'default', update_fields = None

    def save_base(
        self,
        raw=False,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.
    
        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = origin = self.__class__
        # Skip proxies, but keep the origin as the proxy model.
        if cls._meta.proxy:
            cls = cls._meta.concrete_model
        meta = cls._meta
        if not meta.auto_created:
            pre_save.send(
                sender=origin,
                instance=self,
                raw=raw,
                using=using,
                update_fields=update_fields,
            )
        # A transaction isn't needed if one query is issued.
        if meta.parents:
            context_manager = transaction.atomic(using=using, savepoint=False)
        else:
            context_manager = transaction.mark_for_rollback_on_error(using=using)
        with context_manager:
            parent_inserted = False
            if not raw:
                # Validate force insert only when parents are inserted.
                force_insert = self._validate_force_insert(force_insert)
                parent_inserted = self._save_parents(
                    cls, using, update_fields, force_insert
                )
>           updated = self._save_table(
                raw,
                cls,
                force_insert or parent_inserted,
                force_update,
                using,
                update_fields,
            )

.venv/lib/python3.13.../db/models/base.py:1008: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>, raw = False
cls = <class 'authentik.tasks.models.TaskLog'>
force_insert = (<class 'authentik.tasks.models.TaskLog'>,), force_update = False
using = 'default', update_fields = None

    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._meta
        pk_fields = meta.pk_fields
        non_pks_non_generated = [
            f
            for f in meta.local_concrete_fields
            if f not in pk_fields and not f.generated
        ]
    
        if update_fields:
            non_pks_non_generated = [
                f
                for f in non_pks_non_generated
                if f.name in update_fields or f.attname in update_fields
            ]
    
        if not self._is_pk_set(meta):
            pk_val = meta.pk.get_pk_value_on_save(self)
            setattr(self, meta.pk.attname, pk_val)
        pk_set = self._is_pk_set(meta)
        if not pk_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and not force_update
            and self._state.adding
            and all(f.has_default() or f.has_db_default() for f in meta.pk_fields)
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if pk_set and not force_insert:
            base_qs = cls._base_manager.using(using)
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks_non_generated
            ]
            forced_update = update_fields or force_update
            pk_val = self._get_pk_val(meta)
            updated = self._do_update(
                base_qs, using, pk_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            if meta.order_with_respect_to:
                # If this is a model with an order_with_respect_to
                # autopopulate the _order field
                field = meta.order_with_respect_to
                filter_args = field.get_filter_kwargs_for_object(self)
                self._order = (
                    cls._base_manager.using(using)
                    .filter(**filter_args)
                    .aggregate(
                        _order__max=Coalesce(
                            ExpressionWrapper(
                                Max("_order") + Value(1), output_field=IntegerField()
                            ),
                            Value(0),
                        ),
                    )["_order__max"]
                )
            fields = [
                f
                for f in meta.local_concrete_fields
                if not f.generated and (pk_set or f is not meta.auto_field)
            ]
            returning_fields = meta.db_returning_fields
>           results = self._do_insert(
                cls._base_manager, using, fields, returning_fields, raw
            )

.venv/lib/python3.13.../db/models/base.py:1169: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>
manager = <django.db.models.manager.Manager object at 0x7f17c57c4bd0>
using = 'default'
fields = [<django.db.models.fields.UUIDField: id>, <django.db.models.fields.related.ForeignKey: task>, <django.db.models.fields...ield: log_level>, <django.db.models.fields.TextField: logger>, <django.db.models.fields.DateTimeField: timestamp>, ...]
returning_fields = [], raw = False

    def _do_insert(self, manager, using, fields, returning_fields, raw):
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
>       return manager._insert(
            [self],
            fields=fields,
            returning_fields=returning_fields,
            using=using,
            raw=raw,
        )

.venv/lib/python3.13.../db/models/base.py:1210: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.models.manager.Manager object at 0x7f17c57c4bd0>
args = ([<TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>],)
kwargs = {'fields': [<django.db.models.fields.UUIDField: id>, <django.db.models.fields.related.ForeignKey: task>, <django.db.mo...r>, <django.db.models.fields.DateTimeField: timestamp>, ...], 'raw': False, 'returning_fields': [], 'using': 'default'}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)

.venv/lib/python3.13.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>, objs = [<TaskLog: 1ec5f01b-e024-43d4-8c5c-3bffea253a43>]
fields = [<django.db.models.fields.UUIDField: id>, <django.db.models.fields.related.ForeignKey: task>, <django.db.models.fields...ield: log_level>, <django.db.models.fields.TextField: logger>, <django.db.models.fields.DateTimeField: timestamp>, ...]
returning_fields = [], raw = False, using = 'default', on_conflict = None
update_fields = None, unique_fields = None

    def _insert(
        self,
        objs,
        fields,
        returning_fields=None,
        raw=False,
        using=None,
        on_conflict=None,
        update_fields=None,
        unique_fields=None,
    ):
        """
        Insert a new record for the given model. This provides an interface to
        the InsertQuery class and is how Model.save() is implemented.
        """
        self._for_write = True
        if using is None:
            using = self.db
        query = sql.InsertQuery(
            self.model,
            on_conflict=on_conflict,
            update_fields=update_fields,
            unique_fields=unique_fields,
        )
        query.insert_values(fields, objs, raw=raw)
>       return query.get_compiler(using=using).execute_sql(returning_fields)

.venv/lib/python3.13.../db/models/query.py:1873: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <SQLInsertCompiler model=TaskLog connection=<DatabaseWrapper vendor='postgresql' alias='default'> using='default'>
returning_fields = []

    def execute_sql(self, returning_fields=None):
        assert not (
            returning_fields
            and len(self.query.objs) != 1
            and not self.connection.features.can_return_rows_from_bulk_insert
        )
        opts = self.query.get_meta()
        self.returning_fields = returning_fields
        cols = []
        with self.connection.cursor() as cursor:
            for sql, params in self.as_sql():
>               cursor.execute(sql, params)

.venv/lib/python3.13.../models/sql/compiler.py:1882: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>, 'INSERT INTO "authentik_tasks_tasklog" ("id", "tas...onzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...))
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13....../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)
many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7f17da9167b0>
exc_type = <class 'psycopg.errors.ForeignKeyViolation'>
exc_value = ForeignKeyViolation('insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_ta...entik"\nDETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".')
traceback = <traceback object at 0x7f17d4e9fb00>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f17c23cd670>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)

.venv/lib/python3.13.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c25dd0d0>
args = ('INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "...onzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c25dd0d0>
query = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('1ec5f01b-e024-43d4-8c5c-3bffea253a43'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'Blueprint RgVtDHgQPGlONZ...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 901886, tzinfo=datetime.timezone.utc), ...)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.IntegrityError: insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_task_task_id_a82f0835_fk_authentik"
E           DETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".

.venv/lib/python3.13............/site-packages/psycopg/cursor.py:97: IntegrityError

During handling of the above exception, another exception occurred:

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)

.venv/lib/python3.13.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c26a6990>
args = ('INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "...onzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c26a6990>
query = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.ForeignKeyViolation: insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_task_task_id_a82f0835_fk_authentik"
E           DETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".

.venv/lib/python3.13............/site-packages/psycopg/cursor.py:97: ForeignKeyViolation

The above exception was the direct cause of the following exception:

self = <unittest.case._Outcome object at 0x7f17c494b4d0>
test_case = <authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks testMethod=test_valid_disabled>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11............/x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks testMethod=test_valid_disabled>
result = <TestCaseFunction test_valid_disabled>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11............/x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks testMethod=test_valid_disabled>
method = <bound method TestBlueprintsV1Tasks.test_valid_disabled of <authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks testMethod=test_valid_disabled>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11............/x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks testMethod=test_valid_disabled>,)
kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

.../hostedtoolcache/Python/3.13.11............/x64/lib/python3.13/contextlib.py:85: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.blueprints.tests.test_v1_tasks.TestBlueprintsV1Tasks testMethod=test_valid_disabled>

    @CONFIG.patch("blueprints_dir", TMP)
    def test_valid_disabled(self):
        """Test valid file"""
        with NamedTemporaryFile(mode="w+", suffix=".yaml", dir=TMP) as file:
            file.write(
                dump(
                    {
                        "version": 1,
                        "entries": [],
                    }
                )
            )
            file.flush()
            instance: BlueprintInstance = BlueprintInstance.objects.create(
                name=generate_id(),
                path=file.name,
                enabled=False,
                status=BlueprintInstanceStatus.UNKNOWN,
            )
            instance.refresh_from_db()
            self.assertEqual(instance.last_applied_hash, "")
            self.assertEqual(
                instance.status,
                BlueprintInstanceStatus.UNKNOWN,
            )
>           apply_blueprint(instance.pk)

.../blueprints/tests/test_v1_tasks.py:152: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Actor(<function apply_blueprint at 0x7f17da183ce0>, queue_name='default', actor_name='authentik.blueprints.v1.tasks.apply_blueprint')
args = (UUID('04a75ace-2a66-421a-9eaf-425976ac0219'),), kwargs = {}
start = 1462.453280796, delta = 0.014510248000078718

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Any | R | Awaitable[R]:
        """Synchronously call this actor.
    
        Parameters:
          *args: Positional arguments to send to the actor.
          **kwargs: Keyword arguments to send to the actor.
    
        Returns:
          Whatever the underlying function backing this actor returns.
        """
        try:
            self.logger.debug("Received args=%r kwargs=%r.", args, kwargs)
            start = time.perf_counter()
>           return self.fn(*args, **kwargs)

.venv/lib/python3.13.../site-packages/dramatiq/actor.py:185: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

instance_pk = UUID('04a75ace-2a66-421a-9eaf-425976ac0219')

    @actor(description=_("Apply single blueprint."))
    def apply_blueprint(instance_pk: UUID):
        try:
            self = CurrentTask.get_task()
        except CurrentTaskNotFound:
            self = Task()
        self.set_uid(str(instance_pk))
        instance: BlueprintInstance | None = None
        try:
            instance: BlueprintInstance = BlueprintInstance.objects.filter(pk=instance_pk).first()
            if not instance:
                self.warning(f"Could not find blueprint {instance_pk}, skipping")
                return
            self.set_uid(slugify(instance.name))
            if not instance.enabled:
                self.info(f"Blueprint {instance.name} is disabled, skipping")
                return
            blueprint_content = instance.retrieve()
            file_hash = sha512(blueprint_content.encode()).hexdigest()
            importer = Importer.from_string(blueprint_content, instance.context)
            if importer.blueprint.metadata:
                instance.metadata = asdict(importer.blueprint.metadata)
            valid, logs = importer.validate()
            if not valid:
                instance.status = BlueprintInstanceStatus.ERROR
                instance.save()
                self.logs(logs)
                return
            with capture_logs() as logs:
                applied = importer.apply()
                if not applied:
                    instance.status = BlueprintInstanceStatus.ERROR
                    instance.save()
                    self.logs(logs)
                    return
            instance.status = BlueprintInstanceStatus.SUCCESSFUL
            instance.last_applied_hash = file_hash
            instance.last_applied = now()
        except (
            OSError,
            DatabaseError,
            ProgrammingError,
            InternalError,
            BlueprintRetrievalFailed,
            EntryInvalidError,
        ) as exc:
            if instance:
                instance.status = BlueprintInstanceStatus.ERROR
>           self.error(exc)

.../blueprints/v1/tasks.py:240: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task: be14f74d-edd1-41b5-aee6-6a4f21e6ff0a>
message = IntegrityError('insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_t...entik"\nDETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".')
attributes = {}

    def error(self, message: str | Exception, **attributes) -> None:
>       self.log(self.uid, TaskStatus.ERROR, message, **attributes)

authentik/tasks/models.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task: be14f74d-edd1-41b5-aee6-6a4f21e6ff0a>
logger = 'authentik.events.tasks.event_trigger_dispatch:rgvtdhgqpglonzgjrupxqmoypayond2lskuwzjdk'
log_level = TaskStatus.ERROR
message = IntegrityError('insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_t...entik"\nDETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".')
attributes = {}

    def log(
        self,
        logger: str,
        log_level: TaskStatus,
        message: str | Exception,
        **attributes,
    ) -> None:
>       TaskLog.create_from_log_event(
            self,
            self._make_log(
                logger,
                log_level,
                message,
                **attributes,
            ),
        )

authentik/tasks/models.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cls = <class 'authentik.tasks.models.TaskLog'>
task = <Task: be14f74d-edd1-41b5-aee6-6a4f21e6ff0a>
log_event = LogEvent(event='insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_t...python3.13............/site-packages/psycopg/cursor.py', 'lineno': 97, 'name': 'execute'}], 'is_group': False, 'exceptions': []}]})

    @classmethod
    def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None:
        if not task.message:
            return None
>       return cls.objects.create(
            task=task,
            event=log_event.event,
            log_level=log_event.log_level,
            logger=log_event.logger,
            timestamp=log_event.timestamp,
            attributes=sanitize_item(log_event.attributes),
        )

authentik/tasks/models.py:172: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.models.manager.Manager object at 0x7f17c57c60d0>, args = ()
kwargs = {'attributes': {'exception': [{'exc_notes': [], 'exc_type': 'IntegrityError', 'exc_value': 'insert or update on table ...vel': 'error', 'logger': 'authentik.events.tasks.event_trigger_dispatch:rgvtdhgqpglonzgjrupxqmoypayond2lskuwzjdk', ...}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)

.venv/lib/python3.13.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>
kwargs = {'attributes': {'exception': [{'exc_notes': [], 'exc_type': 'IntegrityError', 'exc_value': 'insert or update on table ...vel': 'error', 'logger': 'authentik.events.tasks.event_trigger_dispatch:rgvtdhgqpglonzgjrupxqmoypayond2lskuwzjdk', ...}
reverse_one_to_one_fields = frozenset()
obj = <TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>

    def create(self, **kwargs):
        """
        Create a new object with the given kwargs, saving it to the database
        and returning the created object.
        """
        reverse_one_to_one_fields = frozenset(kwargs).intersection(
            self.model._meta._reverse_one_to_one_field_names
        )
        if reverse_one_to_one_fields:
            raise ValueError(
                "The following fields do not exist in this model: %s"
                % ", ".join(reverse_one_to_one_fields)
            )
    
        obj = self.model(**kwargs)
        self._for_write = True
>       obj.save(force_insert=True, using=self.db)

.venv/lib/python3.13.../db/models/query.py:665: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>, force_insert = True
force_update = False, using = 'default', update_fields = None

    def save(
        self,
        *args,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.
    
        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        # RemovedInDjango60Warning.
        if args:
            force_insert, force_update, using, update_fields = self._parse_save_params(
                *args,
                method_name="save",
                force_insert=force_insert,
                force_update=force_update,
                using=using,
                update_fields=update_fields,
            )
    
        self._prepare_related_fields_for_save(operation_name="save")
    
        using = using or router.db_for_write(self.__class__, instance=self)
        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")
    
        deferred_non_generated_fields = {
            f.attname
            for f in self._meta.concrete_fields
            if f.attname not in self.__dict__ and f.generated is False
        }
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return
    
            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            not_updatable_fields = update_fields.difference(field_names)
    
            if not_updatable_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, primary keys, or are non-concrete fields: %s"
                    % ", ".join(not_updatable_fields)
                )
    
        # If saving to the same database, and this model is deferred, then
        # automatically do an "update_fields" save on the loaded fields.
        elif (
            not force_insert
            and deferred_non_generated_fields
            and using == self._state.db
        ):
            field_names = set()
            pk_fields = self._meta.pk_fields
            for field in self._meta.concrete_fields:
                if field not in pk_fields and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_non_generated_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)
    
>       self.save_base(
            using=using,
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

.venv/lib/python3.13.../db/models/base.py:902: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>, raw = False
force_insert = (<class 'authentik.tasks.models.TaskLog'>,), force_update = False
using = 'default', update_fields = None

    def save_base(
        self,
        raw=False,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.
    
        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = origin = self.__class__
        # Skip proxies, but keep the origin as the proxy model.
        if cls._meta.proxy:
            cls = cls._meta.concrete_model
        meta = cls._meta
        if not meta.auto_created:
            pre_save.send(
                sender=origin,
                instance=self,
                raw=raw,
                using=using,
                update_fields=update_fields,
            )
        # A transaction isn't needed if one query is issued.
        if meta.parents:
            context_manager = transaction.atomic(using=using, savepoint=False)
        else:
            context_manager = transaction.mark_for_rollback_on_error(using=using)
        with context_manager:
            parent_inserted = False
            if not raw:
                # Validate force insert only when parents are inserted.
                force_insert = self._validate_force_insert(force_insert)
                parent_inserted = self._save_parents(
                    cls, using, update_fields, force_insert
                )
>           updated = self._save_table(
                raw,
                cls,
                force_insert or parent_inserted,
                force_update,
                using,
                update_fields,
            )

.venv/lib/python3.13.../db/models/base.py:1008: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>, raw = False
cls = <class 'authentik.tasks.models.TaskLog'>
force_insert = (<class 'authentik.tasks.models.TaskLog'>,), force_update = False
using = 'default', update_fields = None

    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._meta
        pk_fields = meta.pk_fields
        non_pks_non_generated = [
            f
            for f in meta.local_concrete_fields
            if f not in pk_fields and not f.generated
        ]
    
        if update_fields:
            non_pks_non_generated = [
                f
                for f in non_pks_non_generated
                if f.name in update_fields or f.attname in update_fields
            ]
    
        if not self._is_pk_set(meta):
            pk_val = meta.pk.get_pk_value_on_save(self)
            setattr(self, meta.pk.attname, pk_val)
        pk_set = self._is_pk_set(meta)
        if not pk_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and not force_update
            and self._state.adding
            and all(f.has_default() or f.has_db_default() for f in meta.pk_fields)
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if pk_set and not force_insert:
            base_qs = cls._base_manager.using(using)
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks_non_generated
            ]
            forced_update = update_fields or force_update
            pk_val = self._get_pk_val(meta)
            updated = self._do_update(
                base_qs, using, pk_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            if meta.order_with_respect_to:
                # If this is a model with an order_with_respect_to
                # autopopulate the _order field
                field = meta.order_with_respect_to
                filter_args = field.get_filter_kwargs_for_object(self)
                self._order = (
                    cls._base_manager.using(using)
                    .filter(**filter_args)
                    .aggregate(
                        _order__max=Coalesce(
                            ExpressionWrapper(
                                Max("_order") + Value(1), output_field=IntegerField()
                            ),
                            Value(0),
                        ),
                    )["_order__max"]
                )
            fields = [
                f
                for f in meta.local_concrete_fields
                if not f.generated and (pk_set or f is not meta.auto_field)
            ]
            returning_fields = meta.db_returning_fields
>           results = self._do_insert(
                cls._base_manager, using, fields, returning_fields, raw
            )

.venv/lib/python3.13.../db/models/base.py:1169: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>
manager = <django.db.models.manager.Manager object at 0x7f17c57c4bd0>
using = 'default'
fields = [<django.db.models.fields.UUIDField: id>, <django.db.models.fields.related.ForeignKey: task>, <django.db.models.fields...ield: log_level>, <django.db.models.fields.TextField: logger>, <django.db.models.fields.DateTimeField: timestamp>, ...]
returning_fields = [], raw = False

    def _do_insert(self, manager, using, fields, returning_fields, raw):
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
>       return manager._insert(
            [self],
            fields=fields,
            returning_fields=returning_fields,
            using=using,
            raw=raw,
        )

.venv/lib/python3.13.../db/models/base.py:1210: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.models.manager.Manager object at 0x7f17c57c4bd0>
args = ([<TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>],)
kwargs = {'fields': [<django.db.models.fields.UUIDField: id>, <django.db.models.fields.related.ForeignKey: task>, <django.db.mo...r>, <django.db.models.fields.DateTimeField: timestamp>, ...], 'raw': False, 'returning_fields': [], 'using': 'default'}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)

.venv/lib/python3.13.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>, objs = [<TaskLog: 842d95d0-0151-4c86-bb6b-b2e9f8cbcc87>]
fields = [<django.db.models.fields.UUIDField: id>, <django.db.models.fields.related.ForeignKey: task>, <django.db.models.fields...ield: log_level>, <django.db.models.fields.TextField: logger>, <django.db.models.fields.DateTimeField: timestamp>, ...]
returning_fields = [], raw = False, using = 'default', on_conflict = None
update_fields = None, unique_fields = None

    def _insert(
        self,
        objs,
        fields,
        returning_fields=None,
        raw=False,
        using=None,
        on_conflict=None,
        update_fields=None,
        unique_fields=None,
    ):
        """
        Insert a new record for the given model. This provides an interface to
        the InsertQuery class and is how Model.save() is implemented.
        """
        self._for_write = True
        if using is None:
            using = self.db
        query = sql.InsertQuery(
            self.model,
            on_conflict=on_conflict,
            update_fields=update_fields,
            unique_fields=unique_fields,
        )
        query.insert_values(fields, objs, raw=raw)
>       return query.get_compiler(using=using).execute_sql(returning_fields)

.venv/lib/python3.13.../db/models/query.py:1873: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <SQLInsertCompiler model=TaskLog connection=<DatabaseWrapper vendor='postgresql' alias='default'> using='default'>
returning_fields = []

    def execute_sql(self, returning_fields=None):
        assert not (
            returning_fields
            and len(self.query.objs) != 1
            and not self.connection.features.can_return_rows_from_bulk_insert
        )
        opts = self.query.get_meta()
        self.returning_fields = returning_fields
        cols = []
        with self.connection.cursor() as cursor:
            for sql, params in self.as_sql():
>               cursor.execute(sql, params)

.venv/lib/python3.13.../models/sql/compiler.py:1882: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>, 'INSERT INTO "authentik_tasks_tasklog" ("id", "tas...onzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...))
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13....../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)
many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7f17da9167b0>
exc_type = <class 'psycopg.errors.ForeignKeyViolation'>
exc_value = ForeignKeyViolation('insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_ta...entik"\nDETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".')
traceback = <traceback object at 0x7f17c3fcb300>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>
sql = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f17c1e9d850>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)

.venv/lib/python3.13.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c26a6990>
args = ('INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "...onzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [BAD] at 0x7f17c26a6990>
query = 'INSERT INTO "authentik_tasks_tasklog" ("id", "task_id", "event", "log_level", "logger", "timestamp", "attributes", "previous") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
params = (UUID('842d95d0-0151-4c86-bb6b-b2e9f8cbcc87'), UUID('be14f74d-edd1-41b5-aee6-6a4f21e6ff0a'), 'insert or update on tabl...lonzgjrupxqmoypayond2lskuwzjdk', datetime.datetime(2025, 12, 26, 14, 4, 58, 907313, tzinfo=datetime.timezone.utc), ...)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.IntegrityError: insert or update on table "authentik_tasks_tasklog" violates foreign key constraint "authentik_tasks_task_task_id_a82f0835_fk_authentik"
E           DETAIL:  Key (task_id)=(be14f74d-edd1-41b5-aee6-6a4f21e6ff0a) is not present in table "authentik_tasks_task".

.venv/lib/python3.13............/site-packages/psycopg/cursor.py:97: IntegrityError
tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC::test_authorization_consent_implied
Stack Traces | 53.9s run time
self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-implicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (implicit consent)\nentries:\n- attrs:\n    desi...henticated\n  identifiers:\n    slug: default-provider-authorization-implicit-consent\n  model: authentik_flows.flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_implied(self):
        """test OpenID Provider flow (default authorization flow with implied consent)
        (due to offline_access a consent will still be triggered)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-implicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
            authorization_flow=authorization_flow,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:153: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="8dd5ea07ac0a18a97c60a00b291fbf25", element="f.8FAABF8410875726AD8A9C30E7E18134.d.2C24E33ABB01FD0908FA787ED7B9C52C.e.6")>
timeout = 10

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None, timeout: float = 10
    ) -> WebElement:
        """Get the shadow root of a web component specified by `selector`."""
        if not container:
            container = self.driver
        wait = WebDriverWait(container, timeout)
        host: WebElement | None = None
    
        try:
>           host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ShadowRoot' object has no attribute 'session_id'") raised in repr()] WebDriverWait object at 0x7f3878bf2350>
method = <function SeleniumTestCase.get_shadow_root.<locals>.<lambda> at 0x7f3877eba980>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
>               value = method(self._driver)

.venv/lib/python3.13.../webdriver/support/wait.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="8dd5ea07ac0a18a97c60a00b291fbf25", element="f.8FAABF8410875726AD8A9C30E7E18134.d.2C24E33ABB01FD0908FA787ED7B9C52C.e.6")>

>   host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="8dd5ea07ac0a18a97c60a00b291fbf25", element="f.8FAABF8410875726AD8A9C30E7E18134.d.2C24E33ABB01FD0908FA787ED7B9C52C.e.6")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="8dd5ea07ac0a18a97c60a00b291fbf25", element="f.8FAABF8410875726AD8A9C30E7E18134.d.2C24E33ABB01FD0908FA787ED7B9C52C.e.6")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.8FAABF8410875726AD8A9C30E7E18134.d.2C24E33ABB01FD0908FA787ED7B9C52C.e.6', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="8dd5ea07ac0a18a97c60a00b291fbf25")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f3879d6e450>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...\\n#27 0x55fd2d513cd5 \\u003Cunknown>\\n#28 0x55fd2d525709 \\u003Cunknown>\\n#29 0x7f2e1738d428 \\u003Cunknown>\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=143.0.7499.169)
E       Stacktrace:
E       #0 0x55fd2d526432 <unknown>
E       #1 0x55fd2cf6b71b <unknown>
E       #2 0x55fd2cf7f207 <unknown>
E       #3 0x55fd2cf7de10 <unknown>
E       #4 0x55fd2cf7f719 <unknown>
E       #5 0x55fd2cf72c64 <unknown>
E       #6 0x55fd2cf71873 <unknown>
E       #7 0x55fd2cf74d6f <unknown>
E       #8 0x55fd2cf74e55 <unknown>
E       #9 0x55fd2cfbada4 <unknown>
E       #10 0x55fd2cfbb7d5 <unknown>
E       #11 0x55fd2cfafcaa <unknown>
E       #12 0x55fd2cfdfee1 <unknown>
E       #13 0x55fd2cfafb81 <unknown>
E       #14 0x55fd2cfe00a2 <unknown>
E       #15 0x55fd2d001b62 <unknown>
E       #16 0x55fd2cfdfc67 <unknown>
E       #17 0x55fd2cfae047 <unknown>
E       #18 0x55fd2cfaee15 <unknown>
E       #19 0x55fd2d4f1064 <unknown>
E       #20 0x55fd2d4f4354 <unknown>
E       #21 0x55fd2d4f3e0e <unknown>
E       #22 0x55fd2d4f47e9 <unknown>
E       #23 0x55fd2d4daa7a <unknown>
E       #24 0x55fd2d4f4b5a <unknown>
E       #25 0x55fd2d4c3629 <unknown>
E       #26 0x55fd2d513ae9 <unknown>
E       #27 0x55fd2d513cd5 <unknown>
E       #28 0x55fd2d525709 <unknown>
E       #29 0x7f2e1738d428 <unknown>

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException

During handling of the above exception, another exception occurred:

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-implicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (implicit consent)\nentries:\n- attrs:\n    desi...henticated\n  identifiers:\n    slug: default-provider-authorization-implicit-consent\n  model: authentik_flows.flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_implied(self):
        """test OpenID Provider flow (default authorization flow with implied consent)
        (due to offline_access a consent will still be triggered)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-implicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
            authorization_flow=authorization_flow,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:153: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d10a1d7341b00c79107200230899407e", element="f.41A6690628E29BC246079AE540AAFDE8.d.0427AA219CE8821CB6A3D5552C3B7E45.e.6")>
timeout = 10

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None, timeout: float = 10
    ) -> WebElement:
        """Get the shadow root of a web component specified by `selector`."""
        if not container:
            container = self.driver
        wait = WebDriverWait(container, timeout)
        host: WebElement | None = None
    
        try:
>           host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ShadowRoot' object has no attribute 'session_id'") raised in repr()] WebDriverWait object at 0x7f38797f6270>
method = <function SeleniumTestCase.get_shadow_root.<locals>.<lambda> at 0x7f3878770c20>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
>               value = method(self._driver)

.venv/lib/python3.13.../webdriver/support/wait.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d10a1d7341b00c79107200230899407e", element="f.41A6690628E29BC246079AE540AAFDE8.d.0427AA219CE8821CB6A3D5552C3B7E45.e.6")>

>   host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d10a1d7341b00c79107200230899407e", element="f.41A6690628E29BC246079AE540AAFDE8.d.0427AA219CE8821CB6A3D5552C3B7E45.e.6")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d10a1d7341b00c79107200230899407e", element="f.41A6690628E29BC246079AE540AAFDE8.d.0427AA219CE8821CB6A3D5552C3B7E45.e.6")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.41A6690628E29BC246079AE540AAFDE8.d.0427AA219CE8821CB6A3D5552C3B7E45.e.6', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="d10a1d7341b00c79107200230899407e")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f38795d2580>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...\\n#27 0x560bdce72cd5 \\u003Cunknown>\\n#28 0x560bdce84709 \\u003Cunknown>\\n#29 0x7faebca9c428 \\u003Cunknown>\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=143.0.7499.169)
E       Stacktrace:
E       #0 0x560bdce85432 <unknown>
E       #1 0x560bdc8ca71b <unknown>
E       #2 0x560bdc8de207 <unknown>
E       #3 0x560bdc8dce10 <unknown>
E       #4 0x560bdc8de719 <unknown>
E       #5 0x560bdc8d1c64 <unknown>
E       #6 0x560bdc8d0873 <unknown>
E       #7 0x560bdc8d3d6f <unknown>
E       #8 0x560bdc8d3e55 <unknown>
E       #9 0x560bdc919da4 <unknown>
E       #10 0x560bdc91a7d5 <unknown>
E       #11 0x560bdc90ecaa <unknown>
E       #12 0x560bdc93eee1 <unknown>
E       #13 0x560bdc90eb81 <unknown>
E       #14 0x560bdc93f0a2 <unknown>
E       #15 0x560bdc960b62 <unknown>
E       #16 0x560bdc93ec67 <unknown>
E       #17 0x560bdc90d047 <unknown>
E       #18 0x560bdc90de15 <unknown>
E       #19 0x560bdce50064 <unknown>
E       #20 0x560bdce53354 <unknown>
E       #21 0x560bdce52e0e <unknown>
E       #22 0x560bdce537e9 <unknown>
E       #23 0x560bdce39a7a <unknown>
E       #24 0x560bdce53b5a <unknown>
E       #25 0x560bdce22629 <unknown>
E       #26 0x560bdce72ae9 <unknown>
E       #27 0x560bdce72cd5 <unknown>
E       #28 0x560bdce84709 <unknown>
E       #29 0x7faebca9c428 <unknown>

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException

During handling of the above exception, another exception occurred:

self = <unittest.case._Outcome object at 0x7f38794cf890>
test_case = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
result = <TestCaseFunction test_authorization_consent_implied>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
method = <bound method TestProviderOAuth2OIDC.test_authorization_consent_implied of <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:505: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:505: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
>               raise exc

tests/e2e/utils.py:499: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-implicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (implicit consent)\nentries:\n- attrs:\n    desi...henticated\n  identifiers:\n    slug: default-provider-authorization-implicit-consent\n  model: authentik_flows.flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_implied(self):
        """test OpenID Provider flow (default authorization flow with implied consent)
        (due to offline_access a consent will still be triggered)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-implicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
            authorization_flow=authorization_flow,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:153: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="ae769faa1ae734bf8fe33b41288e860b", element="f.32978E48537DD2F62C363E72F812D905.d.93A04530C27B63A63581EEBBE5AD968E.e.6")>
timeout = 10

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None, timeout: float = 10
    ) -> WebElement:
        """Get the shadow root of a web component specified by `selector`."""
        if not container:
            container = self.driver
        wait = WebDriverWait(container, timeout)
        host: WebElement | None = None
    
        try:
>           host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ShadowRoot' object has no attribute 'session_id'") raised in repr()] WebDriverWait object at 0x7f3878e112b0>
method = <function SeleniumTestCase.get_shadow_root.<locals>.<lambda> at 0x7f38787d7880>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
>               value = method(self._driver)

.venv/lib/python3.13.../webdriver/support/wait.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="ae769faa1ae734bf8fe33b41288e860b", element="f.32978E48537DD2F62C363E72F812D905.d.93A04530C27B63A63581EEBBE5AD968E.e.6")>

>   host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="ae769faa1ae734bf8fe33b41288e860b", element="f.32978E48537DD2F62C363E72F812D905.d.93A04530C27B63A63581EEBBE5AD968E.e.6")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="ae769faa1ae734bf8fe33b41288e860b", element="f.32978E48537DD2F62C363E72F812D905.d.93A04530C27B63A63581EEBBE5AD968E.e.6")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.32978E48537DD2F62C363E72F812D905.d.93A04530C27B63A63581EEBBE5AD968E.e.6', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="ae769faa1ae734bf8fe33b41288e860b")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f3878869040>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...\\n#27 0x55903072dcd5 \\u003Cunknown>\\n#28 0x55903073f709 \\u003Cunknown>\\n#29 0x7f3188996428 \\u003Cunknown>\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=143.0.7499.169)
E       Stacktrace:
E       #0 0x559030740432 <unknown>
E       #1 0x55903018571b <unknown>
E       #2 0x559030199207 <unknown>
E       #3 0x559030197e10 <unknown>
E       #4 0x559030199719 <unknown>
E       #5 0x55903018cc64 <unknown>
E       #6 0x55903018b873 <unknown>
E       #7 0x55903018ed6f <unknown>
E       #8 0x55903018ee55 <unknown>
E       #9 0x5590301d4da4 <unknown>
E       #10 0x5590301d57d5 <unknown>
E       #11 0x5590301c9caa <unknown>
E       #12 0x5590301f9ee1 <unknown>
E       #13 0x5590301c9b81 <unknown>
E       #14 0x5590301fa0a2 <unknown>
E       #15 0x55903021bb62 <unknown>
E       #16 0x5590301f9c67 <unknown>
E       #17 0x5590301c8047 <unknown>
E       #18 0x5590301c8e15 <unknown>
E       #19 0x55903070b064 <unknown>
E       #20 0x55903070e354 <unknown>
E       #21 0x55903070de0e <unknown>
E       #22 0x55903070e7e9 <unknown>
E       #23 0x5590306f4a7a <unknown>
E       #24 0x55903070eb5a <unknown>
E       #25 0x5590306dd629 <unknown>
E       #26 0x55903072dae9 <unknown>
E       #27 0x55903072dcd5 <unknown>
E       #28 0x55903073f709 <unknown>
E       #29 0x7f3188996428 <unknown>

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException
tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC::test_authorization_consent_explicit
Stack Traces | 54.5s run time
self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-explicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (explicit consent)\nentries:\n- attrs:\n    desi...e: !KeyOf default-provider-authorization-consent\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_explicit(self):
        """test OpenID Provider flow (default authorization flow with explicit consent)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-explicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            authorization_flow=authorization_flow,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:264: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="11b8b6b58091b3dadd2b4b239c65a617", element="f.1CE40D653EBEBC2D35D38A54254B5271.d.39D04AEC5186EB96B912484D03021710.e.6")>
timeout = 10

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None, timeout: float = 10
    ) -> WebElement:
        """Get the shadow root of a web component specified by `selector`."""
        if not container:
            container = self.driver
        wait = WebDriverWait(container, timeout)
        host: WebElement | None = None
    
        try:
>           host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ShadowRoot' object has no attribute 'session_id'") raised in repr()] WebDriverWait object at 0x7f38793a7f50>
method = <function SeleniumTestCase.get_shadow_root.<locals>.<lambda> at 0x7f38787e3a60>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
>               value = method(self._driver)

.venv/lib/python3.13.../webdriver/support/wait.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="11b8b6b58091b3dadd2b4b239c65a617", element="f.1CE40D653EBEBC2D35D38A54254B5271.d.39D04AEC5186EB96B912484D03021710.e.6")>

>   host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="11b8b6b58091b3dadd2b4b239c65a617", element="f.1CE40D653EBEBC2D35D38A54254B5271.d.39D04AEC5186EB96B912484D03021710.e.6")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="11b8b6b58091b3dadd2b4b239c65a617", element="f.1CE40D653EBEBC2D35D38A54254B5271.d.39D04AEC5186EB96B912484D03021710.e.6")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.1CE40D653EBEBC2D35D38A54254B5271.d.39D04AEC5186EB96B912484D03021710.e.6', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="11b8b6b58091b3dadd2b4b239c65a617")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f3878831850>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...\\n#27 0x55c966dd3cd5 \\u003Cunknown>\\n#28 0x55c966de5709 \\u003Cunknown>\\n#29 0x7fe3a44f3428 \\u003Cunknown>\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=143.0.7499.169)
E       Stacktrace:
E       #0 0x55c966de6432 <unknown>
E       #1 0x55c96682b71b <unknown>
E       #2 0x55c96683f207 <unknown>
E       #3 0x55c96683de10 <unknown>
E       #4 0x55c96683f719 <unknown>
E       #5 0x55c966832c64 <unknown>
E       #6 0x55c966831873 <unknown>
E       #7 0x55c966834d6f <unknown>
E       #8 0x55c966834e55 <unknown>
E       #9 0x55c96687ada4 <unknown>
E       #10 0x55c96687b7d5 <unknown>
E       #11 0x55c96686fcaa <unknown>
E       #12 0x55c96689fee1 <unknown>
E       #13 0x55c96686fb81 <unknown>
E       #14 0x55c9668a00a2 <unknown>
E       #15 0x55c9668c1b62 <unknown>
E       #16 0x55c96689fc67 <unknown>
E       #17 0x55c96686e047 <unknown>
E       #18 0x55c96686ee15 <unknown>
E       #19 0x55c966db1064 <unknown>
E       #20 0x55c966db4354 <unknown>
E       #21 0x55c966db3e0e <unknown>
E       #22 0x55c966db47e9 <unknown>
E       #23 0x55c966d9aa7a <unknown>
E       #24 0x55c966db4b5a <unknown>
E       #25 0x55c966d83629 <unknown>
E       #26 0x55c966dd3ae9 <unknown>
E       #27 0x55c966dd3cd5 <unknown>
E       #28 0x55c966de5709 <unknown>
E       #29 0x7fe3a44f3428 <unknown>

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException

During handling of the above exception, another exception occurred:

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-explicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (explicit consent)\nentries:\n- attrs:\n    desi...e: !KeyOf default-provider-authorization-consent\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_explicit(self):
        """test OpenID Provider flow (default authorization flow with explicit consent)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-explicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            authorization_flow=authorization_flow,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:264: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d6bd2a4b65c25d5ff5eac43bbcef8e48", element="f.EBCD6AE0897BD90687D9F21AE0545AF0.d.ACEA76AF78B1BC9B50753CEB2CBC2571.e.6")>
timeout = 10

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None, timeout: float = 10
    ) -> WebElement:
        """Get the shadow root of a web component specified by `selector`."""
        if not container:
            container = self.driver
        wait = WebDriverWait(container, timeout)
        host: WebElement | None = None
    
        try:
>           host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ShadowRoot' object has no attribute 'session_id'") raised in repr()] WebDriverWait object at 0x7f3878871e10>
method = <function SeleniumTestCase.get_shadow_root.<locals>.<lambda> at 0x7f3877d97d80>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
>               value = method(self._driver)

.venv/lib/python3.13.../webdriver/support/wait.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d6bd2a4b65c25d5ff5eac43bbcef8e48", element="f.EBCD6AE0897BD90687D9F21AE0545AF0.d.ACEA76AF78B1BC9B50753CEB2CBC2571.e.6")>

>   host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d6bd2a4b65c25d5ff5eac43bbcef8e48", element="f.EBCD6AE0897BD90687D9F21AE0545AF0.d.ACEA76AF78B1BC9B50753CEB2CBC2571.e.6")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="d6bd2a4b65c25d5ff5eac43bbcef8e48", element="f.EBCD6AE0897BD90687D9F21AE0545AF0.d.ACEA76AF78B1BC9B50753CEB2CBC2571.e.6")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.EBCD6AE0897BD90687D9F21AE0545AF0.d.ACEA76AF78B1BC9B50753CEB2CBC2571.e.6', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="d6bd2a4b65c25d5ff5eac43bbcef8e48")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f3879975c50>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...\\n#27 0x55bf64242cd5 \\u003Cunknown>\\n#28 0x55bf64254709 \\u003Cunknown>\\n#29 0x7f5210cba428 \\u003Cunknown>\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=143.0.7499.169)
E       Stacktrace:
E       #0 0x55bf64255432 <unknown>
E       #1 0x55bf63c9a71b <unknown>
E       #2 0x55bf63cae207 <unknown>
E       #3 0x55bf63cace10 <unknown>
E       #4 0x55bf63cae719 <unknown>
E       #5 0x55bf63ca1c64 <unknown>
E       #6 0x55bf63ca0873 <unknown>
E       #7 0x55bf63ca3d6f <unknown>
E       #8 0x55bf63ca3e55 <unknown>
E       #9 0x55bf63ce9da4 <unknown>
E       #10 0x55bf63cea7d5 <unknown>
E       #11 0x55bf63cdecaa <unknown>
E       #12 0x55bf63d0eee1 <unknown>
E       #13 0x55bf63cdeb81 <unknown>
E       #14 0x55bf63d0f0a2 <unknown>
E       #15 0x55bf63d30b62 <unknown>
E       #16 0x55bf63d0ec67 <unknown>
E       #17 0x55bf63cdd047 <unknown>
E       #18 0x55bf63cdde15 <unknown>
E       #19 0x55bf64220064 <unknown>
E       #20 0x55bf64223354 <unknown>
E       #21 0x55bf64222e0e <unknown>
E       #22 0x55bf642237e9 <unknown>
E       #23 0x55bf64209a7a <unknown>
E       #24 0x55bf64223b5a <unknown>
E       #25 0x55bf641f2629 <unknown>
E       #26 0x55bf64242ae9 <unknown>
E       #27 0x55bf64242cd5 <unknown>
E       #28 0x55bf64254709 <unknown>
E       #29 0x7f5210cba428 <unknown>

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException

During handling of the above exception, another exception occurred:

self = <unittest.case._Outcome object at 0x7f38788689e0>
test_case = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
result = <TestCaseFunction test_authorization_consent_explicit>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
method = <bound method TestProviderOAuth2OIDC.test_authorization_consent_explicit of <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.11........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:505: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:505: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
>               raise exc

tests/e2e/utils.py:499: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:492: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-explicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (explicit consent)\nentries:\n- attrs:\n    desi...e: !KeyOf default-provider-authorization-consent\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_explicit(self):
        """test OpenID Provider flow (default authorization flow with explicit consent)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-explicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            authorization_flow=authorization_flow,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:264: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="948d8227c391637242dcce1e49831dd8", element="f.C145F762DFB75670D097797F712B37F5.d.DC9FE07E1CBCE127DABA48C26AACB719.e.6")>
timeout = 10

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None, timeout: float = 10
    ) -> WebElement:
        """Get the shadow root of a web component specified by `selector`."""
        if not container:
            container = self.driver
        wait = WebDriverWait(container, timeout)
        host: WebElement | None = None
    
        try:
>           host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ShadowRoot' object has no attribute 'session_id'") raised in repr()] WebDriverWait object at 0x7f38795925f0>
method = <function SeleniumTestCase.get_shadow_root.<locals>.<lambda> at 0x7f3877dbb2e0>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
>               value = method(self._driver)

.venv/lib/python3.13.../webdriver/support/wait.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="948d8227c391637242dcce1e49831dd8", element="f.C145F762DFB75670D097797F712B37F5.d.DC9FE07E1CBCE127DABA48C26AACB719.e.6")>

>   host = wait.until(lambda c: c.find_element(By.CSS_SELECTOR, selector))

tests/e2e/utils.py:347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="948d8227c391637242dcce1e49831dd8", element="f.C145F762DFB75670D097797F712B37F5.d.DC9FE07E1CBCE127DABA48C26AACB719.e.6")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="948d8227c391637242dcce1e49831dd8", element="f.C145F762DFB75670D097797F712B37F5.d.DC9FE07E1CBCE127DABA48C26AACB719.e.6")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.C145F762DFB75670D097797F712B37F5.d.DC9FE07E1CBCE127DABA48C26AACB719.e.6', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="948d8227c391637242dcce1e49831dd8")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f3878d8ef30>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...\\n#27 0x5622df6bacd5 \\u003Cunknown>\\n#28 0x5622df6cc709 \\u003Cunknown>\\n#29 0x7faed88e3428 \\u003Cunknown>\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=143.0.7499.169)
E       Stacktrace:
E       #0 0x5622df6cd432 <unknown>
E       #1 0x5622df11271b <unknown>
E       #2 0x5622df126207 <unknown>
E       #3 0x5622df124e10 <unknown>
E       #4 0x5622df126719 <unknown>
E       #5 0x5622df119c64 <unknown>
E       #6 0x5622df118873 <unknown>
E       #7 0x5622df11bd6f <unknown>
E       #8 0x5622df11be55 <unknown>
E       #9 0x5622df161da4 <unknown>
E       #10 0x5622df1627d5 <unknown>
E       #11 0x5622df156caa <unknown>
E       #12 0x5622df186ee1 <unknown>
E       #13 0x5622df156b81 <unknown>
E       #14 0x5622df1870a2 <unknown>
E       #15 0x5622df1a8b62 <unknown>
E       #16 0x5622df186c67 <unknown>
E       #17 0x5622df155047 <unknown>
E       #18 0x5622df155e15 <unknown>
E       #19 0x5622df698064 <unknown>
E       #20 0x5622df69b354 <unknown>
E       #21 0x5622df69ae0e <unknown>
E       #22 0x5622df69b7e9 <unknown>
E       #23 0x5622df681a7a <unknown>
E       #24 0x5622df69bb5a <unknown>
E       #25 0x5622df66a629 <unknown>
E       #26 0x5622df6baae9 <unknown>
E       #27 0x5622df6bacd5 <unknown>
E       #28 0x5622df6cc709 <unknown>
E       #29 0x7faed88e3428 <unknown>

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@netlify
Copy link

netlify bot commented Dec 26, 2025

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit 2d54e87
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/694e8bbd440e5c00083f0500
😎 Deploy Preview https://deploy-preview-19057--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
@BeryJu BeryJu merged commit 87f4435 into version-2025.12 Dec 26, 2025
78 of 85 checks passed
@BeryJu BeryJu deleted the cherry-pick/19047-to-version-2025.12 branch December 26, 2025 14:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant