Mismatch in MemTable (Select Into with aggregate window functions having no alias) #6492

@berkaysynnada

Describe the bug

When writing a query result to a MemTable using the SELECT .. INTO syntax, some queries fail with a schema mismatch error. For example,

SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite
has no problem but
SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite
gives an error: Plan("Mismatch between schema and batches").

The cause is that in MemTable::try_new(), the table schema and the partitions' schemas do not match. I tracked this down: the schema, which is created from the input LogicalPlan, has fields named by display_name(), which writes the whole expression (function plus window specification). The fields of the partitions' RecordBatches, however, are named by physical_name() when there is no alias, which writes only the function part of the expression.
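
The mismatch can be modelled without DataFusion. The sketch below is only an illustration: `Field` is a simplified stand-in type (not arrow's `Field`), `schemas_match` is a hypothetical strict comparison, and the name strings only approximate what display_name() and physical_name() produce. It shows why a strict field-by-field check accepts the aliased query and rejects the unaliased one.

```rust
// Simplified stand-in for an Arrow field: only name and type matter here.
// This is NOT arrow's `Field`; it is a hypothetical model for illustration.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: &'static str,
}

fn field(name: &str, data_type: &'static str) -> Field {
    Field { name: name.to_string(), data_type }
}

/// Strict check: same number of fields, each equal in name and data type.
fn schemas_match(schema: &[Field], batch_schema: &[Field]) -> bool {
    schema == batch_schema
}

/// Unaliased case: the logical and physical names disagree.
fn unaliased_case() -> (Vec<Field>, Vec<Field>) {
    // Assumed shape of the display-style name: function plus window spec.
    let logical = vec![field("SUM(c1) ORDER BY [c1 ASC NULLS LAST]", "Int64")];
    // Assumed shape of the physical name: function part only.
    let physical = vec![field("SUM(c1)", "Int64")];
    (logical, physical)
}

/// Aliased case: both sides carry the alias, so the names agree.
fn aliased_case() -> (Vec<Field>, Vec<Field>) {
    (vec![field("sum1", "Int64")], vec![field("sum1", "Int64")])
}
```

Calling `schemas_match` on the pair from `unaliased_case()` returns false, while the pair from `aliased_case()` passes, which mirrors the two queries above.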

To Reproduce

SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite

Expected behavior

I see two possible approaches:

  1. Have create_window_expr() name the window expression with display_name() when constructing it. However, this requires changing many tests, and the exec lines become very long, for example:
    ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
    Maybe the Display implementations could be changed?

  2. MemTable::try_new() uses the contains() function implemented for Schema to match fields, which in turn uses the contains() implemented for Field. That checks one-to-one equality of all elements in the Field struct. For the name element only, we could relax the equality to something like schema_field.name().starts_with(partition_field.name()). If we prefer not to change contains(), we could write a specialized function like

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

/// Returns true if every batch has as many fields as `schema` and each pair of
/// fields agrees on data type and on name (by prefix).
fn validate_partitions_with_schema(schema: &SchemaRef, partitions: &[Vec<RecordBatch>]) -> bool {
    partitions.iter().flatten().all(|batch| {
        let batch_schema = batch.schema();
        batch_schema.fields().len() == schema.fields().len()
            && schema.fields().iter().zip(batch_schema.fields().iter()).all(
                |(schema_field, partition_field)| {
                    // Relaxed name check: the schema name may carry the window spec as a suffix.
                    schema_field.name().starts_with(partition_field.name().as_str())
                        && schema_field.data_type() == partition_field.data_type()
                },
            )
    })
}

But this approach also does not seem solid to me.
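
One reason the prefix check is fragile is that starts_with() accepts any name that merely shares a prefix. The sketch below is a model, not DataFusion code: `relaxed_match` is a hypothetical helper, fields are plain `(name, data type)` string pairs, and the name strings are made up for illustration.

```rust
/// Relaxed check from approach 2, modelled on (name, data type) string pairs
/// instead of Arrow fields. Hypothetical helper, for illustration only.
fn relaxed_match(schema: &[(&str, &str)], batch: &[(&str, &str)]) -> bool {
    schema.len() == batch.len()
        && schema
            .iter()
            .zip(batch)
            .all(|((s_name, s_type), (b_name, b_type))| {
                // Name matches by prefix, data type must be identical.
                s_name.starts_with(*b_name) && s_type == b_type
            })
}

/// Intended use: the schema name is the physical name plus a window spec.
fn intended_case() -> bool {
    relaxed_match(
        &[("SUM(c1) ORDER BY [c1 ASC NULLS LAST]", "Int64")],
        &[("SUM(c1)", "Int64")],
    )
}

/// False positive: `c1` is a prefix of `c10`, yet they are different columns.
fn unrelated_columns_case() -> bool {
    relaxed_match(&[("c10", "Int64")], &[("c1", "Int64")])
}

/// Ambiguity: two different windows over the same function collapse to the
/// same physical-style name, so the check cannot tell the columns apart.
fn ambiguous_windows_case() -> bool {
    relaxed_match(
        &[
            ("SUM(c1) ORDER BY [c1 ASC NULLS LAST]", "Int64"),
            ("SUM(c1) ORDER BY [c2 ASC NULLS LAST]", "Int64"),
        ],
        &[("SUM(c1)", "Int64"), ("SUM(c1)", "Int64")],
    )
}
```

All three functions return true, so the relaxed check passes both the case it was designed for and two cases where the columns are different or indistinguishable.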

Additional context

Any advice is welcome. I will work on a fix once we reach common ground.

Labels: bug
