0 follower

Final Class Yiisoft\Queue\Command\ListenAllCommand

Inheritance: Yiisoft\Queue\Command\ListenAllCommand » Symfony\Component\Console\Command\Command

Protected Methods

Hide inherited methods

Method Description Defined By
execute() Yiisoft\Queue\Command\ListenAllCommand

Method Details

Hide inherited methods

__construct() public method

public __construct( Yiisoft\Queue\Provider\QueueProviderInterface $queueProvider, Yiisoft\Queue\Cli\LoopInterface $loop, array $channels ): mixed
$queueProvider Yiisoft\Queue\Provider\QueueProviderInterface
$loop Yiisoft\Queue\Cli\LoopInterface
$channels array

                /**
                 * Stores the command's collaborators as read-only properties and initialises the parent command.
                 *
                 * @param QueueProviderInterface $queueProvider Resolves a queue instance for a given channel name.
                 * @param LoopInterface $loop Consulted before each polling pass to decide whether to keep running.
                 * @param array $channels Default value for the "channel" argument, used when none is given on the
                 *                        command line (presumably a list of channel-name strings — confirm with the
                 *                        DI configuration).
                 */
                public function __construct(
    private readonly QueueProviderInterface $queueProvider,
    private readonly LoopInterface $loop,
    private readonly array $channels,
) {
    // The parent constructor is called without an explicit command name.
    parent::__construct();
}

            
configure() public method

public configure( ): void

                /**
                 * Declares the command-line interface of the command.
                 *
                 * - `channel` (optional, array argument): channel names to listen to; defaults to the
                 *   channel list injected through the constructor.
                 * - `--pause` / `-p`: seconds to sleep between passes over the channels (default 1).
                 * - `--maximum` / `-m`: per-channel message limit for a single pass (default 0, no limit).
                 */
                public function configure(): void
{
    $this->addArgument(
        'channel',
        InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
        'Queue channel name list to connect to',
        $this->channels,
    )
        ->addOption(
            'pause',
            'p',
            InputOption::VALUE_REQUIRED,
            'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
            1,
        )
        ->addOption(
            'maximum',
            'm',
            InputOption::VALUE_REQUIRED,
            'Maximum number of messages to process in each channel before switching to another channel. '
                . 'Default is 0 (no limits).',
            0,
        );

    // The usage line must name the options actually registered above: the delay option is
    // "--pause"; there is no "--timeout" option on this command.
    $this->addUsage('[channel1 [channel2 [...]]] [--pause=<pause>] [--maximum=<maximum>]');
}

            
execute() protected method

protected execute( \Symfony\Component\Console\Input\InputInterface $input, \Symfony\Component\Console\Output\OutputInterface $output ): integer
$input \Symfony\Component\Console\Input\InputInterface
$output \Symfony\Component\Console\Output\OutputInterface

                /**
                 * Polls every requested queue channel in turn until the loop says to stop.
                 *
                 * Each pass calls `run()` on every resolved queue with the `--maximum` limit. When no
                 * queue reports a positive result for the pass, the command sleeps for `--pause`
                 * seconds before trying again.
                 *
                 * @param InputInterface $input Provides the `channel` argument and the `pause` / `maximum` options.
                 * @param OutputInterface $output Not used by this command.
                 *
                 * @return int Always 0 once the loop stops.
                 */
                protected function execute(InputInterface $input, OutputInterface $output): int
{
    // Resolve all queues up front so an unknown channel fails before polling starts.
    $queues = [];
    /** @var string $channel */
    foreach ($input->getArgument('channel') as $channel) {
        $queues[] = $this->queueProvider->get($channel);
    }

    $pauseSeconds = (int) $input->getOption('pause');
    if ($pauseSeconds < 0) {
        // sleep() rejects negative values; fall back to the option's default of one second.
        $pauseSeconds = 1;
    }

    // The option value cannot change while the command is running, so read and cast it once
    // instead of once per queue per pass.
    $maximum = (int) $input->getOption('maximum');

    while ($this->loop->canContinue()) {
        $hasMessages = false;
        foreach ($queues as $queue) {
            // Every queue is run on every pass, regardless of what earlier queues returned.
            if ($queue->run($maximum) > 0) {
                $hasMessages = true;
            }
        }

        if (!$hasMessages) {
            // Nothing was handled in this pass: back off before polling again.
            /** @psalm-var 0|positive-int $pauseSeconds */
            sleep($pauseSeconds);
        }
    }

    return 0;
}