Skip to content

Commit 3602957

Browse files
holdmannsamdarkviktorproggervjik
authored
Supporting of M:N relation (many type of messages in many channels) required (#224)
* fixed according to issue. * Improve tests --------- Co-authored-by: Alexander Makarov <sam@rmcreative.ru> Co-authored-by: viktorprogger <viktorprogger@gmail.com> Co-authored-by: Sergei Predvoditelev <sergei@predvoditelev.ru>
1 parent ace417c commit 3602957

2 files changed

Lines changed: 16 additions & 8 deletions

File tree

src/Middleware/Consume/ConsumeMiddlewareDispatcher.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
final class ConsumeMiddlewareDispatcher
1010
{
1111
/**
12-
* Contains a middleware pipeline handler.
12+
* Contains a middleware pipeline handlers.
1313
*
14-
* @var MiddlewareConsumeStack|null The middleware stack.
14+
* @var MiddlewareConsumeStack[] The middleware stack divided by message types.
1515
*/
16-
private ?MiddlewareConsumeStack $stack = null;
16+
private array $stack = [];
1717

1818
/**
1919
* @var array[]|callable[]|MiddlewareConsumeInterface[]|string[]
@@ -37,11 +37,12 @@ public function dispatch(
3737
ConsumeRequest $request,
3838
MessageHandlerConsumeInterface $finishHandler
3939
): ConsumeRequest {
40-
if ($this->stack === null) {
41-
$this->stack = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler);
40+
$handlerName = $request->getMessage()->getHandlerName();
41+
if (!array_key_exists($handlerName, $this->stack)) {
42+
$this->stack[$handlerName] = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler);
4243
}
4344

44-
return $this->stack->handleConsume($request);
45+
return $this->stack[$handlerName]->handleConsume($request);
4546
}
4647

4748
/**
@@ -68,7 +69,7 @@ public function withMiddlewares(array $middlewareDefinitions): self
6869

6970
// Fixes a memory leak.
7071
unset($instance->stack);
71-
$instance->stack = null;
72+
$instance->stack = [];
7273

7374
return $instance;
7475
}

tests/Integration/MessageConsumingTest.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,19 @@
2020
final class MessageConsumingTest extends TestCase
2121
{
2222
private array $messagesProcessed;
23+
private array $messagesProcessedSecond;
2324

2425
public function testMessagesConsumed(): void
2526
{
2627
$this->messagesProcessed = [];
28+
$this->messagesProcessedSecond = [];
2729

2830
$container = $this->createMock(ContainerInterface::class);
2931
$worker = new Worker(
30-
['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()],
32+
[
33+
'test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData(),
34+
'test2' => fn (MessageInterface $message): mixed => $this->messagesProcessedSecond[] = $message->getData(),
35+
],
3136
new NullLogger(),
3237
new Injector($container),
3338
$container,
@@ -38,9 +43,11 @@ public function testMessagesConsumed(): void
3843
$messages = [1, 'foo', 'bar-baz'];
3944
foreach ($messages as $message) {
4045
$worker->process(new Message('test', $message), $this->getQueue());
46+
$worker->process(new Message('test2', $message), $this->getQueue());
4147
}
4248

4349
$this->assertEquals($messages, $this->messagesProcessed);
50+
$this->assertEquals($messages, $this->messagesProcessedSecond);
4451
}
4552

4653
public function testMessagesConsumedByHandlerClass(): void

0 commit comments

Comments
 (0)