Refactor runner for implementing batching (#1632)
Conversation
Force-pushed 32fd885 to cdf721e
Force-pushed cdf721e to 76260fb
JakeHillion
left a comment
This looks good to me, there's a lot of it but I like the direction. Would be great to see what continuous batching looks like on top of this before merging.
Evanev7
left a comment
nice first pass, i like the direction this is going in
was this intended to be included in this PR?
(the batch generation code that is)
This isn't batch generation. We just queue up a batch of tasks and process sequentially. The resulting behaviour is a no-op, but this gives it the structure of batch generation that we can replace with a true implementation.
it is a batch generator, it's just a bad one.
Since we're going to have both implementations in the next PR, I've renamed this to a SequentialGenerator that implements an Inference Generator ABC. (Not sure about that naming but I couldn't come up with a better one)
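The split described above could be sketched as follows. This is a minimal illustration, not the PR's actual code: the `InferenceGenerator` and `SequentialGenerator` names come from the comment, while `run_task`, `submit`, and `step` are hypothetical method names chosen for the sketch.

```python
from abc import ABC, abstractmethod
from collections import deque


class InferenceGenerator(ABC):
    """Hypothetical ABC: the common interface a true batch generator
    and the sequential fallback would both implement."""

    @abstractmethod
    def submit(self, task) -> None:
        """Queue a task for generation."""

    @abstractmethod
    def step(self):
        """Produce the next available result, or None if nothing is ready."""


class SequentialGenerator(InferenceGenerator):
    """Queues up tasks but processes them one at a time: behaviourally a
    no-op batcher, structurally shaped like a real one."""

    def __init__(self, run_task):
        self._queue = deque()
        self._run_task = run_task  # callable that executes a single task

    def submit(self, task) -> None:
        self._queue.append(task)

    def step(self):
        if not self._queue:
            return None
        return self._run_task(self._queue.popleft())
```

A true batching implementation could then replace `step` with a call that advances all queued tasks at once, without changing callers.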
src/exo/utils/channels.py
Outdated
```python
return d


class NonBlockingGenerator[T](Generator[T | None, None, None]):
```
ahh monads my old friend. this is (i reckon) not quite the right abstraction here. in its current iteration, i'd suggest using just the receiver and letting the WouldBlock exception bubble up so we don't need to do this T | None dance all the way through the pipeline.
Have replied similarly in a different comment, but I want to be able to use this like mlx generate that can be composed with the model output parsers with the option for None when a result isn't available.
which other comment? and, i suppose if you're set on that api we should make the split explicit; have one class throw WouldBlock and have an outer wrapper that catches WouldBlock and converts it to None if you don't want to adjust the current generator mapping functions.
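The explicit split being suggested might look roughly like this. The `Receiver` here is a toy stand-in (the real receiver lives in `exo.utils.channels`); `WouldBlock`, `receive_nowait`, and `optional_receive` are assumed names for the sketch.

```python
import queue


class WouldBlock(Exception):
    """Raised when no item is ready right now (stand-in for the channel
    library's own WouldBlock)."""


class Receiver:
    """Toy channel endpoint with an explicit non-blocking contract:
    receive_nowait raises WouldBlock rather than returning None."""

    def __init__(self):
        self._q = queue.SimpleQueue()

    def send(self, item):
        self._q.put(item)

    def receive_nowait(self):
        try:
            return self._q.get_nowait()
        except queue.Empty:
            raise WouldBlock from None


def optional_receive(receiver):
    """Outer wrapper: converts WouldBlock into a None yield, giving the
    mlx-generate-style `T | None` API only at the pipeline edge."""
    while True:
        try:
            yield receiver.receive_nowait()
        except WouldBlock:
            yield None
```

The inner pipeline then stays exception-based, while code that wants to compose with output parsers iterates the wrapper and checks for `None`.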
src/exo/worker/runner/bootstrap.py
Outdated
```python
if bound_instance.is_image_model:
    from exo.worker.runner.image_models.runner import main

    main(bound_instance, event_sender, task_receiver, cancel_receiver)
```
a change of this scale should be reflected in the image runner.
I think this would make the diff a bit unmanageable. I'll add another pr on top that does this.
playing devil's advocate a little, but:
- i'm not jake; i don't care about diff size
- i do not want the two runners to fall out of sync at the current moment; i want a clean break from old-style main to new-style class
- this is already out of scope of what a single pr could be (i.e. the batch generator interface)
I thought the diffs would be a lot worse, but that file isn't too big... Made it a separate commit so I can extract it if that ever feels necessary.
```diff
  self.status = event.runner_status
  if isinstance(event, TaskAcknowledged):
-     self.pending.pop(event.task_id).set()
+     self.pending[event.task_id].set()
```
this is a very significant change to the meaning of "pending" in the supervisor, that I'm not a huge fan of. if you want a union of pending and active there should be other ways to implement it.
Will think on this
Added an in progress set instead
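One shape that "in progress set" could take, keeping `pending` strictly meaning "awaiting acknowledgement". Everything here is a hypothetical sketch: `TaskTracker`, `submit`, `acknowledge`, and `complete` are illustrative names, not the supervisor's real API.

```python
import threading


class TaskTracker:
    """Sketch: track tasks by lifecycle stage instead of widening the
    meaning of 'pending'.

    pending:     submitted but not yet acknowledged by the runner.
    in_progress: acknowledged and currently running.
    """

    def __init__(self):
        self.pending: dict[str, threading.Event] = {}
        self.in_progress: set[str] = set()

    def submit(self, task_id: str) -> threading.Event:
        ev = threading.Event()
        self.pending[task_id] = ev
        return ev

    def acknowledge(self, task_id: str) -> None:
        # Move the task from pending to in-progress; setting the event
        # wakes anyone waiting on the acknowledgement.
        self.pending.pop(task_id).set()
        self.in_progress.add(task_id)

    def complete(self, task_id: str) -> None:
        self.in_progress.discard(task_id)
```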
```python
self.batch_generator = BatchGenerator(
    model=self.inference_model,
    tokenizer=self.tokenizer,
    group=self.group,
    kv_prefix_cache=self.kv_prefix_cache,
    model_id=self.model_id,
    device_rank=self.device_rank,
    cancel_receiver=self.cancel_receiver,
    cancelled_tasks=self.cancelled_tasks,
    event_sender=self.event_sender,
    check_for_cancel_every=self.check_for_cancel_every,
)
```
if you are going to include a dedicated batch generator task, it should either take ownership of these values or be independent of them. really don't like this sharing.
Since warmup requires a lot of these fields, I'm not sure how I should go about doing this. I have gotten rid of `cancelled_tasks`, as that can be entirely local to the generator.
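One possible direction for the ownership concern: bundle the fields the generator needs into a context object handed over once, with cancellation state kept local. This is only a sketch; `GeneratorContext` and its fields are hypothetical names, and the real constructor takes more arguments.

```python
from dataclasses import dataclass


@dataclass
class GeneratorContext:
    """Hypothetical bundle of runner state the generator depends on,
    passed once instead of sharing individual attributes."""
    model: object
    tokenizer: object
    model_id: str
    device_rank: int


class BatchGenerator:
    def __init__(self, ctx: GeneratorContext):
        self.ctx = ctx
        # Cancellation bookkeeping is purely local to the generator,
        # as discussed above.
        self._cancelled: set[str] = set()

    def cancel(self, task_id: str) -> None:
        self._cancelled.add(task_id)
```

Warmup could then also take a `GeneratorContext`, so the shared fields have one well-defined home rather than living on both the runner and the generator.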
Force-pushed b771f67 to 8cb9bac
Force-pushed ca53647 to 0b00f1a
Force-pushed 593cd59 to b05ddff
Force-pushed b9c4199 to f6eccf1
Force-pushed f6eccf1 to 6962838
what da ya think!

Co-authored-by: Ryuichi Leo Takashige <leo@exolabs.net>
thanks for implementing this as well
…actor, regressing from exo-explore#1262.
Motivation
Batching will require us to send tasks concurrently and queue them up. Our current infrastructure cannot handle that at all. This PR gets us closer by allowing multiple tasks to be sent in parallel and then queued up.
Changes
- Change Plan logic
- Make the runner main into a class
- Add a "BatchGenerator" to which tasks can be submitted (although tasks are handled sequentially), with results sent back through an MpSender
- Refactor the runner to accept tasks during generation
- Keep the generator threading
- Separate the runner into several files for better readability
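The "accept tasks during generation" change can be sketched as a loop that drains newly submitted tasks between generation steps, instead of blocking on one task until it finishes. The `Runner`, `poll_tasks`, and `step_fn` names here are assumptions for illustration, not the PR's actual API.

```python
from collections import deque


class Runner:
    """Sketch: interleave task intake with generation steps."""

    def __init__(self, poll_tasks, step_fn):
        self._poll_tasks = poll_tasks  # returns newly submitted tasks (possibly none)
        self._step_fn = step_fn        # runs one generation step for a task

    def run(self, max_iterations: int):
        results = []
        queue = deque()
        for _ in range(max_iterations):
            # Accept new tasks mid-generation rather than only between tasks.
            queue.extend(self._poll_tasks())
            if queue:
                results.append(self._step_fn(queue.popleft()))
        return results
```

With this shape, swapping the sequential per-task `step_fn` for a true batched step is a local change inside the loop body.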
Test Plan
Manual Testing
Tested manually; this needs a lot more automated testing. Cancellation still works on a single device but needs checking on multiple devices.
Automated Testing