Description
Is your feature request related to a problem or challenge?
I'd like to share a few observations after putting hash aggregation (#7400) through a stress test. DataFusion v32 was used.
Before I start, I apologise if I got something wrong; the aggregation code has changed a lot since I last looked at it.
I've created a test case that should put hash aggregation under pressure, forcing it to spill, so I can observe query behaviour. The data contains about 250M rows with 50M unique uids, which are used as the aggregation key. It is an ~3 GB Parquet file.
Memory pool is configured as follows:
```rust
let runtime_config = RuntimeConfig::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(2_000_000_000)));
```

with 4 target partitions.
The query is rather simple:

```sql
select count(*), sum(id), sum(co), sum(n), uid from ta group by uid
```

I was expecting that the query would not fail with `ResourcesExhausted`, as aggregation would spill when under memory pressure, but this was not the case.
Describe the solution you'd like
A few low-hanging fruits that can be addressed:
- `sort_batch` copies data, if I'm not mistaken. As spill is usually triggered under memory pressure, in most cases for all partitions at around the same time, it effectively doubles the memory needed (in most cases I've observed ~2.5x more memory used than configured for the memory pool).
- Spill writes the whole state as a single batch, which is a problem later when we try to merge all those files.
- Not sure what the reason is for the checks at:
and
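To illustrate the first point: an out-of-place sort materialises a full second copy of the data before the original can be dropped, so peak memory is roughly double the batch size. A minimal stdlib-only sketch (the `gather_sorted` helper is hypothetical, not a DataFusion API):

```rust
// Hypothetical stand-in for an out-of-place sort such as `sort_batch`:
// it computes sort indices, then gathers rows into a brand-new buffer.
fn gather_sorted(rows: &[u64]) -> Vec<u64> {
    let mut idx: Vec<usize> = (0..rows.len()).collect();
    idx.sort_unstable_by_key(|&i| rows[i]);
    // The gather allocates a second, equally sized buffer; until the
    // original batch is dropped, both copies are resident in memory.
    idx.iter().map(|&i| rows[i]).collect()
}

fn main() {
    let original = vec![30u64, 10, 20];
    let sorted = gather_sorted(&original);
    // Both buffers are alive here: peak usage is ~2x the batch, which is
    // consistent with the overshoot observed when all partitions spill at once.
    assert_eq!(sorted, vec![10, 20, 30]);
    assert_eq!(original, vec![30, 10, 20]); // untouched: a copy, not in place
}
```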
Available improvements, IMHO:
- Before spilling, we could split the batch into smaller blocks, sort those smaller blocks, and write one spill file per block; at the moment we write a single file. I'm not sure what the strategy for splitting a batch into smaller blocks would be; we should also take care not to end up with too many open files.
- Write more than one batch per spill:
```rust
fn spill(&mut self) -> Result<()> {
    let emit = self.emit(EmitTo::All, true)?;
    let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
    let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
    let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
    // TODO: slice large `sorted` and write to multiple files in parallel
    let mut offset = 0;
    let total_rows = sorted.num_rows();
    while offset < total_rows {
        // write the sorted state in batch_size chunks rather than one huge batch
        let length = std::cmp::min(total_rows - offset, self.batch_size);
        let batch = sorted.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }
    writer.finish()?;
    self.spill_state.spills.push(spillfile);
    Ok(())
}
```

- Can we remove the `if` at ? And change the `if` at to `self.group_values.len() > 0`? It would make more sense to "send" a smaller batch than to fail with `ResourcesExhausted`.
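The sort-per-block idea above can be sketched with a stdlib-only toy; `block_size` and the file-per-block bookkeeping are assumptions for illustration, not DataFusion code (each returned `Vec` stands in for one spill file):

```rust
// Toy model of "sort per block, one spill file per block": instead of
// sorting the whole accumulated state at once, each block is sorted
// independently, so the extra peak memory is one block's worth rather
// than a full copy of the whole state.
fn sort_and_spill_per_block(rows: &[u64], block_size: usize) -> Vec<Vec<u64>> {
    assert!(block_size > 0);
    rows.chunks(block_size)
        .map(|chunk| {
            let mut block = chunk.to_vec();
            block.sort_unstable(); // each "spill file" is internally sorted
            block
        })
        .collect()
}

fn main() {
    let spills = sort_and_spill_per_block(&[5, 3, 8, 1, 9, 2], 2);
    // Three sorted blocks -> three spill files to merge later; the block
    // count must stay below the open-file limit during the merge.
    assert_eq!(spills, vec![vec![3, 5], vec![1, 8], vec![2, 9]]);
}
```

The trade-off noted above still applies: more blocks mean smaller sorts but more files open simultaneously when merging.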
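And a stdlib-only sketch of the last point, emitting a partial batch instead of failing: the `MemoryBudget` type and per-group cost below are made up for illustration, as DataFusion's real accounting goes through a memory reservation.

```rust
// Illustrative budget, not a DataFusion type.
struct MemoryBudget { used: usize, limit: usize }

impl MemoryBudget {
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.limit { self.used += bytes; true } else { false }
    }
}

/// Returns Some(partial batch) when the budget forces an early emit,
/// mirroring "send a smaller batch rather than fail with ResourcesExhausted".
fn on_new_group(groups: &mut Vec<u64>, g: u64, budget: &mut MemoryBudget) -> Option<Vec<u64>> {
    const GROUP_BYTES: usize = 8; // made-up per-group cost
    if budget.try_grow(GROUP_BYTES) {
        groups.push(g);
        return None;
    }
    // Out of budget: emit the accumulated state instead of erroring, but
    // only if there is something to emit (i.e. `group_values.len() > 0`).
    let batch = (!groups.is_empty()).then(|| {
        budget.used -= groups.len() * GROUP_BYTES; // freed by the emit
        std::mem::take(groups)
    });
    budget.try_grow(GROUP_BYTES); // re-reserve for the new group
    groups.push(g);
    batch
}

fn main() {
    let mut budget = MemoryBudget { used: 0, limit: 16 }; // room for 2 groups
    let mut groups = Vec::new();
    assert_eq!(on_new_group(&mut groups, 1, &mut budget), None);
    assert_eq!(on_new_group(&mut groups, 2, &mut budget), None);
    // The third group exceeds the budget: a partial batch is emitted
    // instead of a ResourcesExhausted error.
    assert_eq!(on_new_group(&mut groups, 3, &mut budget), Some(vec![1, 2]));
    assert_eq!(groups, vec![3]);
}
```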
Describe alternatives you've considered
No other alternatives have been considered at the moment.
Additional context
I have disabled (commented out) resource accounting in `RepartitionExec`, as it would be the first to fail with `ResourcesExhausted`. From what I observed, `RepartitionExec` holds memory for a few batches of data when it raises `ResourcesExhausted`. The changes made in #4816 make sense, but in my test it was the first to give up, before the aggregation spill could occur.