[WB-6939] Add service backend using sockets (support fork)#2892
[WB-6939] Add service backend using sockets (support fork)#2892
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2892 +/- ##
==========================================
- Coverage 78.83% 78.64% -0.19%
==========================================
Files 194 208 +14
Lines 26548 27212 +664
==========================================
+ Hits 20928 21400 +472
- Misses 5620 5812 +192
Flags with carried forward coverage won't be shown. Click here to find out more.
|
| parts = self._token_str.split("-") | ||
| assert len(parts) == 3, f"token must have 3 parts: {parts}" | ||
| assert len(parts) == 5, f"token must have 5 parts: {parts}" | ||
| # TODO: make more robust? |
There was a problem hiding this comment.
TODO: what if there arent 5 parts, what if they are malformed with different versions and non supported tokens.
i think this is fine for now, but it is a note to care about this more in the future and think about how it should fail rather than an assert.. It "shouldnt" ever fail, but there are cases we already know if where it might.
Start your training job, before call wandb.init() someone updates the wandb release with pip install --upgrade.... Now the user process is speaking a potentially different dialect than the server process. Eventually we should actually handle these cases by bumping a version and making sure things are compatible.
| pass | ||
|
|
||
|
|
||
| def _dict_from_pbmap(pbmap: "MessageMap[str, spb.SettingsValue]") -> Dict[str, Any]: |
There was a problem hiding this comment.
should it be under proto_util?
| self.pid = 0 | ||
|
|
||
| def run(self) -> None: | ||
| # TODO: catch exceptions and report errors to scheduler |
There was a problem hiding this comment.
| self._target = target | ||
| self._kwargs = kwargs | ||
| self.daemon = True | ||
| self.pid = 0 |
There was a problem hiding this comment.
what is this pid? why is it default to 0?
There was a problem hiding this comment.
not needed right now. removed
| if TYPE_CHECKING: | ||
| from google.protobuf.internal.containers import MessageMap | ||
|
|
||
| class GrpcServerType(object): |
There was a problem hiding this comment.
probably can remove this one, doesn't seem to be used
| relay_q=self._relay_q, | ||
| process=process, | ||
| process_check=False, | ||
| # use_router=False, |
|
|
||
| streams_to_join = [] | ||
| while streams: | ||
| for sid, stream in list(streams.items()): |
There was a problem hiding this comment.
why do you need to wrap it in a list?
There was a problem hiding this comment.
Added a comment:
# Note that we materialize the generator so we can modify the underlying list
| for stream in streams_to_join: | ||
| stream.join() | ||
|
|
||
| wandb.termlog("Done!") # type: ignore |
There was a problem hiding this comment.
maybe more informative message?
There was a problem hiding this comment.
This was covered in this ticket already, just havent taken the time to do it yet:
https://wandb.atlassian.net/browse/WB-6948
|
|
||
| def _loop(self) -> None: | ||
| while not self._stopped.is_set(): | ||
| # TODO: check for parent process going away |
There was a problem hiding this comment.
Do you want to address this TODO in a different PR? should we track this?
There was a problem hiding this comment.
| self._action_q = queue.Queue() | ||
|
|
||
| def _get_stopped_event(self) -> "Event": | ||
| # TODO: clean this up, there should be a better way to abstract this |
There was a problem hiding this comment.
This refers to the Event logic we talked about? Do you want to track this one as well?
There was a problem hiding this comment.
| - StreamRecord: | ||
| - WandbServicer: | ||
|
|
||
| """ |
There was a problem hiding this comment.
Probably want to update the docstring here?
There was a problem hiding this comment.
Improved:
+"""streams: class that manages internal threads for each run.
+StreamThread: Thread that runs internal.wandb_internal()
+StreamRecord: All the external state for the internal thread (queues, etc)
+StreamAction: Lightweight record for stream ops for thread safety with grpc
+StreamMux: Container for dictionary of stream threads per runid
"""
|
|
||
| def server_record_publish(self, sreq: "spb.ServerRequest") -> None: | ||
| record = sreq.record_publish | ||
| # print("GOT rec", record) |
| iface.record_q.put(record) | ||
|
|
||
| def server_inform_finish(self, sreq: "spb.ServerRequest") -> None: | ||
| # print("serv INF FIN") |
| self._mux.drop_stream(stream_id) | ||
|
|
||
| def server_inform_teardown(self, sreq: "spb.ServerRequest") -> None: | ||
| # print("serv INF TEARDOWN") |
| # print("GOT", type(conn)) | ||
| # print("Connected by", addr) |
| # print(f"Running at port: {self.port}") | ||
| self._thread = SockAcceptThread(sock=self._sock, mux=self._mux) | ||
| self._thread.start() | ||
| # self._dbg_thread = DebugThread(mux=self._mux) | ||
| # self._dbg_thread.start() |
Implements: WB-6939
Description
Changes the default "wandb-service" mode to use raw tcp sockets instead of GRPC.
This is a desirable change because:
This change (introducing a new transport method) is undesirable because:
Additional changes:
TODO:
Disable(works now)RunStatusCheckerfor nowFuture:
RunStatusCheckeragain (WB-7352)Testing
functional tests only (more unittests can come later once things stabilize)