[WB-6939] Add service backend using sockets (support fork)#2892

Merged
raubitsj merged 75 commits into master from mp-add-fork-support
Dec 4, 2021

@raubitsj raubitsj commented Nov 7, 2021

Implements: WB-6939

Description

Changes the default "wandb-service" mode to use raw TCP sockets instead of gRPC.

This is a desirable change because:

  • gRPC is not fork safe (forking a gRPC client process after a connection has been made is unsafe)
  • gRPC introduces overhead and complexity that is unnecessary in many same-node cases

This change (introducing a new transport method) is undesirable because:

  • It requires maintaining a separate code path
  • Socket error handling and shutdown handling can be difficult to get right
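
To illustrate the kind of code path a raw socket transport implies, here is a minimal sketch of length-prefixed message framing over a stream socket. The 4-byte big-endian header and the helper names (`send_msg`, `recv_msg`, `_recv_exact`) are assumptions made for this example, not the wire format used in this PR. Because the helpers hold no background threads or shared channel state, a forked child can simply open its own connection and use them.

```python
import socket
import struct

# Assumed frame layout for this sketch: 4-byte big-endian length, then payload.
_HEADER = struct.Struct(">I")


def send_msg(sock: socket.socket, payload: bytes) -> None:
    """Send one framed message (header followed by payload)."""
    sock.sendall(_HEADER.pack(len(payload)) + payload)


def _recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, or raise if the peer closes the connection first."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock: socket.socket) -> bytes:
    """Receive one framed message and return its payload."""
    (length,) = _HEADER.unpack(_recv_exact(sock, _HEADER.size))
    return _recv_exact(sock, length)
```

In practice the payload would be serialized bytes (for example a serialized protobuf request). The explicit `ConnectionError` on a short read is one way to surface the shutdown cases that the second list calls difficult to get right.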

Additional changes:

  • Converts type annotation to be opt-out instead of opt-in (for wandb/sdk/ only right now)

TODO:

  • Remove debug messages
  • Fix up socket shutdown
  • Move grpc server code to its own place
  • Disable RunStatusChecker for now (works now)
  • Add grpc specific tests
  • Make tests pass!
  • Add (update) dev docs -- sequence diagrams
  • Make sure exception handling works
  • Improve coverage

Future:

  • Enable RunStatusChecker again (WB-7352)
  • Windows tests / support

Testing

Functional tests only (more unit tests can come later once things stabilize)

@raubitsj raubitsj changed the title [WIP] Add alternative service backend (not grpc) to support fork mode and higher performance [WIP] Add service backend using sockets (support fork) Nov 7, 2021

codecov Bot commented Nov 7, 2021

Codecov Report

Merging #2892 (180e6f7) into master (b647fc5) will decrease coverage by 0.18%.
The diff coverage is 90.79%.

@@            Coverage Diff             @@
##           master    #2892      +/-   ##
==========================================
- Coverage   78.83%   78.64%   -0.19%     
==========================================
  Files         194      208      +14     
  Lines       26548    27212     +664     
==========================================
+ Hits        20928    21400     +472     
- Misses       5620     5812     +192     
Flag Coverage Δ
functest 56.55% <89.45%> (+0.43%) ⬆️
unittest 69.18% <25.33%> (-1.49%) ⬇️

Impacted Files Coverage Δ
wandb/sdk/internal/handler.py 89.22% <66.66%> (-4.20%) ⬇️
wandb/sdk/service/server.py 82.22% <82.22%> (ø)
wandb/sdk/service/service_base.py 83.33% <83.33%> (ø)
wandb/sdk/service/service_grpc.py 84.09% <84.09%> (ø)
wandb/sdk/interface/interface_shared.py 84.83% <84.83%> (ø)
wandb/sdk/interface/router.py 96.72% <85.71%> (-1.59%) ⬇️
wandb/sdk/service/service.py 89.61% <90.00%> (-4.33%) ⬇️
wandb/sdk/service/server_sock.py 91.25% <91.25%> (ø)
wandb/sdk/service/port_file.py 92.45% <92.45%> (ø)
wandb/sdk/internal/sender.py 91.66% <93.33%> (-0.26%) ⬇️
... and 38 more

@raubitsj raubitsj changed the title [WIP] Add service backend using sockets (support fork) [WIP][WB-6939] Add service backend using sockets (support fork) Nov 8, 2021
 parts = self._token_str.split("-")
-assert len(parts) == 3, f"token must have 3 parts: {parts}"
+assert len(parts) == 5, f"token must have 5 parts: {parts}"
 # TODO: make more robust?
Collaborator

What does this TODO mean?

Contributor Author

TODO: what if there aren't 5 parts, or what if they are malformed, with different versions or unsupported tokens?
I think this is fine for now, but it is a note to care about this more in the future and to think about how it should fail, rather than relying on an assert. It "shouldn't" ever fail, but there are cases we already know of where it might.
For example: you start your training job and, before it calls wandb.init(), someone updates the wandb release with pip install --upgrade.... Now the user process is speaking a potentially different dialect than the server process. Eventually we should actually handle these cases by bumping a version and making sure things are compatible.
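
One way the "fail properly instead of assert" idea could look is sketched below. The field names (`version`, `pid`, `transport`, `host`, `port`), the `TokenError` class, and the set of supported versions are all hypothetical; the thread only establishes that the token is split on "-" into 5 parts.

```python
from dataclasses import dataclass


class TokenError(ValueError):
    """Raised when a service token is malformed or from an unsupported version."""


# Hypothetical: versions this process knows how to speak.
SUPPORTED_VERSIONS = {"2"}


@dataclass(frozen=True)
class ServiceToken:
    # Hypothetical field layout, chosen only to give the 5 parts names.
    version: str
    pid: int
    transport: str
    host: str
    port: int


def parse_token(token_str: str) -> ServiceToken:
    """Parse a dash-separated token, raising TokenError instead of asserting."""
    parts = token_str.split("-")
    if len(parts) != 5:
        raise TokenError(f"token must have 5 parts, got {len(parts)}: {parts}")
    version, pid_str, transport, host, port_str = parts
    if version not in SUPPORTED_VERSIONS:
        raise TokenError(f"unsupported token version: {version!r}")
    try:
        pid, port = int(pid_str), int(port_str)
    except ValueError as exc:
        raise TokenError(f"pid and port must be integers: {parts}") from exc
    return ServiceToken(version, pid, transport, host, port)
```

An explicit version field gives the client a place to detect the upgrade-in-the-middle scenario and report it clearly, rather than failing on a length check.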

Comment thread wandb/sdk/service/streams.py Outdated
pass


def _dict_from_pbmap(pbmap: "MessageMap[str, spb.SettingsValue]") -> Dict[str, Any]:
Collaborator

should it be under proto_util?

Contributor Author

just being lazy, fixed in:
b99f463

self.pid = 0

def run(self) -> None:
    # TODO: catch exceptions and report errors to scheduler
Collaborator

do we track this TODO?
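
For context, a minimal sketch of what "catch exceptions and report errors" could mean for a thread like this one. `ReportingThread` and its `error_q` parameter are hypothetical names for illustration; the PR does not define how errors would reach the scheduler.

```python
import queue
import threading
from typing import Any, Callable


class ReportingThread(threading.Thread):
    """Hypothetical thread wrapper that forwards uncaught exceptions to a queue."""

    def __init__(
        self, target: Callable[..., Any], error_q: "queue.Queue", **kwargs: Any
    ) -> None:
        super().__init__(daemon=True)
        # Use distinct attribute names so threading.Thread internals are untouched.
        self._fn = target
        self._fn_kwargs = kwargs
        self._error_q = error_q

    def run(self) -> None:
        try:
            self._fn(**self._fn_kwargs)
        except Exception as exc:
            # Hand the failure to whoever owns the queue instead of dying silently.
            self._error_q.put((self.name, exc))
```

The owner of `error_q` can poll it alongside its other work and decide whether to restart the stream or shut down.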

Comment thread wandb/sdk/service/streams.py Outdated
self._target = target
self._kwargs = kwargs
self.daemon = True
self.pid = 0
Collaborator

What is this pid? Why does it default to 0?

Contributor Author

not needed right now. removed

Comment thread wandb/sdk/service/streams.py Outdated
if TYPE_CHECKING:
    from google.protobuf.internal.containers import MessageMap

class GrpcServerType(object):
Collaborator

probably can remove this one, doesn't seem to be used

Contributor Author

removed

Comment thread wandb/sdk/service/streams.py Outdated
relay_q=self._relay_q,
process=process,
process_check=False,
# use_router=False,
Collaborator

remove unused code?

Contributor Author

removed


streams_to_join = []
while streams:
    for sid, stream in list(streams.items()):
Collaborator

why do you need to wrap it in a list?

Contributor Author

Added a comment:

# Note that we materialize the generator so we can modify the underlying list
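
A small standalone illustration of why the snapshot matters: deleting from a dict while iterating over its live view raises `RuntimeError` in CPython, while iterating over `list(d.items())` does not. The `drop_finished` helper and its arguments are hypothetical and only mirror the shape of the loop above.

```python
from typing import Dict, List, Set


def drop_finished(streams: Dict[str, str], finished: Set[str]) -> List[str]:
    """Remove finished streams from the dict and return the removed values."""
    dropped = []
    # list(...) takes a snapshot, so deleting from `streams` inside the loop is safe.
    for sid, stream in list(streams.items()):
        if sid in finished:
            del streams[sid]
            dropped.append(stream)
    return dropped
```

Without the `list(...)` wrapper, the first `del` would cause the next iteration step to fail with "dictionary changed size during iteration".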

for stream in streams_to_join:
    stream.join()

wandb.termlog("Done!") # type: ignore
Collaborator

maybe more informative message?

Contributor Author

This was covered in this ticket already, just haven't taken the time to do it yet:
https://wandb.atlassian.net/browse/WB-6948


def _loop(self) -> None:
    while not self._stopped.is_set():
        # TODO: check for parent process going away
Collaborator

Do you want to address this TODO in a different PR? should we track this?
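
For reference, one common POSIX approach to this TODO is to probe the parent's pid with signal 0 on each loop iteration. The sketch below is an assumption about how such a check might look, not what the PR implements; `pid_exists` and `wait_for_stop_or_orphan` are hypothetical names. It is POSIX-only: `os.kill(pid, 0)` must not be used this way on Windows, where 0 is a real control event.

```python
import os
import threading


def pid_exists(pid: int) -> bool:
    """Return True if a process with this pid exists (POSIX only)."""
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # Signal 0 performs error checking without sending a signal.
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # The process exists but belongs to another user.
    return True


def wait_for_stop_or_orphan(
    stopped: threading.Event, parent_pid: int, interval: float = 1.0
) -> None:
    """Loop until `stopped` is set, setting it ourselves if the parent disappears."""
    while not stopped.is_set():
        if not pid_exists(parent_pid):
            stopped.set()
            break
        stopped.wait(interval)
```

Pids can be reused by the operating system, so a check like this is a heuristic rather than a guarantee.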

self._action_q = queue.Queue()

def _get_stopped_event(self) -> "Event":
    # TODO: clean this up, there should be a better way to abstract this
Collaborator

This refers to the Event logic we talked about? Do you want to track this one as well?

- StreamRecord:
- WandbServicer:

"""
Collaborator

Probably want to update the docstring here?

Contributor Author

Improved:

+"""streams: class that manages internal threads for each run.
 
+StreamThread: Thread that runs internal.wandb_internal()
+StreamRecord: All the external state for the internal thread (queues, etc)
+StreamAction: Lightweight record for stream ops for thread safety with grpc
+StreamMux: Container for dictionary of stream threads per runid
 """

Comment thread wandb/sdk/service/server_sock.py Outdated

def server_record_publish(self, sreq: "spb.ServerRequest") -> None:
    record = sreq.record_publish
    # print("GOT rec", record)
Collaborator

remove commented out code

Comment thread wandb/sdk/service/server_sock.py Outdated
iface.record_q.put(record)

def server_inform_finish(self, sreq: "spb.ServerRequest") -> None:
# print("serv INF FIN")
Collaborator

here as well :)

Comment thread wandb/sdk/service/server_sock.py Outdated
self._mux.drop_stream(stream_id)

def server_inform_teardown(self, sreq: "spb.ServerRequest") -> None:
# print("serv INF TEARDOWN")
Collaborator

and this one

Comment thread wandb/sdk/service/server_sock.py Outdated
Comment on lines +192 to +193
# print("GOT", type(conn))
# print("Connected by", addr)
Collaborator

remove commented out code

Comment thread wandb/sdk/service/server_sock.py Outdated
Comment on lines +238 to +242
# print(f"Running at port: {self.port}")
self._thread = SockAcceptThread(sock=self._sock, mux=self._mux)
self._thread.start()
# self._dbg_thread = DebugThread(mux=self._mux)
# self._dbg_thread.start()
Collaborator

remove commented out code?

Collaborator

@kptkin kptkin left a comment

Amazing!!! 🤩 🚀

@raubitsj raubitsj merged commit 31c12aa into master Dec 4, 2021
@raubitsj raubitsj deleted the mp-add-fork-support branch December 4, 2021 16:32