0 follower

Final Class Yiisoft\Queue\Command\ListenAllCommand

Inheritance: Yiisoft\Queue\Command\ListenAllCommand » Symfony\Component\Console\Command\Command

Protected Methods

Hide inherited methods

Method Description Defined By
execute() Yiisoft\Queue\Command\ListenAllCommand

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( Yiisoft\Queue\Provider\QueueProviderInterface $queueProvider, Yiisoft\Queue\Cli\LoopInterface $loop )
$queueProvider Yiisoft\Queue\Provider\QueueProviderInterface
$loop Yiisoft\Queue\Cli\LoopInterface

                /**
                 * Stores the collaborators the command works with and initialises the base command.
                 *
                 * @param QueueProviderInterface $queueProvider Supplies the known queue names (used as the
                 * default value of the `queue` argument) and resolves a queue instance by name.
                 * @param LoopInterface $loop Consulted before every polling round to decide whether the
                 * command should keep running.
                 */
                public function __construct(
    private readonly QueueProviderInterface $queueProvider,
    private readonly LoopInterface $loop,
) {
    // No explicit command name is passed to the base constructor here.
    parent::__construct();
}

            
configure() public method

public void configure ( )

                /**
                 * Declares the command-line interface of the command.
                 *
                 * Registers:
                 * - `queue` - optional list of queue names; defaults to every name reported by the
                 *   queue provider;
                 * - `--pause` / `-p` - seconds to sleep after a round in which no queue had messages
                 *   (default 1);
                 * - `--maximum` / `-m` - maximum number of messages to process per queue before
                 *   switching to the next one (default 0, no limit).
                 *
                 * The usage example lists exactly the options registered above.
                 */
                public function configure(): void
{
    $this->addArgument(
        'queue',
        InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
        'Queue name list to connect to',
        $this->queueProvider->getNames(),
    )
        ->addOption(
            'pause',
            'p',
            InputOption::VALUE_REQUIRED,
            'Pause between queue iterations in seconds. May save some CPU. Default: 1',
            1,
        )
        ->addOption(
            'maximum',
            'm',
            InputOption::VALUE_REQUIRED,
            'Maximum number of messages to process in each queue before switching to another queue. '
                . 'Default is 0 (no limits).',
            0,
        );
    // The option is named "pause"; there is no "timeout" option on this command.
    $this->addUsage('[queue1 [queue2 [...]]] [--pause=<pause>] [--maximum=<maximum>]');
}

            
execute() protected method

protected integer execute ( \Symfony\Component\Console\Input\InputInterface $input, \Symfony\Component\Console\Output\OutputInterface $output )
$input \Symfony\Component\Console\Input\InputInterface
$output \Symfony\Component\Console\Output\OutputInterface

                /**
                 * Polls every requested queue in turn until the loop reports that it must stop.
                 *
                 * Each round runs all selected queues once, passing the `--maximum` value to `run()`.
                 * When no queue reported a positive result in a round, the command sleeps for
                 * `--pause` seconds before trying again.
                 *
                 * @param InputInterface $input Provides the `queue` argument and the `pause` and
                 * `maximum` options.
                 * @param OutputInterface $output Not used by this command.
                 *
                 * @return int Always 0 once the loop stops.
                 */
                protected function execute(InputInterface $input, OutputInterface $output): int
{
    // Resolve every requested queue name to a queue instance up front.
    $queues = [];
    /** @var string $queueName */
    foreach ($input->getArgument('queue') as $queueName) {
        $queues[] = $this->queueProvider->get($queueName);
    }

    $pauseSeconds = (int) $input->getOption('pause');
    if ($pauseSeconds < 0) {
        // A negative pause cannot be slept; fall back to the default of one second.
        $pauseSeconds = 1;
    }

    // The option value cannot change while the command runs, so parse it once
    // instead of once per queue per round.
    $maximum = (int) $input->getOption('maximum');

    while ($this->loop->canContinue()) {
        $hasMessages = false;
        foreach ($queues as $queue) {
            // Every queue is run each round, regardless of what earlier queues returned.
            // NOTE(review): run() presumably returns the number of handled messages - confirm.
            if ($queue->run($maximum) > 0) {
                $hasMessages = true;
            }
        }

        if (!$hasMessages) {
            /** @psalm-var 0|positive-int $pauseSeconds */
            sleep($pauseSeconds);
        }
    }

    return 0;
}