Use threading.Events to communicate between shutdown and export #4511

DylanRussell wants to merge 7 commits into open-telemetry:main
Conversation
Use `threading.Event()` to communicate when an export call is occurring, so that shutdown waits for the export call to finish. Use `threading.Event()` to communicate when shutdown is occurring, so that the sleep in export is interrupted if a shutdown is occurring.
...pentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py
```python
            metadata=self._headers,
            timeout=self._timeout,
        try:
            self._export_not_occuring.clear()
```
The usage of `_export_not_occuring` looks like a lock to me. Is there a benefit to using an event for it?
Using an event allows the export thread to communicate to shutdown that there is / is not a pending RPC. In Shutdown we call the wait() method that blocks until the flag is true.
The problem with the lock is that export gives it up, only to immediately re-acquire it. When 2 threads ask for a lock there's no guarantee on which gets it.
If the behavior that we want is for shutdown to block for any pending RPC and otherwise execute I think an event is best.
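A minimal runnable sketch of that event-as-gate behavior (the `export_not_occurring` name and the timings here are illustrative assumptions, not the PR's actual code): export clears the flag while an RPC is in flight, and a shutdown-style `wait()` blocks only until the flag is set again.

```python
import threading
import time

export_not_occurring = threading.Event()
export_not_occurring.set()  # flag true means no RPC is pending

def export_once():
    export_not_occurring.clear()  # RPC in progress
    try:
        time.sleep(0.2)  # stand-in for the actual RPC
    finally:
        export_not_occurring.set()  # RPC done; wakes any waiter

t = threading.Thread(target=export_once)
t.start()
time.sleep(0.05)  # give the export a moment to start

start = time.monotonic()
export_not_occurring.wait(timeout=5.0)  # what shutdown would do
waited = time.monotonic() - start
t.join()
```

Unlike handing a lock back and forth, the `wait()` here never competes with the export thread for re-acquisition; it simply observes the flag.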
Maybe I'm missing something, but if you're doing

```python
while True:
    if shutdown_occuring.is_set(): return
    event.clear()
    export()
    event.set()
```

there is no guarantee that the thing waiting for the event will have run and set shutdown_occuring before export() gets called again. I think even switching to a lock doesn't necessarily solve everything. Might need to rethink the approach a little.
There must be some delay for the shutdown thread to receive that notification and set shutdown_occurring, but that must be really small.
I'm sure it's less than the sleeps in the retry loop (I think my test covers this, but I'll double check). I can probably test and see exactly how small that delay is. Conceivably a new export call could occur in the milliseconds or nanoseconds it takes. If that happens shutdown will have proceeded and closed the channel which will interrupt this export call. I don't think it's that bad for this to happen, and it's unlikely. Still an improvement on the current behavior IMO.
```python
                delay,
            )
            self._shutdown_occuring.wait(delay)
            continue
```
IMO this would be a little clearer to just return here
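The `wait(delay)` call in the hunk above is what makes the retry sleep interruptible. A small sketch of the idea, under an assumed `shutdown_occurring` event mirroring the diff's name:

```python
import threading
import time

shutdown_occurring = threading.Event()  # assumed name, mirroring the diff

def backoff_wait(delay: float) -> bool:
    # Event.wait returns True as soon as the flag is set, so a shutdown
    # signalled from another thread cuts the retry delay short.
    return shutdown_occurring.wait(delay)

# Signal shutdown 0.1s into a nominal 5s backoff.
threading.Timer(0.1, shutdown_occurring.set).start()
start = time.monotonic()
interrupted = backoff_wait(5.0)
elapsed = time.monotonic() - start
```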
exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py
```python
    def run(self):
        if self._target is not None:  # type: ignore
            self._return = self._target(*self._args, **self._kwargs)  # type: ignore
```
Should we include the cleanup from the original run function or is that not a concern here?
Suggested change:

```diff
 def run(self):
-    if self._target is not None:  # type: ignore
-        self._return = self._target(*self._args, **self._kwargs)  # type: ignore
+    try:
+        if self._target is not None:
+            self._return = self._target(*self._args, **self._kwargs)
+    finally:
+        # Avoid a refcycle if the thread is running a function with
+        # an argument that has a member that points to the thread.
+        del self._target, self._args, self._kwargs
```
```python
    def join(self, *args):  # type: ignore
        threading.Thread.join(self, *args)
```
nit: Could we avoid type ignore by explicitly passing the expected type?
Suggested change:

```diff
-def join(self, *args):  # type: ignore
-    threading.Thread.join(self, *args)
+def join(self, timeout: float | None = None) -> Any:
+    threading.Thread.join(self, timeout=timeout)
```
```diff
 # value will remain constant.
-for delay in _create_exp_backoff_generator(max_value=max_value):
-    if delay == max_value or self._shutdown:
+for delay in [1, 2, 4, 8, 16, 32]:
```
Should it include 64 as well like max_value before?
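For reference, a generator in the spirit of `_create_exp_backoff_generator` (the implementation below is a guess from the call site, not the library's actual code) keeps doubling until it reaches `max_value` — so with `max_value=64` the hardcoded list would indeed need a final 64 to match:

```python
from itertools import count

def exp_backoff(max_value: int):
    """Yield 1, 2, 4, ... capped at max_value, then repeat max_value."""
    for i in count():
        delay = 2 ** i
        if delay >= max_value:
            while True:
                # Callers can treat `delay == max_value` as the stop signal.
                yield max_value
        yield delay

gen = exp_backoff(64)
delays = [next(gen) for _ in range(7)]  # → [1, 2, 4, 8, 16, 32, 64]
```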
```diff
 def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
-    if self._shutdown:
+    if self._shutdown_occuring.is_set():
```
This already had the same problem, but shutdown() is not thread safe. I guess for this PR we can assume only one thread calls it.
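One way to make that check-and-set atomic would be a small lock around it — a hypothetical sketch (class and attribute names are assumptions, not the PR's code) of an idempotent, thread-safe shutdown:

```python
import threading

class ExporterSketch:
    def __init__(self):
        self._shutdown_lock = threading.Lock()
        self._shutdown_occurring = threading.Event()
        self.close_count = 0  # stand-in for "channel closed" side effects

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        # The lock makes is_set + set atomic, so exactly one caller
        # proceeds past this block even under concurrent calls.
        with self._shutdown_lock:
            if self._shutdown_occurring.is_set():
                return
            self._shutdown_occurring.set()
        self.close_count += 1  # the real teardown would happen here

exp = ExporterSketch()
threads = [threading.Thread(target=exp.shutdown) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```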
This PR has been automatically marked as stale because it has not had any activity for 14 days. It will be closed if no further activity occurs within 14 days of this comment.
This PR has been closed due to inactivity. Please reopen if you would like to continue working on it. |
Description
It seems like the behavior we want is for `Shutdown()` to interrupt the `sleep` call in `export`, so we don't idle only to report failure. This PR accomplishes this via threading events:

- An event for `export` to communicate to `shutdown` that an RPC is in progress, and to wait until it's done or the shutdown timeout finishes.
- An event for `shutdown` to communicate to `export` that shutdown is happening, and it doesn't need to `sleep`.

We use these 2 events to communicate between the 2 threads. AFAIK there are only 2 threads we need to worry about: one thread where `export` is repeatedly called, and the main thread where `shutdown` is called.

Note that this PR also fixes a bug where we were needlessly sleeping for 32 seconds only to report failure, because we would simply break out of the loop in the next iteration. I also did some minor code cleanup in the exporters in this PR.
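An end-to-end sketch of the two-event scheme just described (names, delays, and the retry loop are illustrative assumptions, not the PR's code): one event lets shutdown interrupt export's backoff, the other lets shutdown wait out an in-flight RPC.

```python
import threading
import time

export_not_occurring = threading.Event()
export_not_occurring.set()
shutdown_occurring = threading.Event()
log = []

def export_with_retries():
    for delay in [0.1, 0.2, 0.4]:
        if shutdown_occurring.is_set():
            log.append("aborted")
            return
        export_not_occurring.clear()
        try:
            log.append("attempt")  # stand-in for a failing RPC
        finally:
            export_not_occurring.set()
        # Interruptible backoff: wakes early if shutdown is signalled.
        if shutdown_occurring.wait(delay):
            log.append("interrupted")
            return
    log.append("gave up")

def shutdown():
    shutdown_occurring.set()        # stop export from sleeping further
    export_not_occurring.wait(5.0)  # block on any in-flight RPC

t = threading.Thread(target=export_with_retries)
t.start()
time.sleep(0.05)
shutdown()
t.join()
```

Because the retry delay is an `Event.wait`, shutdown ends the loop in milliseconds instead of letting it sleep through the remaining backoff.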
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Still need to write tests. Putting this out there now to get early feedback.
Does This PR Require a Contrib Repo Change?
Checklist: