Final Class Yiisoft\Queue\Command\ListenAllCommand
| Inheritance | Yiisoft\Queue\Command\ListenAllCommand » Symfony\Component\Console\Command\Command |
|---|
Public Methods
| Method | Description | Defined By |
|---|---|---|
| __construct() | Yiisoft\Queue\Command\ListenAllCommand | |
| configure() | Yiisoft\Queue\Command\ListenAllCommand |
Protected Methods
| Method | Description | Defined By |
|---|---|---|
| execute() | Yiisoft\Queue\Command\ListenAllCommand |
Method Details
| public __construct( Yiisoft\Queue\Provider\QueueProviderInterface $queueProvider, Yiisoft\Queue\Cli\LoopInterface $loop, array $channels ): mixed | ||
| $queueProvider | Yiisoft\Queue\Provider\QueueProviderInterface | |
| $loop | Yiisoft\Queue\Cli\LoopInterface | |
| $channels | array | |
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
) {
parent::__construct();
}
| public configure( ): void |
public function configure(): void
{
$this->addArgument(
'channel',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to',
$this->channels,
)
->addOption(
'pause',
'p',
InputOption::VALUE_REQUIRED,
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
1,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. '
. 'Default is 0 (no limits).',
0,
);
$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
}
| protected execute( \Symfony\Component\Console\Input\InputInterface $input, \Symfony\Component\Console\Output\OutputInterface $output ): integer | ||
| $input | \Symfony\Component\Console\Input\InputInterface | |
| $output | \Symfony\Component\Console\Output\OutputInterface | |
protected function execute(InputInterface $input, OutputInterface $output): int
{
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueProvider->get($channel);
}
$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {
$pauseSeconds = 1;
}
while ($this->loop->canContinue()) {
$hasMessages = false;
foreach ($queues as $queue) {
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;
}
if (!$hasMessages) {
/** @psalm-var 0|positive-int $pauseSeconds */
sleep($pauseSeconds);
}
}
return 0;
}
Signup or Login in order to comment.