Skip to content

Commit 9ce1388

Browse files
Improve run command (#168)
* Allow to process messages from different channels * Add "maximum" option to the "run" console command * Improve messages * Fix test * Apply fixes from StyleCI * Fix typecasting bug --------- Co-authored-by: StyleCI Bot <bot@styleci.io>
1 parent 8498f59 commit 9ce1388

9 files changed

Lines changed: 52 additions & 19 deletions

File tree

config/di.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Yiisoft\Queue\Cli\LoopInterface;
77
use Yiisoft\Queue\Cli\SignalLoop;
88
use Yiisoft\Queue\Cli\SimpleLoop;
9+
use Yiisoft\Queue\Command\RunCommand;
910
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
1011
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume;
1112
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
@@ -52,4 +53,9 @@
5253
FailureMiddlewareDispatcher::class => [
5354
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-fail']],
5455
],
56+
RunCommand::class => [
57+
'__construct()' => [
58+
'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']),
59+
],
60+
],
5561
];

config/params.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
declare(strict_types=1);
44

5+
use Yiisoft\Queue\Adapter\AdapterInterface;
56
use Yiisoft\Queue\Command\ListenCommand;
67
use Yiisoft\Queue\Command\RunCommand;
78
use Yiisoft\Queue\Debug\QueueCollector;
@@ -19,7 +20,9 @@
1920
],
2021
'yiisoft/queue' => [
2122
'handlers' => [],
22-
'channel-definitions' => [],
23+
'channel-definitions' => [
24+
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
25+
],
2326
'middlewares-push' => [],
2427
'middlewares-consume' => [],
2528
'middlewares-fail' => [],

src/Command/RunCommand.php

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
use Symfony\Component\Console\Command\Command;
88
use Symfony\Component\Console\Input\InputArgument;
99
use Symfony\Component\Console\Input\InputInterface;
10+
use Symfony\Component\Console\Input\InputOption;
1011
use Symfony\Component\Console\Output\OutputInterface;
11-
use Yiisoft\Queue\QueueFactory;
1212
use Yiisoft\Queue\QueueFactoryInterface;
1313

1414
final class RunCommand extends Command
1515
{
1616
protected static $defaultName = 'queue:run';
17-
protected static $defaultDescription = 'Runs all the existing messages in the queue. Exits once messages are over.';
17+
protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' .
18+
'Exits once messages are over.';
1819

19-
public function __construct(private QueueFactoryInterface $queueFactory)
20+
public function __construct(private QueueFactoryInterface $queueFactory, private array $channels)
2021
{
2122
parent::__construct();
2223
}
@@ -25,17 +26,31 @@ public function configure(): void
2526
{
2627
$this->addArgument(
2728
'channel',
28-
InputArgument::OPTIONAL,
29-
'Queue channel name to connect to',
30-
QueueFactory::DEFAULT_CHANNEL_NAME
31-
);
29+
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
30+
'Queue channel name list to connect to.',
31+
$this->channels,
32+
)
33+
->addOption(
34+
'maximum',
35+
'm',
36+
InputOption::VALUE_REQUIRED,
37+
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
38+
0,
39+
)
40+
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
3241
}
3342

3443
protected function execute(InputInterface $input, OutputInterface $output): int
3544
{
36-
$this->queueFactory
37-
->get($input->getArgument('channel'))
38-
->run();
45+
/** @var string $channel */
46+
foreach ($input->getArgument('channel') as $channel) {
47+
$output->write("Processing channel $channel... ");
48+
$count = $this->queueFactory
49+
->get($channel)
50+
->run((int)$input->getOption('maximum'));
51+
52+
$output->writeln("Messages processed: $count.");
53+
}
3954

4055
return 0;
4156
}

src/Debug/QueueDecorator.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public function push(
3535
return $message;
3636
}
3737

38-
public function run(int $max = 0): void
38+
public function run(int $max = 0): int
3939
{
40-
$this->queue->run($max);
40+
return $this->queue->run($max);
4141
}
4242

4343
public function listen(): void

src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ private function getDelay(MessageInterface $message): float
101101
$meta = $message->getMetadata();
102102
$key = self::META_KEY_DELAY . "-$this->id";
103103

104-
$delayOriginal = (float) ($meta[$key] ?? 0 ?: $this->delayInitial);
104+
$delayOriginal = (float) ($meta[$key] ?? 0);
105+
if ($delayOriginal <= 0) {
106+
$delayOriginal = $this->delayInitial;
107+
}
108+
105109
$result = $delayOriginal * $this->exponent;
106110

107111
return min($result, $this->delayMaximum);

src/Queue.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public function push(
6767
return $message;
6868
}
6969

70-
public function run(int $max = 0): void
70+
public function run(int $max = 0): int
7171
{
7272
$this->checkAdapter();
7373

@@ -90,6 +90,8 @@ public function run(int $max = 0): void
9090
'Processed {count} queue messages.',
9191
['count' => $count]
9292
);
93+
94+
return $count;
9395
}
9496

9597
public function listen(): void

src/QueueInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ public function push(MessageInterface $message, MiddlewarePushInterface|callable
3030
* Execute all existing jobs and exit
3131
*
3232
* @param int $max
33+
*
34+
* @return int How many messages were processed
3335
*/
34-
public function run(int $max = 0): void;
36+
public function run(int $max = 0): int;
3537

3638
/**
3739
* Listen to the queue and execute jobs as they come

tests/App/DummyQueue.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ public function push(
2424
return $message;
2525
}
2626

27-
public function run(int $max = 0): void
27+
public function run(int $max = 0): int
2828
{
29+
throw new Exception('`run()` method is not implemented yet.');
2930
}
3031

3132
public function listen(): void

tests/Unit/Command/RunCommandTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ final class RunCommandTest extends TestCase
1515
{
1616
public function testConfigure(): void
1717
{
18-
$command = new RunCommand($this->createMock(QueueFactoryInterface::class));
18+
$command = new RunCommand($this->createMock(QueueFactoryInterface::class), []);
1919
$channelArgument = $command->getNativeDefinition()->getArgument('channel');
2020
$this->assertEquals('channel', $channelArgument->getName());
2121
}
@@ -28,7 +28,7 @@ public function testExecute(): void
2828
$queueFactory->method('get')->willReturn($queue);
2929
$input = new StringInput('channel');
3030

31-
$command = new RunCommand($queueFactory);
31+
$command = new RunCommand($queueFactory, []);
3232
$exitCode = $command->run($input, $this->createMock(OutputInterface::class));
3333

3434
$this->assertEquals(0, $exitCode);

0 commit comments

Comments
 (0)