@@ -5,7 +5,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
55use datafusion:: datasource:: listing:: {
66 ListingOptions , ListingTable , ListingTableConfig , ListingTableUrl ,
77} ;
8- use datafusion_uwheel:: UWheelOptimizer ;
8+ use datafusion_uwheel:: { IndexBuilder , UWheelOptimizer } ;
99
1010use chrono:: { DateTime , NaiveDate , Utc } ;
1111use clap:: Parser ;
@@ -45,7 +45,7 @@ async fn main() -> Result<()> {
4545 let filename = "../../data/yellow_tripdata_2022-01.parquet" ;
4646
4747 // register parquet file with the execution context
48- ctx. register_parquet ( "yellow_tripdata" , & filename, ParquetReadOptions :: default ( ) )
48+ ctx. register_parquet ( "yellow_tripdata" , filename, ParquetReadOptions :: default ( ) )
4949 . await ?;
5050
5151 // Create ctx with UWheelOptimizer
@@ -78,14 +78,24 @@ async fn main() -> Result<()> {
7878 Builder :: new ( "tpep_dropoff_datetime" )
7979 . with_name ( "yellow_tripdata" )
8080 . with_min_max_wheels ( vec ! [ "fare_amount" , "trip_distance" ] ) // Create Min/Max wheels for the columns "fare_amount" and "trip_distance"
81- . with_sum_wheels ( vec ! [ "fare_amount" ] )
8281 . build_with_provider ( provider)
8382 . await
8483 . unwrap ( ) ,
8584 ) ;
8685
87- // Set UWheelOptimizer as the query planner
88- let session_state = uwheel_ctx. state ( ) . with_query_planner ( optimizer. clone ( ) ) ;
86+ // Build index on fare_amount using SUM as aggregate
87+ optimizer
88+ . build_index ( IndexBuilder :: with_col_and_aggregate (
89+ "fare_amount" ,
90+ datafusion_uwheel:: AggregateType :: Sum ,
91+ ) )
92+ . await
93+ . unwrap ( ) ;
94+
95+ // Set UWheelOptimizer as optimizer rule
96+ let session_state = uwheel_ctx
97+ . state ( )
98+ . with_optimizer_rules ( vec ! [ optimizer. clone( ) ] ) ;
8999 let uwheel_ctx = SessionContext :: new_with_state ( session_state) ;
90100
91101 // Register the table using the underlying provider
@@ -131,33 +141,33 @@ pub async fn bench(
131141 ranges : & [ ( u64 , u64 ) ] ,
132142 fares : & [ f64 ] ,
133143) {
134- bench_datafusion_count ( "datafusion-count(*)" , & ctx, & ranges) . await ;
135- bench_datafusion_count ( "datafusion-uwheel-count(*)" , & uwheel_ctx, & ranges) . await ;
144+ bench_datafusion_count ( "datafusion-count(*)" , ctx, ranges) . await ;
145+ bench_datafusion_count ( "datafusion-uwheel-count(*)" , uwheel_ctx, ranges) . await ;
136146
137- bench_datafusion_sum_fare_amount ( "datafusion-sum(fare_amount)" , & ctx, & ranges) . await ;
138- bench_datafusion_sum_fare_amount ( "datafusion-uwheel-sum(fare_amount)" , & uwheel_ctx, & ranges)
147+ bench_datafusion_sum_fare_amount ( "datafusion-sum(fare_amount)" , ctx, ranges) . await ;
148+ bench_datafusion_sum_fare_amount ( "datafusion-uwheel-sum(fare_amount)" , uwheel_ctx, ranges)
139149 . await ;
140150
141151 bench_min_max_projection (
142152 "datafusion-select(*)-fare-amount-filter" ,
143- & ctx,
144- & ranges,
145- & fares,
153+ ctx,
154+ ranges,
155+ fares,
146156 )
147157 . await ;
148158 bench_min_max_projection (
149159 "datafusion-uwheel-select(*)-fare-amount-filter" ,
150- & uwheel_ctx,
151- & ranges,
152- & fares,
160+ uwheel_ctx,
161+ ranges,
162+ fares,
153163 )
154164 . await ;
155165
156- bench_datafusion_temporal_projection ( "datafusion-select(*)-count-filter" , & ctx, & ranges) . await ;
166+ bench_datafusion_temporal_projection ( "datafusion-select(*)-count-filter" , ctx, ranges) . await ;
157167 bench_datafusion_temporal_projection (
158168 "datafusion-uwheel-select(*)-count-filter" ,
159- & uwheel_ctx,
160- & ranges,
169+ uwheel_ctx,
170+ ranges,
161171 )
162172 . await ;
163173}
@@ -244,13 +254,11 @@ async fn bench_datafusion_count(id: &str, ctx: &SessionContext, ranges: &[(u64,
244254 . map ( |( start, end) | {
245255 let start = DateTime :: from_timestamp_millis ( start as i64 )
246256 . unwrap ( )
247- . to_utc ( )
248- . naive_utc ( )
257+ . to_rfc3339 ( )
249258 . to_string ( ) ;
250259 let end = DateTime :: from_timestamp_millis ( end as i64 )
251260 . unwrap ( )
252- . to_utc ( )
253- . naive_utc ( )
261+ . to_rfc3339 ( )
254262 . to_string ( ) ;
255263 format ! (
256264 "SELECT COUNT(*) FROM yellow_tripdata \
@@ -298,18 +306,16 @@ async fn bench_datafusion_sum_fare_amount(id: &str, ctx: &SessionContext, ranges
298306 . map ( |( start, end) | {
299307 let start = DateTime :: from_timestamp_millis ( start as i64 )
300308 . unwrap ( )
301- . to_utc ( )
302- . naive_utc ( )
309+ . to_rfc3339 ( )
303310 . to_string ( ) ;
304311 let end = DateTime :: from_timestamp_millis ( end as i64 )
305312 . unwrap ( )
306- . to_utc ( )
307- . naive_utc ( )
313+ . to_rfc3339 ( )
308314 . to_string ( ) ;
309315 format ! (
310316 "SELECT SUM(fare_amount) FROM yellow_tripdata \
311- WHERE tpep_dropoff_datetime >= '{}' \
312- AND tpep_dropoff_datetime < '{}'",
317+ WHERE tpep_dropoff_datetime >= '{}' \
318+ AND tpep_dropoff_datetime < '{}'",
313319 start, end
314320 )
315321 } )
@@ -358,13 +364,11 @@ async fn bench_min_max_projection(
358364 . map ( |( ( start, end) , fare) | {
359365 let start = DateTime :: from_timestamp_millis ( start as i64 )
360366 . unwrap ( )
361- . to_utc ( )
362- . naive_utc ( )
367+ . to_rfc3339 ( )
363368 . to_string ( ) ;
364369 let end = DateTime :: from_timestamp_millis ( end as i64 )
365370 . unwrap ( )
366- . to_utc ( )
367- . naive_utc ( )
371+ . to_rfc3339 ( )
368372 . to_string ( ) ;
369373 format ! (
370374 "SELECT * FROM yellow_tripdata \
@@ -382,7 +386,7 @@ async fn bench_min_max_projection(
382386 // dbg!(&query);
383387 let now = Instant :: now ( ) ;
384388 let df = ctx. sql ( & query) . await . unwrap ( ) ;
385- let res = df. collect ( ) . await . unwrap ( ) ;
389+ let _res = df. collect ( ) . await . unwrap ( ) ;
386390 hist. record ( now. elapsed ( ) . as_micros ( ) as u64 ) . unwrap ( ) ;
387391 }
388392 let runtime = full. elapsed ( ) ;
@@ -409,13 +413,11 @@ async fn bench_datafusion_temporal_projection(
409413 . map ( |( start, end) | {
410414 let start = DateTime :: from_timestamp_millis ( start as i64 )
411415 . unwrap ( )
412- . to_utc ( )
413- . naive_utc ( )
416+ . to_rfc3339 ( )
414417 . to_string ( ) ;
415418 let end = DateTime :: from_timestamp_millis ( end as i64 )
416419 . unwrap ( )
417- . to_utc ( )
418- . naive_utc ( )
420+ . to_rfc3339 ( )
419421 . to_string ( ) ;
420422 format ! (
421423 "SELECT * FROM yellow_tripdata \
@@ -432,7 +434,7 @@ async fn bench_datafusion_temporal_projection(
432434 // dbg!(&query);
433435 let now = Instant :: now ( ) ;
434436 let df = ctx. sql ( & query) . await . unwrap ( ) ;
435- let res = df. collect ( ) . await . unwrap ( ) ;
437+ let _res = df. collect ( ) . await . unwrap ( ) ;
436438 hist. record ( now. elapsed ( ) . as_micros ( ) as u64 ) . unwrap ( ) ;
437439 }
438440 let runtime = full. elapsed ( ) ;
0 commit comments