0 follower

Final Class Yiisoft\Queue\Queue

InheritanceYiisoft\Queue\Queue
ImplementsYiisoft\Queue\QueueInterface

Method Details

Hide inherited methods

__clone() public method

public mixed __clone ( )

                public function __clone()
{
    $finalPushHandler = $this->createFinalPushHandler();
    $this->baseDispatcher = $this->baseDispatcher->withFinishHandler($finalPushHandler);
    $this->dispatcher = $this->dispatcher->withFinishHandler($finalPushHandler);
}

            
__construct() public method

public mixed __construct ( Yiisoft\Queue\Worker\WorkerInterface $worker, Yiisoft\Queue\Cli\LoopInterface $loop, \Psr\Log\LoggerInterface $logger, Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig $middlewareConfig, Yiisoft\Queue\Adapter\AdapterInterface|null $adapter null, string|\BackedEnum $name QueueProviderInterface::DEFAULT_QUEUE, mixed $middlewareDefinitions )
$worker Yiisoft\Queue\Worker\WorkerInterface

The worker that processes messages.

$loop Yiisoft\Queue\Cli\LoopInterface

The loop for controlling message processing.

$logger \Psr\Log\LoggerInterface

The logger for debug and informational messages.

$middlewareConfig Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig

The push middleware configuration: factory and common middleware definitions.

$adapter Yiisoft\Queue\Adapter\AdapterInterface|null

The message adapter (null for synchronous mode).

$name string|\BackedEnum

The queue name.

$middlewareDefinitions mixed

Queue-specific middleware definitions.

                public function __construct(
    private readonly WorkerInterface $worker,
    private readonly LoopInterface $loop,
    private readonly LoggerInterface $logger,
    PushMiddlewareConfig $middlewareConfig,
    private readonly ?AdapterInterface $adapter = null,
    string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE,
    mixed ...$middlewareDefinitions,
) {
    $this->name = StringNormalizer::normalize($name);
    $this->baseDispatcher = new PushMiddlewareDispatcher(
        $middlewareConfig->middlewareFactory,
        $middlewareConfig->commonMiddlewareDefinitions,
        $this->createFinalPushHandler(),
    );
    $this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
}

            
getName() public method

public string getName ( )

                public function getName(): string
{
    return $this->name;
}

            
listen() public method

public void listen ( )

                public function listen(): void
{
    if ($this->isSynchronous()) {
        $this->logger->info('Cannot listen without an adapter. Queue is in synchronous mode.');
        return;
    }
    $this->logger->info('Start listening to the queue.');
    $this->adapter->subscribe(fn(MessageInterface $message) => $this->handle($message));
    $this->logger->info('Finish listening to the queue.');
}

            
push() public method

public Yiisoft\Queue\Message\MessageInterface push ( Yiisoft\Queue\Message\MessageInterface $message )
$message Yiisoft\Queue\Message\MessageInterface

                public function push(MessageInterface $message): MessageInterface
{
    $this->logger->debug(
        'Preparing to push message with message type "{messageType}".',
        ['messageType' => $message->getType()],
    );
    $message = $this->dispatcher->dispatch($message);
    if ($this->isSynchronous()) {
        $this->logger->info(
            'Processed message with message type "{messageType}" synchronously.',
            ['messageType' => $message->getType()],
        );
        return $message;
    }
    $messageId = IdEnvelope::fromMessage($message)->getId();
    if ($messageId === null) {
        $this->logger->info(
            'Pushed message with message type "{messageType}" to the queue. ID doesn\'t assigned.',
            ['messageType' => $message->getType()],
        );
    } else {
        $this->logger->info(
            'Pushed message with message type "{messageType}" to the queue. Assigned ID #{id}.',
            ['messageType' => $message->getType(), 'id' => $messageId],
        );
    }
    return $message;
}

            
run() public method

public integer run ( integer $max 0 )
$max integer

                public function run(int $max = 0): int
{
    if ($this->isSynchronous()) {
        $this->logger->debug(
            'Queue is in synchronous mode (no adapter). Messages are processed on push. run() does nothing.',
        );
        return 0;
    }
    $this->logger->debug('Start processing queue messages.');
    $count = 0;
    $handlerCallback = function (MessageInterface $message) use (&$max, &$count): bool {
        if (($max > 0 && $max <= $count) || !$this->handle($message)) {
            return false;
        }
        $count++;
        return true;
    };
    $this->adapter->runExisting($handlerCallback);
    $this->logger->info(
        'Processed {count} queue messages.',
        ['count' => $count],
    );
    return $count;
}

            
status() public method

public \Yiisoft\Queue\MessageStatus status ( string|integer $id )
$id string|integer

                public function status(string|int $id): MessageStatus
{
    if ($this->isSynchronous()) {
        return MessageStatus::NOT_FOUND;
    }
    return $this->adapter->status($id);
}

            
withMiddlewares() public method

public self withMiddlewares ( mixed $middlewareDefinitions )
$middlewareDefinitions mixed

                public function withMiddlewares(mixed ...$middlewareDefinitions): self
{
    $instance = clone $this;
    $instance->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
    return $instance;
}

            
withMiddlewaresAdded() public method

public self withMiddlewaresAdded ( mixed $middlewareDefinitions )
$middlewareDefinitions mixed

                public function withMiddlewaresAdded(mixed ...$middlewareDefinitions): self
{
    $instance = clone $this;
    $instance->setMiddlewaresAndPrepareDispatcher([...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]);
    return $instance;
}