Skip to content

Sorting is not maintained after using a window function #15833

@daphnenhuch-at

Description

@daphnenhuch-at

Describe the bug

I have a query which sorts the data by a column called "userPrimaryKey" and then uses a window function to add a row number column to the data frame. I've set target_partitions to 8 to make relatively efficient use of parallelism. However, when the query comes back, round-robin repartitioning causes the record batches to be returned in unsorted order. I have 10,000 records, and the result has records 8192 through 9999 followed by 0 through 8191. I would expect the result to be fully sorted. This is fixed by setting datafusion.optimizer.enable_round_robin_repartition to false, but I think the query plan is making the wrong decision to repartition the data after the bounded window function.

I've included a test which reproduces this bug as well as the query plan produced by the query.

To Reproduce

Run the following test

#[cfg(test)]
mod tests {
    use deltalake::arrow::array::Int32Array;
    use deltalake::datafusion::prelude::SessionContext;
    use rand::distributions::Alphanumeric;
    use rand::Rng;
    use std::sync::Arc;

    use deltalake::datafusion::logical_expr::{ident, lit, ExprSchemable};
    use deltalake::{
        arrow::datatypes::DataType as ArrowDataType,
        arrow::{
            array::{BooleanArray, Int64Array, RecordBatch, StringArray},
            datatypes::{DataType, Field, Schema},
        },
        datafusion::{
            datasource::MemTable,
            functions_window::expr_fn::row_number,
            logical_expr::{expr::WildcardOptions, utils::expand_wildcard},
            sql::sqlparser::ast::ExcludeSelectItem,
        },
    };
    use napi::anyhow::Result;

    /// Returns a random identifier made of 10 alphanumeric characters drawn from the
    /// thread-local RNG.
    fn generate_random_id() -> String {
        let rng = rand::thread_rng();
        let random_chars = rng.sample_iter(&Alphanumeric).map(char::from);
        random_chars.take(10).collect::<String>()
    }

    /// Builds the `SessionContext` used by the reproduction.
    ///
    /// The runtime uses a 100 MB `FairSpillPool` with the disk manager disabled, and the
    /// session config enables parquet filter pushdown/reordering and sets
    /// `target_partitions` to 2.
    ///
    /// Panics if the runtime environment cannot be built.
    fn get_session_context() -> SessionContext {
        use std::sync::Arc;

        use deltalake::datafusion::{
            execution::{
                disk_manager::DiskManagerConfig, memory_pool::FairSpillPool,
                runtime_env::RuntimeEnvBuilder,
            },
            prelude::{SessionConfig, SessionContext},
        };

        let pool_capacity_bytes = 100_000_000;
        let spill_pool = FairSpillPool::new(pool_capacity_bytes);

        // The disk manager stays disabled so that no temporary files can be written to disk,
        // which would violate conditions of EKM.
        let runtime_env = RuntimeEnvBuilder::new()
            .with_disk_manager(DiskManagerConfig::Disabled)
            .with_memory_pool(Arc::new(spill_pool))
            .build()
            .unwrap();

        let session_config = SessionConfig::default()
            .set_bool("datafusion.execution.parquet.pushdown_filters", true)
            .set_bool("datafusion.execution.parquet.reorder_filters", true)
            .set_usize("datafusion.execution.target_partitions", 2)
            .set_bool("datafusion.execution.keep_partition_by_columns", false)
            .set_usize("datafusion.execution.parquet.metadata_size_hint", 512 << 10);

        SessionContext::new_with_config_rt(session_config, Arc::new(runtime_env))
    }

    /// Builds a `RecordBatch` containing one row for every integer in `start..end`.
    ///
    /// Column contents, in order:
    /// 1. the row index zero-padded to four digits,
    /// 2. the constant `"a"`,
    /// 3. the same zero-padded index again (used as the primary key),
    /// 4. a random id from `generate_random_id`,
    /// 5. the current UTC time in milliseconds,
    /// 6. `false`.
    ///
    /// Panics if `RecordBatch::try_new` rejects the columns for the given `schema`.
    fn create_record_batch(schema: Arc<Schema>, start: i32, end: i32) -> RecordBatch {
        let padded_index = |i: i32| format!("{:04}", i);

        let first_column: Vec<String> = (start..end).map(padded_index).collect();
        let second_column: Vec<&str> = (start..end).map(|_| "a").collect();
        let key_column: Vec<String> = (start..end).map(padded_index).collect();
        // Generate the id and the timestamp together, row by row, so the calls stay
        // interleaved exactly as in a single per-row loop.
        let (id_column, update_time_column): (Vec<String>, Vec<i64>) = (start..end)
            .map(|_| (generate_random_id(), chrono::Utc::now().timestamp_millis()))
            .unzip();
        let deleted_column: Vec<bool> = (start..end).map(|_| false).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(first_column)),
                Arc::new(StringArray::from(second_column)),
                Arc::new(StringArray::from(key_column)),
                Arc::new(StringArray::from(id_column)),
                Arc::new(Int64Array::from(update_time_column)),
                Arc::new(BooleanArray::from(deleted_column)),
            ],
        )
        .unwrap()
    }

    /// Reproduces the reported ordering problem.
    ///
    /// The table is sorted by `userPrimaryKey`, numbered with `row_number()`, and projected so
    /// that `fileRowNumber` is the zero-based row number. The collected batches are then
    /// expected to arrive in sorted order: the n-th row overall must have primary key
    /// `format!("{:04}", n)` and `fileRowNumber == n`.
    #[tokio::test(flavor = "multi_thread")]
    async fn simple_reproduction() -> Result<()> {
        // Row index at which the input is split into its second batch.
        const FIRST_BATCH_END: i32 = 8192;
        // Total number of rows registered in the table.
        const TOTAL_ROWS: i32 = 10_000;

        let schema = Arc::new(Schema::new(vec![
            Field::new("1", DataType::Utf8, true),
            Field::new("2", DataType::Utf8, true),
            Field::new("userPrimaryKey", DataType::Utf8, false),
            Field::new("id", DataType::Utf8, false),
            Field::new("lastUpdateTime", DataType::Int64, false),
            Field::new("isDeleted", DataType::Boolean, false),
        ]));

        // Each inner `Vec` is handed to `MemTable` as its own partition.
        let partitions: Vec<Vec<RecordBatch>> = vec![
            vec![create_record_batch(schema.clone(), 0, FIRST_BATCH_END)],
            vec![create_record_batch(
                schema.clone(),
                FIRST_BATCH_END,
                TOTAL_ROWS,
            )],
        ];

        let table = MemTable::try_new(schema.clone(), partitions)?;
        let ctx = get_session_context();
        ctx.register_table("my_table", Arc::new(table))?;

        let row_number_sub_query = ctx
            .table("my_table")
            .await?
            .sort(vec![ident("userPrimaryKey").sort(true, true)])?
            .window(vec![row_number().alias("dataFusionRowNumber")])?;

        // Select every column except the raw row number ...
        let column_names_to_exclude = vec!["dataFusionRowNumber".into()];
        let mut columns_to_select_exprs = expand_wildcard(
            row_number_sub_query.schema(),
            row_number_sub_query.logical_plan(),
            Some(&WildcardOptions {
                ilike: None,
                exclude: Some(ExcludeSelectItem::Multiple(column_names_to_exclude)),
                except: None,
                replace: None,
                rename: None,
            }),
        )?;

        // ... and expose it instead as a zero-based Int32 `fileRowNumber`.
        columns_to_select_exprs.push(
            (ident("dataFusionRowNumber") - lit(1))
                .cast_to(&ArrowDataType::Int32, row_number_sub_query.schema())?
                .alias_qualified(Some("my_table"), "fileRowNumber"),
        );
        let result = row_number_sub_query.select(columns_to_select_exprs)?;
        let result_batches = result.collect().await?;

        // Walk the batches in the order they were returned; `counter` is the expected
        // zero-based position of the next row.
        let mut counter: i32 = 0;
        for batch in &result_batches {
            let primary_key_column = batch
                .column_by_name("userPrimaryKey")
                .expect("result is missing the userPrimaryKey column")
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("userPrimaryKey is not a StringArray");
            let file_row_number_column = batch
                .column_by_name("fileRowNumber")
                .expect("result is missing the fileRowNumber column")
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("fileRowNumber is not an Int32Array");
            for i in 0..batch.num_rows() {
                let user_primary_key = primary_key_column.value(i);
                let file_row_number = file_row_number_column.value(i);
                assert_eq!(file_row_number, counter);
                assert_eq!(user_primary_key, format!("{:04}", counter));
                counter += 1;
            }
        }

        // Without this check the test would pass vacuously if no rows came back.
        assert_eq!(
            counter, TOTAL_ROWS,
            "expected every input row to be returned"
        );

        Ok(())
    }
}

Expected behavior

I expect the data to come back fully sorted from 0000 through 9999 with the corresponding row number.

Additional context

This is the query plan produced. I think that we should either not be repartitioning the data or should use a SortPreservingMergeExec so that the streams come back in sorted order.

CoalescePartitionsExec {
    input: ProjectionExec {
        expr: [
            (
                Column {
                    name: "1",
                    index: 0,
                },
                "1",
            ),
            (
                Column {
                    name: "2",
                    index: 1,
                },
                "2",
            ),
            (
                Column {
                    name: "userPrimaryKey",
                    index: 2,
                },
                "userPrimaryKey",
            ),
            (
                Column {
                    name: "id",
                    index: 3,
                },
                "id",
            ),
            (
                Column {
                    name: "lastUpdateTime",
                    index: 4,
                },
                "lastUpdateTime",
            ),
            (
                Column {
                    name: "isDeleted",
                    index: 5,
                },
                "isDeleted",
            ),
            (
                BinaryExpr {
                    left: CastExpr {
                        expr: Column {
                            name: "dataFusionRowNumber",
                            index: 6,
                        },
                        cast_type: Int32,
                        cast_options: CastOptions {
                            safe: false,
                            format_options: FormatOptions {
                                safe: true,
                                null: "",
                                date_format: None,
                                datetime_format: None,
                                timestamp_format: None,
                                timestamp_tz_format: None,
                                time_format: None,
                                duration_format: Pretty,
                            },
                        },
                    },
                    op: Minus,
                    right: Literal {
                        value: Int32(1),
                    },
                    fail_on_overflow: false,
                },
                "fileRowNumber",
            ),
        ],
        schema: Schema {
            fields: [
                Field {
                    name: "1",
                    data_type: Utf8,
                    nullable: true,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                Field {
                    name: "2",
                    data_type: Utf8,
                    nullable: true,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                Field {
                    name: "userPrimaryKey",
                    data_type: Utf8,
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                Field {
                    name: "id",
                    data_type: Utf8,
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                Field {
                    name: "lastUpdateTime",
                    data_type: Int64,
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                Field {
                    name: "isDeleted",
                    data_type: Boolean,
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                Field {
                    name: "fileRowNumber",
                    data_type: Int32,
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
            ],
            metadata: {},
        },
        input: RepartitionExec {
            input: BoundedWindowAggExec {
                input: SortExec {
                    input: DataSourceExec {
                        data_source: FileScanConfig {object_store_url=ObjectStoreUrl { url: Url { scheme: "s3", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("airtable-enterprise-data-tables-us-west-2-development")), port: None, path: "/", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(10000), total_byte_size: Exact(521607), column_statistics: [ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")), min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("a")), min_value: Exact(Utf8("a")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")), min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("dtr00000009999541")), min_value: Exact(Utf8("dtr00000000000175")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Int64(10000)), min_value: Exact(Int64(1)), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Boolean(false)), min_value: Exact(Boolean(false)), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(10000), max_value: Exact(Int32(NULL)), min_value: Exact(Int32(NULL)), sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[entRDQybgmzURTXUd/edtcuTVRzNDkbys5t/optimize_42QnfFpBs8/2za67O8nb1.zstd.parquet/partitioning_column=0/XtwrzQJVtMmjo88H.parquet]]}, projection=[1, 2, userPrimaryKey, id, lastUpdateTime, isDeleted]},
                        cache: PlanProperties {
                            eq_properties: EquivalenceProperties {
                                eq_group: EquivalenceGroup {
                                    classes: [],
                                },
                                oeq_class: OrderingEquivalenceClass {
                                    orderings: [],
                                },
                                constants: [],
                                constraints: Constraints {
                                    inner: [],
                                },
                                schema: Schema {
                                    fields: [
                                        Field {
                                            name: "1",
                                            data_type: Utf8,
                                            nullable: true,
                                            dict_id: 0,
                                            dict_is_ordered: false,
                                            metadata: {},
                                        },
                                        Field {
                                            name: "2",
                                            data_type: Utf8,
                                            nullable: true,
                                            dict_id: 0,
                                            dict_is_ordered: false,
                                            metadata: {},
                                        },
                                        Field {
                                            name: "userPrimaryKey",
                                            data_type: Utf8,
                                            nullable: false,
                                            dict_id: 0,
                                            dict_is_ordered: false,
                                            metadata: {},
                                        },
                                        Field {
                                            name: "id",
                                            data_type: Utf8,
                                            nullable: false,
                                            dict_id: 0,
                                            dict_is_ordered: false,
                                            metadata: {},
                                        },
                                        Field {
                                            name: "lastUpdateTime",
                                            data_type: Int64,
                                            nullable: false,
                                            dict_id: 0,
                                            dict_is_ordered: false,
                                            metadata: {},
                                        },
                                        Field {
                                            name: "isDeleted",
                                            data_type: Boolean,
                                            nullable: false,
                                            dict_id: 0,
                                            dict_is_ordered: false,
                                            metadata: {},
                                        },
                                    ],
                                    metadata: {},
                                },
                            },
                            partitioning: UnknownPartitioning(
                                1,
                            ),
                            emission_type: Incremental,
                            boundedness: Bounded,
                            output_ordering: None,
                        },
                    },
                    expr: LexOrdering {
                        inner: [
                            PhysicalSortExpr {
                                expr: Column {
                                    name: "userPrimaryKey",
                                    index: 2,
                                },
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: true,
                                },
                            },
                        ],
                    },
                    metrics_set: ExecutionPlanMetricsSet {
                        inner: Mutex {
                            data: MetricsSet {
                                metrics: [],
                            },
                        },
                    },
                    preserve_partitioning: false,
                    fetch: None,
                    cache: PlanProperties {
                        eq_properties: EquivalenceProperties {
                            eq_group: EquivalenceGroup {
                                classes: [],
                            },
                            oeq_class: OrderingEquivalenceClass {
                                orderings: [
                                    LexOrdering {
                                        inner: [
                                            PhysicalSortExpr {
                                                expr: Column {
                                                    name: "userPrimaryKey",
                                                    index: 2,
                                                },
                                                options: SortOptions {
                                                    descending: false,
                                                    nulls_first: true,
                                                },
                                            },
                                        ],
                                    },
                                ],
                            },
                            constants: [],
                            constraints: Constraints {
                                inner: [],
                            },
                            schema: Schema {
                                fields: [
                                    Field {
                                        name: "1",
                                        data_type: Utf8,
                                        nullable: true,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: {},
                                    },
                                    Field {
                                        name: "2",
                                        data_type: Utf8,
                                        nullable: true,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: {},
                                    },
                                    Field {
                                        name: "userPrimaryKey",
                                        data_type: Utf8,
                                        nullable: false,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: {},
                                    },
                                    Field {
                                        name: "id",
                                        data_type: Utf8,
                                        nullable: false,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: {},
                                    },
                                    Field {
                                        name: "lastUpdateTime",
                                        data_type: Int64,
                                        nullable: false,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: {},
                                    },
                                    Field {
                                        name: "isDeleted",
                                        data_type: Boolean,
                                        nullable: false,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: {},
                                    },
                                ],
                                metadata: {},
                            },
                        },
                        partitioning: UnknownPartitioning(
                            1,
                        ),
                        emission_type: Final,
                        boundedness: Bounded,
                        output_ordering: Some(
                            LexOrdering {
                                inner: [
                                    PhysicalSortExpr {
                                        expr: Column {
                                            name: "userPrimaryKey",
                                            index: 2,
                                        },
                                        options: SortOptions {
                                            descending: false,
                                            nulls_first: true,
                                        },
                                    },
                                ],
                            },
                        ),
                    },
                },
                window_expr: [
                    StandardWindowExpr {
                        expr: WindowUDFExpr {
                            fun: WindowUDF {
                                inner: RowNumber {
                                    signature: Signature {
                                        type_signature: Nullary,
                                        volatility: Immutable,
                                    },
                                },
                            },
                            args: [],
                            name: "dataFusionRowNumber",
                            input_types: [],
                            is_reversed: false,
                            ignore_nulls: false,
                        },
                        partition_by: [],
                        order_by: LexOrdering {
                            inner: [],
                        },
                        window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false },
                    },
                ],
                schema: Schema {
                    fields: [
                        Field {
                            name: "1",
                            data_type: Utf8,
                            nullable: true,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "2",
                            data_type: Utf8,
                            nullable: true,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "userPrimaryKey",
                            data_type: Utf8,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "id",
                            data_type: Utf8,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "lastUpdateTime",
                            data_type: Int64,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "isDeleted",
                            data_type: Boolean,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "dataFusionRowNumber",
                            data_type: UInt64,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                    ],
                    metadata: {},
                },
                metrics: ExecutionPlanMetricsSet {
                    inner: Mutex {
                        data: MetricsSet {
                            metrics: [],
                        },
                    },
                },
                input_order_mode: Sorted,
                ordered_partition_by_indices: [],
                cache: PlanProperties {
                    eq_properties: EquivalenceProperties {
                        eq_group: EquivalenceGroup {
                            classes: [],
                        },
                        oeq_class: OrderingEquivalenceClass {
                            orderings: [
                                LexOrdering {
                                    inner: [
                                        PhysicalSortExpr {
                                            expr: Column {
                                                name: "userPrimaryKey",
                                                index: 2,
                                            },
                                            options: SortOptions {
                                                descending: false,
                                                nulls_first: true,
                                            },
                                        },
                                    ],
                                },
                                LexOrdering {
                                    inner: [
                                        PhysicalSortExpr {
                                            expr: Column {
                                                name: "dataFusionRowNumber",
                                                index: 6,
                                            },
                                            options: SortOptions {
                                                descending: false,
                                                nulls_first: false,
                                            },
                                        },
                                    ],
                                },
                            ],
                        },
                        constants: [],
                        constraints: Constraints {
                            inner: [],
                        },
                        schema: Schema {
                            fields: [
                                Field {
                                    name: "1",
                                    data_type: Utf8,
                                    nullable: true,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                                Field {
                                    name: "2",
                                    data_type: Utf8,
                                    nullable: true,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                                Field {
                                    name: "userPrimaryKey",
                                    data_type: Utf8,
                                    nullable: false,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                                Field {
                                    name: "id",
                                    data_type: Utf8,
                                    nullable: false,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                                Field {
                                    name: "lastUpdateTime",
                                    data_type: Int64,
                                    nullable: false,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                                Field {
                                    name: "isDeleted",
                                    data_type: Boolean,
                                    nullable: false,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                                Field {
                                    name: "dataFusionRowNumber",
                                    data_type: UInt64,
                                    nullable: false,
                                    dict_id: 0,
                                    dict_is_ordered: false,
                                    metadata: {},
                                },
                            ],
                            metadata: {},
                        },
                    },
                    partitioning: UnknownPartitioning(
                        1,
                    ),
                    emission_type: Final,
                    boundedness: Bounded,
                    output_ordering: Some(
                        LexOrdering {
                            inner: [
                                PhysicalSortExpr {
                                    expr: Column {
                                        name: "userPrimaryKey",
                                        index: 2,
                                    },
                                    options: SortOptions {
                                        descending: false,
                                        nulls_first: true,
                                    },
                                },
                                PhysicalSortExpr {
                                    expr: Column {
                                        name: "dataFusionRowNumber",
                                        index: 6,
                                    },
                                    options: SortOptions {
                                        descending: false,
                                        nulls_first: false,
                                    },
                                },
                            ],
                        },
                    ),
                },
                can_repartition: false,
            },
            state: OnceCell {
                value: None,
            },
            metrics: ExecutionPlanMetricsSet {
                inner: Mutex {
                    data: MetricsSet {
                        metrics: [],
                    },
                },
            },
            preserve_order: false,
            cache: PlanProperties {
                eq_properties: EquivalenceProperties {
                    eq_group: EquivalenceGroup {
                        classes: [],
                    },
                    oeq_class: OrderingEquivalenceClass {
                        orderings: [
                            LexOrdering {
                                inner: [
                                    PhysicalSortExpr {
                                        expr: Column {
                                            name: "userPrimaryKey",
                                            index: 2,
                                        },
                                        options: SortOptions {
                                            descending: false,
                                            nulls_first: true,
                                        },
                                    },
                                ],
                            },
                            LexOrdering {
                                inner: [
                                    PhysicalSortExpr {
                                        expr: Column {
                                            name: "dataFusionRowNumber",
                                            index: 6,
                                        },
                                        options: SortOptions {
                                            descending: false,
                                            nulls_first: false,
                                        },
                                    },
                                ],
                            },
                        ],
                    },
                    constants: [],
                    constraints: Constraints {
                        inner: [],
                    },
                    schema: Schema {
                        fields: [
                            Field {
                                name: "1",
                                data_type: Utf8,
                                nullable: true,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "2",
                                data_type: Utf8,
                                nullable: true,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "userPrimaryKey",
                                data_type: Utf8,
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "id",
                                data_type: Utf8,
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "lastUpdateTime",
                                data_type: Int64,
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "isDeleted",
                                data_type: Boolean,
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "dataFusionRowNumber",
                                data_type: UInt64,
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                        ],
                        metadata: {},
                    },
                },
                partitioning: RoundRobinBatch(
                    8,
                ),
                emission_type: Final,
                boundedness: Bounded,
                output_ordering: Some(
                    LexOrdering {
                        inner: [
                            PhysicalSortExpr {
                                expr: Column {
                                    name: "userPrimaryKey",
                                    index: 2,
                                },
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: true,
                                },
                            },
                            PhysicalSortExpr {
                                expr: Column {
                                    name: "dataFusionRowNumber",
                                    index: 6,
                                },
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: false,
                                },
                            },
                        ],
                    },
                ),
            },
        },
        metrics: ExecutionPlanMetricsSet {
            inner: Mutex {
                data: MetricsSet {
                    metrics: [],
                },
            },
        },
        cache: PlanProperties {
            eq_properties: EquivalenceProperties {
                eq_group: EquivalenceGroup {
                    classes: [],
                },
                oeq_class: OrderingEquivalenceClass {
                    orderings: [
                        LexOrdering {
                            inner: [
                                PhysicalSortExpr {
                                    expr: Column {
                                        name: "userPrimaryKey",
                                        index: 2,
                                    },
                                    options: SortOptions {
                                        descending: false,
                                        nulls_first: true,
                                    },
                                },
                            ],
                        },
                        LexOrdering {
                            inner: [
                                PhysicalSortExpr {
                                    expr: Column {
                                        name: "fileRowNumber",
                                        index: 6,
                                    },
                                    options: SortOptions {
                                        descending: false,
                                        nulls_first: false,
                                    },
                                },
                            ],
                        },
                    ],
                },
                constants: [],
                constraints: Constraints {
                    inner: [],
                },
                schema: Schema {
                    fields: [
                        Field {
                            name: "1",
                            data_type: Utf8,
                            nullable: true,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "2",
                            data_type: Utf8,
                            nullable: true,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "userPrimaryKey",
                            data_type: Utf8,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "id",
                            data_type: Utf8,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "lastUpdateTime",
                            data_type: Int64,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "isDeleted",
                            data_type: Boolean,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "fileRowNumber",
                            data_type: Int32,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                    ],
                    metadata: {},
                },
            },
            partitioning: RoundRobinBatch(
                8,
            ),
            emission_type: Final,
            boundedness: Bounded,
            output_ordering: Some(
                LexOrdering {
                    inner: [
                        PhysicalSortExpr {
                            expr: Column {
                                name: "userPrimaryKey",
                                index: 2,
                            },
                            options: SortOptions {
                                descending: false,
                                nulls_first: true,
                            },
                        },
                        PhysicalSortExpr {
                            expr: Column {
                                name: "fileRowNumber",
                                index: 6,
                            },
                            options: SortOptions {
                                descending: false,
                                nulls_first: false,
                            },
                        },
                    ],
                },
            ),
        },
    },
    metrics: ExecutionPlanMetricsSet {
        inner: Mutex {
            data: MetricsSet {
                metrics: [],
            },
        },
    },
    cache: PlanProperties {
        eq_properties: EquivalenceProperties {
            eq_group: EquivalenceGroup {
                classes: [],
            },
            oeq_class: OrderingEquivalenceClass {
                orderings: [],
            },
            constants: [],
            constraints: Constraints {
                inner: [],
            },
            schema: Schema {
                fields: [
                    Field {
                        name: "1",
                        data_type: Utf8,
                        nullable: true,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                    Field {
                        name: "2",
                        data_type: Utf8,
                        nullable: true,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                    Field {
                        name: "userPrimaryKey",
                        data_type: Utf8,
                        nullable: false,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                    Field {
                        name: "id",
                        data_type: Utf8,
                        nullable: false,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                    Field {
                        name: "lastUpdateTime",
                        data_type: Int64,
                        nullable: false,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                    Field {
                        name: "isDeleted",
                        data_type: Boolean,
                        nullable: false,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                    Field {
                        name: "fileRowNumber",
                        data_type: Int32,
                        nullable: false,
                        dict_id: 0,
                        dict_is_ordered: false,
                        metadata: {},
                    },
                ],
                metadata: {},
            },
        },
        partitioning: UnknownPartitioning(
            1,
        ),
        emission_type: Final,
        boundedness: Bounded,
        output_ordering: None,
    },
    fetch: None,
}

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions