feat(taskworker) Add concurrent worker #83254
Merged
Conversation
Codecov Report: ✅ All tests successful. No failed tests found.
Additional details and impacted files

```
@@            Coverage Diff             @@
##           master   #83254      +/-   ##
==========================================
+ Coverage   87.49%   87.56%   +0.06%
==========================================
  Files        9404     9393      -11
  Lines      537180   536973     -207
  Branches    21133    21048      -85
==========================================
+ Hits       470024   470203     +179
+ Misses      66798    66414     -384
+ Partials      358      356       -2
```
evanh reviewed on Jan 14, 2025
src/sentry/taskworker/worker.py
Outdated
```python
task = self._get_known_task(activation)
if not task:
    try:
        activation = child_tasks.get_nowait()
```
Member
I don't think get_nowait is necessarily correct here. I understand this is ensuring that the process doesn't block while waiting for a task before checking for the shutdown, but I think some kind of timeout/delay would be good here to avoid spiking the CPU. Maybe 100ms or something like that?
Member
Author
Good point about the potential CPU burn on an empty queue. I'll put a blocking get with a timeout in.
Co-authored-by: Evan Hicks <evanh@users.noreply.github.com>
Wait on the empty queue to reduce CPU burn.
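The fix agreed on above can be sketched as follows. `child_tasks` comes from the diff; `poll_task` and the 100ms default are illustrative, not the PR's actual code:

```python
import queue


def poll_task(child_tasks: queue.Queue, timeout: float = 0.1):
    """Fetch the next activation, blocking for up to `timeout` seconds.

    A blocking get with a short timeout (instead of get_nowait) avoids
    spinning the CPU on an empty queue, while still returning to the
    caller often enough to check for shutdown between attempts.
    """
    try:
        return child_tasks.get(block=True, timeout=timeout)
    except queue.Empty:
        return None
```

The caller's loop stays the same shape as with `get_nowait`: a `None` result means "no work yet, check shutdown and try again", but each empty poll now costs at most one 100ms sleep instead of a busy spin.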
evanh approved these changes on Jan 16, 2025
Contributor
Suspect Issues: This pull request was deployed and Sentry observed the following issues:
andrewshie-sentry pushed a commit that referenced this pull request on Jan 22, 2025
Move the taskworker process to be a multiprocess concurrent worker. This
will help enable higher CPU usage in worker pods, as we can pack more
concurrent CPU operations into each pod (at the cost of memory).
The main process is responsible for:
- Spawning children
- Making RPC requests to fill child queues and submit results.
Each child process handles:
- Resolving task names
- Checking at_most_once keys
- Enforcing processing deadlines
- Executing task functions
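The division of responsibilities above can be sketched with the stdlib `multiprocessing` module. All names here (`child_main`, `spawn_children`, the result tuple shape) are illustrative, not the PR's actual code:

```python
import multiprocessing
import queue


def child_main(child_tasks, results, shutdown) -> None:
    # Child loop: pull activations, execute them, and report results.
    # (The real children also resolve task names, check at_most_once
    # keys, and enforce processing deadlines.)
    while not shutdown.is_set():
        try:
            activation = child_tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        results.put(("complete", activation))


def spawn_children(count: int):
    # The main process owns both queues: it fills child_tasks from RPC
    # fetches and drains results to submit back over RPC.
    ctx = multiprocessing.get_context("fork")  # fork keeps the sketch self-contained
    child_tasks = ctx.Queue()
    results = ctx.Queue()
    shutdown = ctx.Event()
    procs = [
        ctx.Process(target=child_main, args=(child_tasks, results, shutdown))
        for _ in range(count)
    ]
    for p in procs:
        p.start()
    return child_tasks, results, shutdown, procs
```

Packing more children into one pod trades memory (each child is a full interpreter) for higher CPU utilization, which is the stated goal of the change.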
Instead of using more child processes to enforce timeouts, I've used
SIGALRM. I've verified that tasks like
```python
@exampletasks.register(name="examples.infinite", retry=Retry(times=2))
def infinite_task() -> None:
    try:
        while True:
            pass
    except Exception as e:
        print("haha caught exception", e)
```
do not paralyze workers with infinite loops.
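A minimal sketch of SIGALRM-based deadline enforcement (the exception and function names are hypothetical, not the PR's implementation):

```python
import signal


class ProcessingDeadlineExceeded(Exception):
    """Raised when a task runs past its processing deadline."""


def _alarm_handler(signum, frame):
    # Raising from the signal handler injects the exception into the
    # currently running frame, interrupting even `while True: pass`.
    raise ProcessingDeadlineExceeded()


def run_with_deadline(func, deadline_seconds: int):
    signal.signal(signal.SIGALRM, _alarm_handler)
    signal.alarm(deadline_seconds)  # Unix-only; one pending alarm per process
    try:
        return func()
    finally:
        signal.alarm(0)  # always cancel the pending alarm
```

Note that even if the task catches the exception (as `infinite_task` does with its broad `except Exception`), the raise still breaks it out of the loop, so the worker is not stuck.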
When a worker is terminated, it uses an `Event` to have children exit,
and then drains any results. Tasks still in the `_child_tasks` queue
will not be completed; instead they will be sent to another worker
when the `processing_deadline` on their activations expires.
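That shutdown sequence can be sketched as below; `shutdown_worker` is a hypothetical name, and the re-delivery of abandoned tasks happens broker-side, so it appears only as a comment:

```python
import queue


def shutdown_worker(shutdown_event, procs, results):
    # Signal children to exit their work loops, then wait for them.
    shutdown_event.set()
    for p in procs:
        p.join()
    # Drain any completed results so finished work is not lost.
    drained = []
    while True:
        try:
            drained.append(results.get_nowait())
        except queue.Empty:
            break
    # Activations still sitting in _child_tasks are abandoned here; the
    # broker re-delivers them to another worker once their
    # processing_deadline expires, so at-least-once delivery holds.
    return drained
```

Draining after the join (rather than before) matters: children may still be pushing results between the `Event` being set and their loop's next shutdown check.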
---------
Co-authored-by: Evan Hicks <evanh@users.noreply.github.com>