[WIP] first draft of a controller-worker wrapper for heat #823
fschlimb wants to merge 24 commits into helmholtz-analytics:main
Conversation
GPU cluster tests are currently disabled on this Pull Request.
I used black versions 19 and 21 locally, always accepting current formatting (I used flags from pyproject.toml).
If you want to run the hooks without making a commit, the recommended command is:
Codecov Report
```
@@            Coverage Diff             @@
##             main     #823      +/-   ##
==========================================
- Coverage   94.62%   87.04%    -7.59%
==========================================
  Files          65       69       +4
  Lines        9976    10443     +467
==========================================
- Hits         9440     9090     -350
- Misses        536     1353     +817
```
Continue to review the full report at Codecov.
coquelin77 left a comment:
Things look pretty good here. Good work!
There are some places where documentation could be a bit better, but I think that the best way to review this in the future would be to look at the distributor before the __init__.py. I left a few comments and questions in the code itself as well.
```python
from array arguments. For this we assume that array arguments never occur
after non-array arguments. Each function/task handles and passes array-typed
and non-array-typed arguments separately.
"""
```
It would be nice to have a simple code block to show how to run in CW mode. I can see it in the class below, but maybe point to it here for clarity.
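Something like the following might do. This is a minimal sketch adapted from the cw4h example quoted later in this thread, so `ht.cw4h()` and `cw.controller()` are taken from there rather than invented here:

```python
import heat as ht

# minimal controller-worker sketch, adapted from the example
# quoted further down in this thread
with ht.cw4h() as cw:
    if cw.controller():      # the body runs on the controller only
        a = ht.arange(8, split=0)
        b = ht.ones(8, split=0)
        print(a @ b)
```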
```python
def _setComm(c):
    # return impl.use_comm(impl.MPICommunication(c.Create(c.group.Excl([0]))))
```
Probably. In some version of this the controller did not participate in the computation, so the workers had their own communicator. It turned out that the communication overhead triggered by the non-HeAT code on the controller quickly became too high.
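For readers unfamiliar with the pattern in the commented-out line, here is a minimal mpi4py sketch of carving out a worker-only communicator that excludes rank 0. This is illustrative, not the PR's actual code:

```python
from mpi4py import MPI

# build a communicator without rank 0 (the controller), mirroring
# the commented-out c.Create(c.group.Excl([0])) above
world = MPI.COMM_WORLD
workers = world.Create(world.Get_group().Excl([0]))
if workers != MPI.COMM_NULL:     # rank 0 receives MPI.COMM_NULL
    print("worker", workers.rank, "of", workers.size)
```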
```python
For now we assume all ranks (controller and workers) are started through mpirun;
workers will never leave distributor.start() and so never leave this function.
```
I think that this assumption is something that we should make generally. If we assume that the MPI ranks are spawned by mpirun/mpiexec/slurm, then we can move between systems much more easily.
What "systems" are you referring to?
The systems that I refer to here are general MPI-capable clusters. If we build on the assumption that the ranks are started by the scheduler, it is probably a bit easier to work with in the future. I think that this comment was a bit superfluous on my end. If we were to set up some way to get the same general calls, we could just as easily use NCCL or gloo. My intention originally was to make the assumptions clear, but you have done that in this comment.
```python
def reset():
    """
    Reset all internal state.
    Distributed objects created before calling reset cannot be used afterwards.
    """
    _runner.distributor.reset()


def sync():
    """
    Trigger all computation.
    """
    _runner.distributor.go(True)
```
Do these functions need to have global _runner declared first, to make sure that they are using the right one?
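For context, here is a usage sketch of the two helpers quoted above. The import path heat.cw4heat and the lazy-queueing behaviour are assumptions, based on the PR description and the task-queue design discussed in this thread:

```python
# hypothetical usage of reset()/sync(); the import path is an assumption
import heat.cw4heat as ht

a = ht.arange(8, split=0)
b = a + 1      # presumably queued as a task rather than run eagerly
ht.sync()      # trigger all queued computation
ht.reset()     # drop internal state; a and b must not be used afterwards
```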
```python
def __exit__(self, exc_type, exc_value, exc_traceback):
    if _runner.comm.rank == 0:
        fini()
```
I am seeing _runner a lot already; should this be a class parameter instead of a global? Would that break the syntax with Ray or the array API?
That's right. Passing it around would require a class variable anyway, unless we want to break the API. Semantically, _runner is a module-private singleton, needed in various places within the module (e.g. in more than one class). Making it a module-global object looks like the most Pythonic approach.
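For illustration, the module-private singleton pattern described here usually boils down to something like the following sketch; the names are hypothetical, not the PR's actual code:

```python
class _Runner:
    """Hypothetical stand-in for the module's runner object."""

_runner = None   # module-private singleton, shared by all classes here

def _get_runner():
    # create the singleton lazily on first use
    global _runner
    if _runner is None:
        _runner = _Runner()
    return _runner
```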
| """ | ||
| A task queue, each rank holds one for queuing up local tasks. | ||
| We currently dissallow submitting tasks by on-root ranks. | ||
| Non-root ranks get their TaskQueue set in the recv-lop if init(). |
Suggested change:
```diff
- Non-root ranks get their TaskQueue set in the recv-lop if init().
+ Non-root ranks get their TaskQueue set in the recv-lop of init().
```
```python
else:
    print("Entering worker loop", flush=True)
    done = False
    header = None
    while not done:
        # wait in bcast for work
        header = self._comm.bcast(header, 0)
        # then see what we need to do
        if header[0] == TASK:
            self._tQueue._taskQueue = header[1]
        elif header[0] == GET:
            # We do not support arrays yet, scalars do not need communication
            assert False
        elif header[0] == GO:
            self._tQueue.go()
            if header[1]:
                self._comm.Barrier()
        elif header[0] == GETPART:
            if self._comm.rank == header[1]:
                val = _RemoteTask.getVal(header[2])
                attr = getattr(val, header[3])
                self._comm.send(attr, dest=0, tag=GETPART)
        elif header[0] == PUBPART:
            val = _RemoteTask.getVal(header[1])
            attr = header[3](getattr(val, header[2]))
            self._comm.gather(attr, root=0)
        elif header[0] == RESET:
            _RemoteTask.reset()
            self._tQueue.clear()
            # Handle._reset()
        elif header[0] == END:
            done = True
            self._comm.Barrier()
            break
        else:
            raise Exception("Worker received unknown tag")
    MPI.Finalize()
    if doExit:
        sys.exit()
    return False
```
All of this can be dropped down a tab block (the else removed and its body dedented); the other if block returns.
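A schematic sketch of what this suggestion means; the functions and names are illustrative, only the structure matters:

```python
def start_before(is_controller):
    # before: the worker path sits in an else-block
    if is_controller:
        return "controller"
    else:
        return "worker"

def start_after(is_controller):
    # after: the if-branch returns, so the else can be dropped and
    # its body dedented one tab block, as the reviewer suggests
    if is_controller:
        return "controller"
    return "worker"
```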
```python
We keep a static dictionary mapping globally unique identifiers to dependent
global objects (like heat.DNDarrays). This keeps the objects alive and allows
communicating through simple integers.
```
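Such a registry typically boils down to a few lines; here is a sketch with hypothetical names, not the PR's actual code:

```python
# identifier -> object registry as described in the docstring above
_registry = {}
_next_id = 0

def publish(obj):
    """Store obj under a fresh integer id and return that id."""
    global _next_id
    _next_id += 1
    _registry[_next_id] = obj   # keeps obj alive on this rank
    return _next_id             # a plain int, cheap to communicate

def lookup(obj_id):
    return _registry[obj_id]
```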
Could this become a memory leak for larger applications?
I lately added some GC for arrays: whenever an array gets deleted on the controller, the corresponding objects will eventually get deleted from all these dicts, too. This does not work yet for non-array objects, and I am not sure if/how we could do this automatically. There is a reset available to allow explicitly emptying the dicts.
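One common way to wire up such controller-side GC is a weakref finalizer. A sketch, under the assumption that a delete request can be broadcast to all ranks (names are hypothetical):

```python
import weakref

def register_for_gc(proxy, obj_id, delete_everywhere):
    # when the controller-side proxy is garbage collected, call
    # delete_everywhere(obj_id), which would broadcast a request to
    # drop the matching entry from the per-rank dictionaries
    weakref.finalize(proxy, delete_everywhere, obj_id)
```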
```python
with ht.cw4h() as cw:
    if cw.controller():
        a = ht.arange(8, split=0)
        b = ht.ones(8, split=0)
        c = a @ b
        # assert hasattr(c, "__partitioned__")
        print(type(c))
        p = c.__partitioned__()
        print(c.shape, c, p)
        for k, v in p["partitions"].items():
            print(k, p["get"](v["data"]))
```
Regarding the possible memory leaks mentioned before: does cleanup of the cw object clean up the handle dictionary?
Hm, I don't think we can just reset the dict when cw goes out of scope, because we might want some of the arrays/values to persist (e.g. to maintain Python semantics). As mentioned above, at least the arrays should get correctly removed from the dict anyway.
Since we are targeting larger datasets in general, I think that removing the arrays is enough. If the dict gets so large that it's a problem, then the reset is always an option.
This pull request is stale because it has been open for 60 days with no activity.

This pull request was closed because it has been inactive for 60 days since being marked as stale.
Description
A first draft of a simple wrapper allowing controller-worker style execution of NumPy code using HeAT.
Changes proposed:
Type of change
Due Diligence
Tested with 3 benchmarks: https://github.com/IntelPython/dpbench/tree/feature/dist/distributed
Just change the import statement in the heat versions to activate cw4heat.
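The import swap presumably looks something like this; the exact module path is an assumption:

```python
# original heat version of a benchmark:
# import heat as ht

# cw4heat-enabled variant: swap only the import, keep the code as-is
import heat.cw4heat as ht
```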
Does this change modify the behaviour of other functions? If so, which?
no
skip ci