
Improvements in HashAggregationExec when spilling #7858

@milenkovicm

Description

Is your feature request related to a problem or challenge?

I'd like to share a few observations after putting hash aggregation (#7400) through a stress test. DataFusion v32 was used.

Before I start, I apologise if I got something wrong; the aggregation code has changed a lot since the last time I had a look.

I've created a test case which should put hash aggregation under pressure, forcing it to spill, so I can observe query behaviour. The data contains about 250M rows with 50M unique uids, which are used as the aggregation key. It is an ~3 GB Parquet file.

Memory pool is configured as follows:

let runtime_config = RuntimeConfig::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(2_000_000_000)));

with 4 target partitions.
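
For completeness, the full context setup for the test looks roughly like the following. This is only a sketch against v32-era APIs (constructor names such as SessionContext::new_with_config_rt may differ between versions), and the ? operators assume a surrounding function that returns Result:

use std::sync::Arc;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

// ~2 GB fair-spill memory pool shared by all operators, 4 target partitions.
let runtime_config = RuntimeConfig::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(2_000_000_000)));
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let config = SessionConfig::new().with_target_partitions(4);
let ctx = SessionContext::new_with_config_rt(config, runtime);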

Query is rather simple:

select count(*), sum(id), sum(co), sum(n), uid from ta group by uid 
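
Here ta is the registered Parquet file, so the test boils down to roughly this (the file path is a placeholder):

// Register the ~3 GB Parquet file under the name `ta` and run the aggregation.
ctx.register_parquet("ta", "/path/to/ta.parquet", ParquetReadOptions::default())
    .await?;
let results = ctx
    .sql("select count(*), sum(id), sum(co), sum(n), uid from ta group by uid")
    .await?
    .collect()
    .await?;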

I was expecting that the query would not fail with ResourcesExhausted, since aggregation should spill when under memory pressure, but this was not the case.

Describe the solution you'd like

A few low-hanging fruits that could be addressed (the current spill path is sketched just after this list for reference):

  1. sort_batch copies data, if I'm not mistaken. As spill is usually triggered under memory pressure, in most cases for all partitions at around the same time, it effectively doubles the memory needed (in most cases I've observed ~2.5x more memory used than was configured for the memory pool).

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L672

  2. Spill writes the whole state as a single batch, which is a problem later when we try to merge all those spill files:

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L676

  3. I'm not sure what the reason is for the checks at:

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L591

and

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L699
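
For reference, the spill path at the linked commit looks roughly like the sketch below (reconstructed from the modified snippet further down in this issue, so treat it as approximate): the whole accumulated state is emitted as one batch, sort_batch materialises a sorted copy of it, and the entire sorted result is written as a single IPC batch.

fn spill(&mut self) -> Result<()> {
    // Emit *all* accumulated groups as one RecordBatch ...
    let emit = self.emit(EmitTo::All, true)?;
    // ... then sort_batch materialises a sorted copy of it, roughly doubling
    // the memory held for this state exactly when memory is already tight.
    let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
    let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
    let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
    // The whole sorted state is written as a single IPC batch, which has to be
    // read back in one piece when the spill files are later merged.
    writer.write(&sorted)?;
    writer.finish()?;
    self.spill_state.spills.push(spillfile);
    Ok(())
}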

Possible improvements, IMHO:

  1. Before spilling, we could split the batch into smaller blocks, sort each block, and write one spill file per block; at the moment we write a single file (see the sketch after this list). I'm not sure what the strategy for splitting the batch into smaller blocks would be, and we should also take care not to end up with too many open files.

  2. Write more than one batch per spill file, e.g.:

fn spill(&mut self) -> Result<()> {
    let emit = self.emit(EmitTo::All, true)?;
    let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
    let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
    let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
    // TODO: slice large `sorted` and write to multiple files in parallel
    // Write the sorted state in `batch_size`-row chunks instead of one huge
    // IPC batch, so the later merge can read back bounded batches.
    let mut offset = 0;
    let total_rows = sorted.num_rows();

    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, self.batch_size);
        let batch = sorted.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }

    writer.finish()?;
    self.spill_state.spills.push(spillfile);
    Ok(())
}
  3. Can we remove the check at

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L591

and change the check at

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L699

to self.group_values.len() > 0?

It would make more sense to emit a smaller batch than to fail with ResourcesExhausted.
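
A minimal sketch of point 1 above: split the emitted state into fixed-size blocks, sort each block independently, and write one spill file per block. This is only illustrative; using self.batch_size as the block size and one file per block are assumptions, and the number of open files would need to be bounded as noted above.

fn spill(&mut self) -> Result<()> {
    // Emit everything, but sort and write it block by block so the sort copy
    // is bounded by the block size rather than the whole aggregation state.
    let emit = self.emit(EmitTo::All, true)?;
    let total_rows = emit.num_rows();
    let mut offset = 0;

    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, self.batch_size);
        let block = emit.slice(offset, length);
        offset += length;

        // Sorting a small slice copies far less data than sorting `emit`
        // as a whole.
        let sorted = sort_batch(&block, &self.spill_state.spill_expr, None)?;

        // One spill file per block; each file is a small, sorted run that the
        // merge phase can stream over later.
        let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
        let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
        writer.write(&sorted)?;
        writer.finish()?;
        self.spill_state.spills.push(spillfile);
    }

    Ok(())
}

Compared to the snippet in point 2, this also bounds the extra memory taken by sort_batch, at the cost of producing more spill files for the later merge.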

Describe alternatives you've considered

No other alternatives have been considered at the moment.

Additional context

I have disabled (commented out) resource accounting in RepartitionExec, as it would be the first one to fail with ResourcesExhausted. From what I observed, RepartitionExec only holds memory for a few batches of data at the point when it raises ResourcesExhausted. The change made in #4816 makes sense, but in my test it was the first to give up, before the aggregation spill could occur.
