Conversation

@aocsa (Contributor) commented Sep 22, 2021

Instead of immediately pushing to outputs, ExecNodes should package their work into tasks and defer execution to allow more sophisticated external scheduling. This will also relieve pipeline breaking ExecNodes of the responsibility to ensure their work is correctly parallelized.
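
For illustration, here is a minimal sketch of the idea in C++. All names below (Scheduler, TaskfiedNode, Process) are hypothetical stand-ins, not the actual Arrow ExecNode API: rather than pushing results to its output immediately, the node wraps the work in a task and hands it to an external scheduler that decides when, and on which thread, to run it.

// Hypothetical sketch only; not the Arrow ExecNode API.
#include <functional>
#include <queue>
#include <utility>

using Task = std::function<void()>;

// Stand-in for whatever external scheduler ends up owning deferred tasks.
class Scheduler {
 public:
  void Schedule(Task task) { pending_.push(std::move(task)); }
  void RunAll() {
    while (!pending_.empty()) {
      pending_.front()();
      pending_.pop();
    }
  }

 private:
  std::queue<Task> pending_;
};

struct Batch { /* columns, length, ... */ };

class TaskfiedNode {
 public:
  explicit TaskfiedNode(Scheduler* scheduler) : scheduler_(scheduler) {}

  // Instead of processing `batch` and pushing to the output right away,
  // package the work as a task and defer execution to the scheduler.
  void InputReceived(Batch batch) {
    scheduler_->Schedule([this, batch = std::move(batch)]() mutable {
      Batch out = Process(std::move(batch));
      (void)out;  // a real node would forward `out` to its output here
    });
  }

 private:
  Batch Process(Batch batch) { return batch; }  // e.g. filter or project
  Scheduler* scheduler_;
};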


@aocsa aocsa marked this pull request as draft September 22, 2021 13:46
@westonpace (Member) left a comment

Looks like a reasonable start. I'm not sure I fully understand the task scheduler right now, and it would be nice if we could avoid duplicating some of the task management work between the nodes.

@aocsa aocsa marked this pull request as ready for review September 28, 2021 14:52
@aocsa (Contributor, Author) commented Sep 28, 2021

I updated this PR and I think it is ready for review. I was struggling to get AsyncTaskGroup to work correctly; this utility seems to have an issue when concurrent threads try to determine whether all tasks are done. See here: https://gist.github.com/aocsa/c4e66e9061d1dc9f1a0e8253ffad2be3

So for now I am using the TaskGroup class in async mode with a stop_token to cancel tasks when ExecNode::StopProducing is requested. cc @felipeblazing, @westonpace
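
Conceptually, the cancellation hook looks like the following generic sketch; std::stop_token here is only a stand-in for the stop token mentioned above, not the Arrow utility the PR actually uses.

// Generic stop-token cancellation sketch (C++20); not the Arrow API.
#include <stop_token>
#include <thread>
#include <vector>

void RunTasks(std::stop_token stop, const std::vector<int>& batches) {
  for (int batch : batches) {
    if (stop.stop_requested()) return;  // StopProducing was requested: bail out
    // ... process `batch` ...
    (void)batch;
  }
}

int main() {
  std::jthread worker(RunTasks, std::vector<int>{1, 2, 3});
  worker.request_stop();  // analogous to ExecNode::StopProducing
}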

@lidavidm (Member) left a comment

Some implementation-level comments. Thanks for finding the error in AsyncTaskGroup; we should figure out what's going on there as well.

@lidavidm (Member) commented

Ah, for AsyncTaskGroup, I think what's happening is that it allows adding tasks even after WaitForTaskToFinish, but it's assumed that this will only be done by another task. But for this PR, presumably that's not the case - we're adding tasks externally so that invariant may be violated, and so running_tasks_ may hit 0, the future may be marked finished, and then a concurrent call may add another task. So the semantics of AsyncTaskGroup don't quite fit what we want here.
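
To make the suspected race concrete, here is a deliberately naive sketch (hypothetical types, not Arrow's actual AsyncTaskGroup) of how an externally added task can arrive after the count has already hit zero and the finished flag has fired.

// Naive sketch of the suspected race; hypothetical types, not Arrow's AsyncTaskGroup.
#include <atomic>
#include <functional>
#include <thread>
#include <utility>

class NaiveTaskGroup {
 public:
  void AddTask(std::function<void()> task) {
    ++running_tasks_;
    std::thread([this, task = std::move(task)] {
      task();
      // If this was the last running task, the group looks "done"...
      if (--running_tasks_ == 0 && waiting_) {
        finished_ = true;  // ...even though a concurrent external AddTask
      }                    //    may be about to add another task.
    }).detach();
  }

  // Assumes no further AddTask calls arrive except from tasks already in the
  // group -- the invariant an external caller can easily violate.
  void WaitForTasksToFinish() {
    waiting_ = true;
    while (!finished_) { /* spin, for brevity */ }
  }

 private:
  std::atomic<int> running_tasks_{0};
  std::atomic<bool> waiting_{false};
  std::atomic<bool> finished_{false};
};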

@westonpace (Member) commented

FYI, in the new dataset writer node PR, I've renamed WaitForTaskToFinish to End so it's clear that it should only be called once. I've also added OnFinished which returns a future and can be accessed early. So you can call OnFinished as much as you want (e.g. before you add any tasks even) but you only call End once. I think this clears up the confusion.

Check it out here: https://github.com/apache/arrow/pull/11017/files

If you want to use this I can pull out the changes to AsyncTaskGroup and submit them as a small PR on their own today. Otherwise, it will probably be a while before 11017 merges.
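
Roughly, the usage contract described above looks like this trivial synchronous stand-in (hypothetical names; see the linked PR for the real AsyncTaskGroup changes).

// Trivial synchronous stand-in illustrating the End()/OnFinished() contract;
// hypothetical names, not the actual Arrow utility.
#include <functional>
#include <future>
#include <utility>
#include <vector>

class TaskGroupSketch {
 public:
  TaskGroupSketch() : finished_(done_.get_future().share()) {}

  void AddTask(std::function<void()> task) { tasks_.push_back(std::move(task)); }

  // Safe to call any number of times, even before any task has been added.
  std::shared_future<void> OnFinished() const { return finished_; }

  // Called exactly once, when the caller knows no more tasks will be added
  // from outside the group.
  void End() {
    for (auto& task : tasks_) task();
    tasks_.clear();
    done_.set_value();
  }

 private:
  std::vector<std::function<void()>> tasks_;
  std::promise<void> done_;
  std::shared_future<void> finished_;
};

int main() {
  TaskGroupSketch group;
  auto finished = group.OnFinished();  // grabbed early, before any task
  group.AddTask([] { /* work */ });
  group.End();                         // exactly once, after the last AddTask
  finished.wait();                     // completes once End() has run all tasks
}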

@westonpace (Member) left a comment

I like the idea of combining similar logic in one place, but I don't think this implementation is going to make sense for all exec nodes. Rather than write one huge comment I drafted up a bare sketch of an idea here: https://docs.google.com/document/d/1aJWvU-9fxnl8eE6Iw-IebVVs8wxkZ30f1rNpSBOBgmY/edit#heading=h.qv2yr7aeayxs

I didn't spend too much time on it, so feel free to reject it out of hand. However, I think we will want to either create various ExecNode subclasses or create an "exec node runner" concept. That way we keep the base ExecNode API pure: nodes that need a rather custom implementation can keep it, and nodes that can share common logic can do so.

@aocsa (Contributor, Author) commented Oct 1, 2021

I refactored this PR based on your feedback.
Main changes:

  • Refactored using composition to create two implementations, MakeSimpleSyncRunner and MakeSimpleParallelRunner, for use in the filter and project nodes (see the sketch below).
  • Fixed some race conditions.
  • Went back to using AsyncTaskGroup.

cc @westonpace, @felipeblazing, @lidavidm
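
Roughly, the composition looks like the following sketch. The names are loosely modeled on the PR's MakeSimpleSyncRunner / MakeSimpleParallelRunner, but the types here are hypothetical, not the actual implementation: the node hands one task per batch to a runner, and the runner decides whether to execute inline or hand the task to an executor.

// Hypothetical runner-composition sketch; not the PR's actual code.
#include <functional>
#include <future>
#include <memory>
#include <utility>
#include <vector>

using Task = std::function<void()>;

class Runner {
 public:
  virtual ~Runner() = default;
  virtual void Execute(Task task) = 0;
  virtual void WaitAll() = 0;
};

class SyncRunner : public Runner {
 public:
  void Execute(Task task) override { task(); }  // run inline on the caller's thread
  void WaitAll() override {}
};

class ParallelRunner : public Runner {
 public:
  void Execute(Task task) override {
    // std::async stands in for submitting to the plan's executor.
    futures_.push_back(std::async(std::launch::async, std::move(task)));
  }
  void WaitAll() override {
    for (auto& f : futures_) f.wait();
    futures_.clear();
  }

 private:
  std::vector<std::future<void>> futures_;
};

std::unique_ptr<Runner> MakeSimpleSyncRunner() { return std::make_unique<SyncRunner>(); }
std::unique_ptr<Runner> MakeSimpleParallelRunner() { return std::make_unique<ParallelRunner>(); }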

@westonpace westonpace self-requested a review October 2, 2021 05:24
@aocsa aocsa force-pushed the aocsa/ARROW-13576 branch 2 times, most recently from ae54b32 to 6c8ebde Compare October 3, 2021 00:48
@westonpace (Member) left a comment

Some thoughts in addition to David's suggestion of a base class. I see the challenge of the composition approach better now. Since the runner doesn't have visibility into the inputs/outputs, it's difficult for the runner to take care of things like forwarding signals backwards and forwards.

@lidavidm (Member) left a comment

Thanks for the updates. I left some small comments but this looks good to me.

@aocsa (Contributor, Author) commented Oct 6, 2021

Thanks @lidavidm, I rebased this PR and addressed the latest feedback. I think this PR is ready for merging. Let me know if anything needs to be taken care of on my end to get this merged. cc @felipeblazing, @westonpace

@lidavidm (Member) left a comment

Thanks. LGTM, I'll let Weston/Felipe comment though.

@lidavidm (Member) commented Oct 7, 2021

@ursabot please benchmark lang=R

@aocsa (Contributor, Author) commented Oct 8, 2021

Thanks @weston, I rebased this PR and addressed the latest feedback. I also ran some benchmarks to measure the impact of: 1. the possible issue with ExecBatch copies; 2. async-mode execution.

Note: I ran the benchmark with the following code and machine configuration:

Run on (16 X 3600 MHz CPU s) with 32Gb RAM
CPU Caches:
  L1 Data 32 KiB (x8)
  L1 Instruction 32 KiB (x8)
  L2 Unified 512 KiB (x8)
  L3 Unified 16384 KiB (x2)

1.1 Sync mode with the lambda task function capturing batches by copy

Benchmark                                               Time             CPU   Iterations UserCounters...
---------------------------------------------------------------------------------------------------------
MinimalEndToEndBench/100/10/min_time:1.000           3.46 ms         1.71 ms          812 items_per_second=586.126/s
MinimalEndToEndBench/1000/100/min_time:1.000         52.0 ms         38.0 ms           36 items_per_second=26.3494/s
MinimalEndToEndBench/10000/100/min_time:1.000        1102 ms          997 ms            2 items_per_second=1.00278/s
MinimalEndToEndBench/10000/1000/min_time:1.000       4752 ms         4644 ms            1 items_per_second=0.215319/s

1.2 Sync mode with the lambda task function capturing batches by move with std::bind

---------------------------------------------------------------------------------------------------------
Benchmark                                               Time             CPU   Iterations UserCounters...
---------------------------------------------------------------------------------------------------------
MinimalEndToEndBench/100/10/min_time:1.000           3.02 ms         1.62 ms          885 items_per_second=617.506/s
MinimalEndToEndBench/1000/100/min_time:1.000         51.7 ms         37.9 ms           35 items_per_second=26.4141/s
MinimalEndToEndBench/10000/100/min_time:1.000        1132 ms         1041 ms            1 items_per_second=0.961052/s
MinimalEndToEndBench/10000/1000/min_time:1.000       4795 ms         4680 ms            1 items_per_second=0.213687/s

As you can see, ExecBatch copies were not the culprit for the worse performance in some queries, because there aren't "that" many ExecBatch instances and the copy is cheap (in most cases it only copies references).
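
For reference, the two capture styles compared in 1.1 and 1.2 look roughly like this (stand-in batch type; not the exact code from this PR).

// Sketch of the two task-capture styles that were benchmarked.
#include <functional>
#include <memory>
#include <utility>
#include <vector>

struct FakeBatch {
  // Like ExecBatch, mostly holds reference-counted buffers, so copying is cheap.
  std::vector<std::shared_ptr<int>> values;
};

using Task = std::function<void()>;

Task MakeTaskByCopy(FakeBatch batch) {
  // 1.1: the lambda captures the batch by copy.
  return [batch]() { (void)batch; /* process batch */ };
}

Task MakeTaskByMove(FakeBatch batch) {
  // 1.2: the batch is moved into the callable via std::bind
  //      (an init-capture `[batch = std::move(batch)]` would also work).
  return std::bind([](FakeBatch& b) { (void)b; /* process b */ }, std::move(batch));
}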

  2. Execution without/with ThreadPool
without threadpool
------------------------------------------------------------------------------------------------------------
Benchmark                                                  Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------------------------
MinimalEndToEndBench/100/10/0/min_time:1.000            2.33 ms         2.33 ms          601 items_per_second=428.441/s
MinimalEndToEndBench/1000/100/0/min_time:1.000          46.8 ms         46.8 ms           30 items_per_second=21.3571/s
MinimalEndToEndBench/10000/100/0/min_time:1.000         1172 ms         1172 ms            1 items_per_second=0.853482/s
MinimalEndToEndBench/10000/1000/0/min_time:1.000        4906 ms         4905 ms            1 items_per_second=0.203876/s
MinimalEndToEndBench/10000/10000/0/min_time:1.000      52141 ms        52129 ms            1 items_per_second=0.0191832/s

with threadpool
MinimalEndToEndBench/100/10/1/min_time:1.000            3.87 ms         1.87 ms          745 items_per_second=533.584/s
MinimalEndToEndBench/1000/100/1/min_time:1.000          54.1 ms         38.3 ms           37 items_per_second=26.09/s
MinimalEndToEndBench/10000/100/1/min_time:1.000         1153 ms         1010 ms            1 items_per_second=0.990056/s
MinimalEndToEndBench/10000/1000/1/min_time:1.000        4771 ms         4624 ms            1 items_per_second=0.216249/s
MinimalEndToEndBench/10000/10000/1/min_time:1.000      49761 ms        49578 ms            1 items_per_second=0.0201702/s

As you can see, performance in async mode is worse when the workload is small, but when the workload is large it tends to be better.

Looking forward to your thoughts. cc @felipeblazing

@lidavidm (Member) left a comment

Thanks for the updates! I left some comments on the benchmark.

@aocsa (Contributor, Author) commented Oct 8, 2021

Thanks @lidavidm, I updated the benchmark code.

Here are the latest numbers:

-------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                 Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------------------------------------------
MinimalEndToEndBench/num_batches:100/batch_size:10/use_executor:1/real_time         3035047 ns      1055366 ns          233 bytes_per_second=1.5711M/s items_per_second=32.9484k/s
MinimalEndToEndBench/num_batches:100/batch_size:10/use_executor:0/real_time         2431543 ns      2430187 ns          289 bytes_per_second=1.96105M/s items_per_second=41.1262k/s
MinimalEndToEndBench/num_batches:100/batch_size:100/use_executor:1/real_time        3057831 ns      1067603 ns          225 bytes_per_second=15.594M/s items_per_second=32.7029k/s
MinimalEndToEndBench/num_batches:100/batch_size:100/use_executor:0/real_time        2551493 ns      2550704 ns          272 bytes_per_second=18.6886M/s items_per_second=39.1927k/s
MinimalEndToEndBench/num_batches:100/batch_size:1000/use_executor:1/real_time       2963222 ns      1082264 ns          233 bytes_per_second=160.918M/s items_per_second=33.747k/s
MinimalEndToEndBench/num_batches:100/batch_size:1000/use_executor:0/real_time       3571509 ns      3570340 ns          202 bytes_per_second=133.511M/s items_per_second=27.9994k/s
MinimalEndToEndBench/num_batches:1000/batch_size:10/use_executor:1/real_time       24303552 ns      7684245 ns           30 bytes_per_second=1.96201M/s items_per_second=41.1462k/s
MinimalEndToEndBench/num_batches:1000/batch_size:10/use_executor:0/real_time       25323049 ns     25313647 ns           28 bytes_per_second=1.88302M/s items_per_second=39.4897k/s
MinimalEndToEndBench/num_batches:1000/batch_size:100/use_executor:1/real_time      24240980 ns      7592179 ns           28 bytes_per_second=19.6707M/s items_per_second=41.2525k/s
MinimalEndToEndBench/num_batches:1000/batch_size:100/use_executor:0/real_time      26440865 ns     26432060 ns           26 bytes_per_second=18.0341M/s items_per_second=37.8202k/s
MinimalEndToEndBench/num_batches:1000/batch_size:1000/use_executor:1/real_time     24367160 ns      7565095 ns           29 bytes_per_second=195.688M/s items_per_second=41.0388k/s
MinimalEndToEndBench/num_batches:1000/batch_size:1000/use_executor:0/real_time     40216462 ns     40199735 ns           17 bytes_per_second=118.568M/s items_per_second=24.8654k/s
MinimalEndToEndBench/num_batches:10000/batch_size:10/use_executor:1/real_time     227825184 ns     68188690 ns            3 bytes_per_second=2.093M/s items_per_second=43.8933k/s
MinimalEndToEndBench/num_batches:10000/batch_size:10/use_executor:0/real_time     275693035 ns    274119580 ns            3 bytes_per_second=1.72959M/s items_per_second=36.2722k/s
MinimalEndToEndBench/num_batches:10000/batch_size:100/use_executor:1/real_time    226319606 ns     74475842 ns            3 bytes_per_second=21.0692M/s items_per_second=44.1853k/s
MinimalEndToEndBench/num_batches:10000/batch_size:100/use_executor:0/real_time    289676886 ns    289590532 ns            2 bytes_per_second=16.461M/s items_per_second=34.5212k/s
MinimalEndToEndBench/num_batches:10000/batch_size:1000/use_executor:1/real_time   236291693 ns     69614625 ns            3 bytes_per_second=201.8M/s items_per_second=42.3206k/s
MinimalEndToEndBench/num_batches:10000/batch_size:1000/use_executor:0/real_time   396367253 ns    396224612 ns            2 bytes_per_second=120.302M/s items_per_second=25.2291k/s
MinimalEndToEndBench/num_batches:100000/batch_size:10/use_executor:1/real_time   2060422883 ns    584906294 ns            1 bytes_per_second=2.31427M/s items_per_second=48.5337k/s
MinimalEndToEndBench/num_batches:100000/batch_size:10/use_executor:0/real_time   2297644557 ns   2296919513 ns            1 bytes_per_second=2.07533M/s items_per_second=43.5228k/s
MinimalEndToEndBench/num_batches:100000/batch_size:100/use_executor:1/real_time  2303122407 ns    645052654 ns            1 bytes_per_second=20.7039M/s items_per_second=43.4193k/s
MinimalEndToEndBench/num_batches:100000/batch_size:100/use_executor:0/real_time  2475248324 ns   2474333854 ns            1 bytes_per_second=19.2642M/s items_per_second=40.4k/s
MinimalEndToEndBench/num_batches:100000/batch_size:1000/use_executor:1/real_time 2235447040 ns    650730679 ns            1 bytes_per_second=213.307M/s items_per_second=44.7338k/s
MinimalEndToEndBench/num_batches:100000/batch_size:1000/use_executor:0/real_time 3766306440 ns   3764680381 ns            1 bytes_per_second=126.606M/s items_per_second=26.5512k/s

As you can see, again, performance in async mode is not as good when the workload is small, but when the workload is large it tends to be better.

@westonpace (Member) commented

This benchmark is very cool and we certainly need more benchmarks to understand the overhead of ExecPlan. I'm not sure it explains the conbench results however. The benchmark creates a Scan->Filter->Project plan and then tests between serial & threaded.

However, the conbench result isn't a difference between has_executor=true and has_executor=false.

The thing I most want to understand is the difference in performance between "thread per batch" and "thread per operation". In other words, with your change we now create 3 tasks for every batch in threaded mode. Before your change we created 1 task for every batch.

Can you add a flag to your benchmark that only controls whether the filter & project steps create a new task or use the serial runner? There would be two modes (a sketch of such a flag follows the two lists below):

Thread Per Batch Mode:

  • ExecPlan::executor is set
  • SourceNode creates tasks
  • FilterNode and ProjectNode use the serial runner

Thread Per Operation Mode:

  • ExecPlan::executor is set (it is set in both)
  • SourceNode creates tasks
  • FilterNode and ProjectNode use the parallel runner and create tasks
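
A possible shape for that flag, sketched with Google Benchmark (RunPlan and the argument wiring here are assumptions for illustration, not the PR's actual benchmark code):

// Sketch only: RunPlan and the argument wiring are assumptions, not the PR's code.
#include <benchmark/benchmark.h>

// async_mode == 0: thread per batch     (FilterNode/ProjectNode use the serial runner)
// async_mode == 1: thread per operation (FilterNode/ProjectNode spawn their own tasks)
static void RunPlan(int num_batches, int batch_size, bool async_mode) {
  // build a Scan -> Filter -> Project plan here and run it to completion
  (void)num_batches; (void)batch_size; (void)async_mode;
}

static void MinimalEndToEndBench(benchmark::State& state) {
  const int num_batches = static_cast<int>(state.range(0));
  const int batch_size = static_cast<int>(state.range(1));
  const bool async_mode = state.range(2) != 0;
  for (auto _ : state) {
    RunPlan(num_batches, batch_size, async_mode);
  }
  state.SetItemsProcessed(state.iterations() * num_batches);
}

BENCHMARK(MinimalEndToEndBench)
    ->ArgNames({"num_batches", "batch_size", "async_mode"})
    ->Args({1000, 1000, /*async_mode=*/0})
    ->Args({1000, 1000, /*async_mode=*/1})
    ->UseRealTime();

BENCHMARK_MAIN();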

@aocsa aocsa force-pushed the aocsa/ARROW-13576 branch from 16df46f to a985186 Compare October 11, 2021 14:43
@aocsa (Contributor, Author) commented Oct 11, 2021

Thanks @westonpace, I made some changes to enable Thread Per Batch Mode (async_mode=false) and Thread Per Operation Mode (async_mode=true).

As you mentioned before, this benchmark shows that spawning new thread tasks doesn't reduce our core usage much (the percentage difference varies between 1% and 5%), though it doesn't increase core usage either.

-----------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                               Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------------------------------------------------
MinimalEndToEndBench/num_batches:100/batch_size:10/async_mode:1/real_time         3076857 ns      1061716 ns          225 bytes_per_second=1.54975M/s items_per_second=32.5007k/s
MinimalEndToEndBench/num_batches:100/batch_size:10/async_mode:0/real_time         2987236 ns      1020221 ns          234 bytes_per_second=1.59625M/s items_per_second=33.4758k/s
MinimalEndToEndBench/num_batches:100/batch_size:100/async_mode:1/real_time        3124765 ns      1075070 ns          224 bytes_per_second=15.2599M/s items_per_second=32.0024k/s
MinimalEndToEndBench/num_batches:100/batch_size:100/async_mode:0/real_time        3028913 ns      1031486 ns          232 bytes_per_second=15.7428M/s items_per_second=33.0151k/s
MinimalEndToEndBench/num_batches:100/batch_size:1000/async_mode:1/real_time       3013393 ns      1067201 ns          231 bytes_per_second=158.239M/s items_per_second=33.1852k/s
MinimalEndToEndBench/num_batches:100/batch_size:1000/async_mode:0/real_time       2982961 ns      1052526 ns          232 bytes_per_second=159.854M/s items_per_second=33.5237k/s
MinimalEndToEndBench/num_batches:100/batch_size:5000/async_mode:1/real_time       2996094 ns      1034481 ns          231 bytes_per_second=795.765M/s items_per_second=33.3768k/s
MinimalEndToEndBench/num_batches:100/batch_size:5000/async_mode:0/real_time       2918542 ns      1022779 ns          238 bytes_per_second=816.91M/s items_per_second=34.2637k/s
MinimalEndToEndBench/num_batches:1000/batch_size:10/async_mode:1/real_time       24135715 ns      7412783 ns           28 bytes_per_second=1.97565M/s items_per_second=41.4324k/s
MinimalEndToEndBench/num_batches:1000/batch_size:10/async_mode:0/real_time       24077781 ns      7391717 ns           30 bytes_per_second=1.9804M/s items_per_second=41.5321k/s
MinimalEndToEndBench/num_batches:1000/batch_size:100/async_mode:1/real_time      24011509 ns      7461978 ns           29 bytes_per_second=19.8587M/s items_per_second=41.6467k/s
MinimalEndToEndBench/num_batches:1000/batch_size:100/async_mode:0/real_time      24196028 ns      7541171 ns           30 bytes_per_second=19.7072M/s items_per_second=41.3291k/s
MinimalEndToEndBench/num_batches:1000/batch_size:1000/async_mode:1/real_time     23608108 ns      7296256 ns           28 bytes_per_second=201.98M/s items_per_second=42.3583k/s
MinimalEndToEndBench/num_batches:1000/batch_size:1000/async_mode:0/real_time     23518460 ns      7353338 ns           29 bytes_per_second=202.75M/s items_per_second=42.5198k/s
MinimalEndToEndBench/num_batches:1000/batch_size:5000/async_mode:1/real_time     26493311 ns      7815151 ns           27 bytes_per_second=899.92M/s items_per_second=37.7454k/s
MinimalEndToEndBench/num_batches:1000/batch_size:5000/async_mode:0/real_time     26073649 ns      7780264 ns           27 bytes_per_second=914.404M/s items_per_second=38.3529k/s
MinimalEndToEndBench/num_batches:10000/batch_size:10/async_mode:1/real_time     223436234 ns     70675962 ns            3 bytes_per_second=2.13411M/s items_per_second=44.7555k/s
MinimalEndToEndBench/num_batches:10000/batch_size:10/async_mode:0/real_time     220303752 ns     70971486 ns            3 bytes_per_second=2.16445M/s items_per_second=45.3919k/s
MinimalEndToEndBench/num_batches:10000/batch_size:100/async_mode:1/real_time    228117278 ns     70758735 ns            3 bytes_per_second=20.9032M/s items_per_second=43.8371k/s
MinimalEndToEndBench/num_batches:10000/batch_size:100/async_mode:0/real_time    216247699 ns     70714074 ns            3 bytes_per_second=22.0505M/s items_per_second=46.2433k/s
MinimalEndToEndBench/num_batches:10000/batch_size:1000/async_mode:1/real_time   228636211 ns     72468405 ns            3 bytes_per_second=208.557M/s items_per_second=43.7376k/s
MinimalEndToEndBench/num_batches:10000/batch_size:1000/async_mode:0/real_time   225630993 ns     71128965 ns            3 bytes_per_second=211.335M/s items_per_second=44.3202k/s
MinimalEndToEndBench/num_batches:10000/batch_size:5000/async_mode:1/real_time   248843462 ns     73516841 ns            3 bytes_per_second=958.107M/s items_per_second=40.1859k/s
MinimalEndToEndBench/num_batches:10000/batch_size:5000/async_mode:0/real_time   239517144 ns     73220804 ns            3 bytes_per_second=995.413M/s items_per_second=41.7507k/s
MinimalEndToEndBench/num_batches:100000/batch_size:10/async_mode:1/real_time   2122319750 ns    601241313 ns            1 bytes_per_second=2.24677M/s items_per_second=47.1183k/s
MinimalEndToEndBench/num_batches:100000/batch_size:10/async_mode:0/real_time   2101197844 ns    587004432 ns            1 bytes_per_second=2.26936M/s items_per_second=47.5919k/s
MinimalEndToEndBench/num_batches:100000/batch_size:100/async_mode:1/real_time  2128421476 ns    629112804 ns            1 bytes_per_second=22.4033M/s items_per_second=46.9832k/s
MinimalEndToEndBench/num_batches:100000/batch_size:100/async_mode:0/real_time  2100455252 ns    629750934 ns            1 bytes_per_second=22.7016M/s items_per_second=47.6087k/s
MinimalEndToEndBench/num_batches:100000/batch_size:1000/async_mode:1/real_time 2175799302 ns    629577506 ns            1 bytes_per_second=219.155M/s items_per_second=45.9601k/s
MinimalEndToEndBench/num_batches:100000/batch_size:1000/async_mode:0/real_time 2128841169 ns    627977128 ns            1 bytes_per_second=223.989M/s items_per_second=46.9739k/s
MinimalEndToEndBench/num_batches:100000/batch_size:5000/async_mode:1/real_time 2382353053 ns    651938913 ns            1 bytes_per_second=1000.77M/s items_per_second=41.9753k/s
MinimalEndToEndBench/num_batches:100000/batch_size:5000/async_mode:0/real_time 2294256145 ns    638338052 ns            1 bytes_per_second=1039.2M/s items_per_second=43.5871k/s


@ursabot commented Oct 12, 2021

Benchmark runs are scheduled for baseline = b2b2cbe and contender = 4f0ac50. 4f0ac50 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Failed ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️6.71% ⬆️0.0%] ursa-i9-9960x
[Failed ⬇️0.0% ⬆️0.0%] ursa-thinkcentre-m75q
Supported benchmarks:
ursa-i9-9960x: langs = Python, R, JavaScript
ursa-thinkcentre-m75q: langs = C++, Java
ec2-t3-xlarge-us-east-2: cloud = True
