Optimise scheduler.get_comm_cost set difference #6931
Conversation
This is one simple go at addressing #6899.
Unit Test Results: 15 files ±0, 15 suites ±0, 6h 44m 23s ⏱️ −1m 14s. Results for commit ce87313. ± Comparison against base commit 2a2c3bb.
```python
if 10 * len(ts.dependencies) < len(ws.has_what):
    # In the common case where the number of dependencies is
    # much less than the number of tasks that we have,
    # construct the set of deps that require communication in
    # O(len(dependencies)) rather than O(len(has_what)) time.
    # Factor of 10 is a guess at the overhead of explicit
    # iteration as opposed to just calling set.difference
    deps = {dep for dep in ts.dependencies if dep not in ws.has_what}
else:
    deps = ts.dependencies.difference(ws.has_what)
```
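For clarity, both branches compute the same set; a standalone sketch checking the equivalence, where `ts_dependencies` and `ws_has_what` are hypothetical small stand-ins for `ts.dependencies` and `ws.has_what`:

```python
# Standalone equivalence check. ts_dependencies and ws_has_what are
# hypothetical stand-ins for ts.dependencies (a set of keys) and
# ws.has_what (a dict-like container keyed by task).
ts_dependencies = {"a", "b", "c"}
ws_has_what = {"b": None, "d": None, "e": None}

# Explicit iteration over the (small) dependencies set
via_iteration = {dep for dep in ts_dependencies if dep not in ws_has_what}
# set.difference accepts any iterable, including a dict (it uses the keys)
via_difference = ts_dependencies.difference(ws_has_what)

assert via_iteration == via_difference == {"a", "c"}
print(sorted(via_iteration))  # ['a', 'c']
```

The two forms only differ in which operand drives the iteration, which is exactly what the factor-of-10 heuristic is trying to exploit.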
Micro-benchmarking this, I get a factor of ~2 rather than 10:
```python
import time
import uuid
from random import sample

for dict_size in [100, 1_000, 10_000, 100_000, 1_000_000]:
    a_large_dict = {f"{ix}-{uuid.uuid4()}": "foo" for ix in range(dict_size)}

    def timing(func):
        start = time.time_ns()
        iterations = 10
        for iteration in range(iterations):
            func()
        end = time.time_ns()
        return (end - start) / iterations

    for factor in [0.1, 0.4, 0.45, 0.5]:
        # random.sample needs a sequence, so materialise the keys first
        small_set = set(sample(list(a_large_dict), int(factor * dict_size)))
        intersect = timing(lambda: small_set.intersection(a_large_dict))
        iterate = timing(lambda: {k for k in small_set if k in a_large_dict})
        if iterate < intersect:
            print(f"Iterating faster for {dict_size=} and {factor=}")
```

Iterating faster for dict_size=100 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.4
Iterating faster for dict_size=1000 and factor=0.5
Iterating faster for dict_size=10000 and factor=0.1
Iterating faster for dict_size=10000 and factor=0.4
Iterating faster for dict_size=10000 and factor=0.45
Iterating faster for dict_size=100000 and factor=0.1
Iterating faster for dict_size=100000 and factor=0.4
Iterating faster for dict_size=100000 and factor=0.45
Iterating faster for dict_size=100000 and factor=0.5
Iterating faster for dict_size=1000000 and factor=0.1
Conversely, on my (admittedly slightly antediluvian) Broadwell box, with Python 3.9.13:
Iterating faster for dict_size=100 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.4
Iterating faster for dict_size=10000 and factor=0.1
Iterating faster for dict_size=100000 and factor=0.1
Iterating faster for dict_size=1000000 and factor=0.1
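Since the break-even point clearly varies by machine, anyone reproducing this can measure both approaches directly at the 10:1 ratio the heuristic assumes. A quick sketch using `timeit` (the sizes here are modest and illustrative; the winner varies by CPU and Python version):

```python
import timeit
import uuid

# Machine-dependent micro-benchmark: explicit iteration vs set.difference
# for a small set A against a large dict B at a 10:1 size ratio. As the
# thread shows, the winner and the break-even ratio vary by CPU and
# Python version, so treat any single number as indicative only.
big = {f"{i}-{uuid.uuid4()}": None for i in range(10_000)}
small = set(list(big)[:1_000])  # len(big) / len(small) == 10

t_iter = timeit.timeit(lambda: {k for k in small if k not in big}, number=50)
t_diff = timeit.timeit(lambda: small.difference(big), number=50)

print(f"iterate: {t_iter:.4f}s  difference: {t_diff:.4f}s")
print("iteration faster" if t_iter < t_diff else "set.difference faster")
```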
Computing `set(A).difference(B)` is O(max(len(A), len(B))). When estimating the communication cost of a task's dependencies it is usual that the number of dependencies (A) will be small but the number of tasks the worker has (B) is large. In this case it is better to manually construct the set difference by iterating over A and checking if each element is in B.

Performing a `left.merge(right, on="key", how="inner")` of a distributed dataframe with eight workers, with chunks_per_worker * rows_per_chunk held constant, I observe the following timings using the tcp communication protocol:

| chunks_per_worker | rows_per_chunk | before | after |
|-------------------|----------------|--------|-------|
| 100               | 50000          | 75s    | 48s   |
| 10                | 500000         | ~9s    | ~9s   |
| 1                 | 5000000        | ~8s    | ~8s   |
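To make the heuristic concrete, here is a self-contained sketch of a `get_comm_cost`-style estimate. `TaskState`, `WorkerState`, and the `bandwidth` parameter are minimal hypothetical stand-ins for illustration, not the real distributed classes:

```python
from dataclasses import dataclass, field

# Minimal hypothetical stand-ins for the scheduler's TaskState/WorkerState;
# the real classes in distributed carry far more state than this.
@dataclass(frozen=True)
class TaskState:
    key: str
    nbytes: int = 1

@dataclass
class WorkerState:
    has_what: dict = field(default_factory=dict)  # TaskState -> None

def get_comm_cost(dependencies, ws, bandwidth=100e6):
    """Estimated seconds to transfer the deps the worker is missing."""
    if 10 * len(dependencies) < len(ws.has_what):
        # Small dependency set: O(len(dependencies)) explicit iteration.
        deps = {dep for dep in dependencies if dep not in ws.has_what}
    else:
        deps = dependencies.difference(ws.has_what)
    return sum(dep.nbytes for dep in deps) / bandwidth

a, b, c = (TaskState(k, nbytes=10_000_000) for k in "abc")
ws = WorkerState(has_what={b: None})
print(get_comm_cost({a, b, c}, ws))  # transfers a and c: 0.2 (20 MB / 100 MB/s)
```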
Force-pushed from 01d4e05 to ce87313.
Updated the commit message/summary for timings with the tcp rather than UCX comms protocol (otherwise no change in the force push). I can adapt the heuristic for when to select between the two options, but as above, the threshold varies depending on hardware.
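Since the threshold is hardware-dependent, one possibility would be to calibrate it once at startup rather than hard-coding a factor. A hypothetical sketch (the `calibrate_iteration_factor` helper is invented for illustration and is not part of this PR):

```python
import timeit

def calibrate_iteration_factor(big_size=10_000, fracs=(0.05, 0.1, 0.25, 0.5)):
    """Hypothetical helper: find the largest len(B)/len(A) ratio at which
    explicit iteration still beats set.difference on this machine,
    falling back to a conservative default of 2."""
    big = {str(i): None for i in range(big_size)}
    keys = list(big)
    factor = 2  # conservative default, per the micro-benchmarks above
    for frac in fracs:
        small = set(keys[: int(frac * big_size)])
        t_iter = timeit.timeit(
            lambda: {k for k in small if k not in big}, number=20
        )
        t_diff = timeit.timeit(lambda: small.difference(big), number=20)
        if t_iter < t_diff:
            # Iteration still wins at a ratio of 1/frac on this machine.
            factor = max(factor, round(1 / frac))
    return factor

print(calibrate_iteration_factor())
```

Whether the extra startup cost and added complexity are worth it is debatable; as the thread concludes, the fixed factor appears to be good enough in practice.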
In my benchmarking of the workflow, a factor of 10 or 2 didn't really make a difference, I guess because the […]
Thanks for checking about the factor again. I guess you are right and that's good enough.