0 follower

Final Class Yiisoft\Queue\Queue

Inheritance: Yiisoft\Queue\Queue
Implements: Yiisoft\Queue\QueueInterface

Constants

Hide inherited constants

Constant Value Description Defined By
DEFAULT_CHANNEL 'yii-queue' Yiisoft\Queue\QueueInterface

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( Yiisoft\Queue\Worker\WorkerInterface $worker, Yiisoft\Queue\Cli\LoopInterface $loop, \Psr\Log\LoggerInterface $logger, Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher $pushMiddlewareDispatcher, Yiisoft\Queue\Adapter\AdapterInterface|null $adapter = null, Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions )
$worker Yiisoft\Queue\Worker\WorkerInterface
$loop Yiisoft\Queue\Cli\LoopInterface
$logger \Psr\Log\LoggerInterface
$pushMiddlewareDispatcher Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher
$adapter Yiisoft\Queue\Adapter\AdapterInterface|null
$middlewareDefinitions Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string

                /**
 * Builds a queue from its collaborators.
 *
 * The worker, loop, logger and push middleware dispatcher are promoted to
 * read-only properties. The adapter is optional and is promoted to a
 * non-readonly property so a clone can be given a different one.
 *
 * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
 *        Push middleware definitions kept on the instance.
 */
public function __construct(
    private readonly WorkerInterface $worker,
    private readonly LoopInterface $loop,
    private readonly LoggerInterface $logger,
    private readonly PushMiddlewareDispatcher $pushMiddlewareDispatcher,
    private ?AdapterInterface $adapter = null,
    MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
) {
    // The two assignments are independent of each other.
    $this->adapterPushHandler = new AdapterPushHandler();
    $this->middlewareDefinitions = $middlewareDefinitions;
}

            
getChannel() public method

public string getChannel ( )

                /**
 * Returns the channel name reported by the current adapter.
 *
 * checkAdapter() is invoked first; it is defined outside this excerpt and
 * presumably rejects a queue that has no adapter (TODO confirm).
 */
public function getChannel(): string
{
    $this->checkAdapter();

    $channel = $this->adapter->getChannel();

    return $channel;
}

            
listen() public method

public void listen ( )

                /**
 * Subscribes to the adapter and forwards every delivered message to handle().
 *
 * An info record is logged before subscribing and another one after
 * subscribe() returns.
 */
public function listen(): void
{
    $this->checkAdapter();

    // Bound closure: delegates to handle() and passes its result back to the adapter.
    $onMessage = function (MessageInterface $message) {
        return $this->handle($message);
    };

    $this->logger->info('Start listening to the queue.');
    $this->adapter->subscribe($onMessage);
    $this->logger->info('Finish listening to the queue.');
}

            
push() public method

public Yiisoft\Queue\Message\MessageInterface push ( Yiisoft\Queue\Message\MessageInterface $message, Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions )
$message Yiisoft\Queue\Message\MessageInterface
$middlewareDefinitions Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string

                /**
 * Sends a message through the push middleware dispatcher and returns the
 * message carried by the dispatcher's response.
 *
 * A debug record is written before dispatching and an info record after it.
 * The info record carries the ID read from the message metadata, or the
 * literal string 'null' when no ID is present.
 *
 * @param MessageInterface $message Message to push.
 * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
 *        Definitions forwarded to createPushHandler() for this call.
 *
 * @return MessageInterface The message taken from the dispatch result.
 */
public function push(
    MessageInterface $message,
    MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
): MessageInterface {
    $this->checkAdapter();

    $this->logger->debug(
        'Preparing to push message with handler name "{handlerName}".',
        ['handlerName' => $message->getHandlerName()]
    );

    // Build the request first, then the handler, then dispatch.
    $request = new PushRequest($message, $this->adapter);
    $handler = $this->createPushHandler(...$middlewareDefinitions);
    $pushed = $this->pushMiddlewareDispatcher->dispatch($request, $handler)->getMessage();

    $metadata = $pushed->getMetadata();
    /** @var string $messageId */
    $messageId = $metadata[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';

    $logContext = ['handlerName' => $pushed->getHandlerName(), 'id' => $messageId];
    $this->logger->info(
        'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.',
        $logContext
    );

    return $pushed;
}

            
run() public method

public integer run ( integer $max = 0 )
$max integer

                /**
 * Processes messages already present in the queue via the adapter's
 * runExisting() and returns how many were handled successfully.
 *
 * @param int $max Upper bound on handled messages; a value of 0 (or less)
 *                 disables the bound.
 *
 * @return int Number of messages for which handle() returned a truthy value.
 */
public function run(int $max = 0): int
{
    $this->checkAdapter();
    $this->logger->debug('Start processing queue messages.');

    $count = 0;
    $processNext = function (MessageInterface $message) use (&$max, &$count): bool {
        // Limit reached: report false without handling this message.
        if ($max > 0 && $max <= $count) {
            return false;
        }

        // A failed handle() also reports false and is not counted.
        if (!$this->handle($message)) {
            return false;
        }

        $count++;

        return true;
    };

    $this->adapter->runExisting($processNext);

    $this->logger->info('Processed {count} queue messages.', ['count' => $count]);

    return $count;
}

            
status() public method

public \Yiisoft\Queue\JobStatus status ( string|integer $id )
$id string|integer

                /**
 * Asks the current adapter for the status of the job with the given ID.
 *
 * @param string|int $id Identifier passed through to the adapter unchanged.
 *
 * @return JobStatus Status value returned by the adapter.
 */
public function status(string|int $id): JobStatus
{
    $this->checkAdapter();

    $jobStatus = $this->adapter->status($id);

    return $jobStatus;
}

            
withAdapter() public method

public Yiisoft\Queue\Queue withAdapter ( Yiisoft\Queue\Adapter\AdapterInterface $adapter )
$adapter Yiisoft\Queue\Adapter\AdapterInterface

                /**
 * Returns a clone of this queue that uses the given adapter.
 * The current instance is left untouched.
 *
 * @param AdapterInterface $adapter Adapter assigned to the clone.
 */
public function withAdapter(AdapterInterface $adapter): static
{
    $queue = clone $this;
    $queue->adapter = $adapter;

    return $queue;
}

            
withMiddlewares() public method

public self withMiddlewares ( Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions )
$middlewareDefinitions Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string

                /**
 * Returns a clone whose middleware definitions are replaced by the given ones.
 * The current instance is left untouched.
 *
 * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
 *        Definitions that become the clone's complete set.
 */
public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
    $copy = clone $this;
    $copy->middlewareDefinitions = $middlewareDefinitions;

    return $copy;
}

            
withMiddlewaresAdded() public method

public self withMiddlewaresAdded ( Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions )
$middlewareDefinitions Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface|callable|array|string

                /**
 * Returns a clone whose middleware definitions are the existing ones followed
 * by the given ones. Both lists are re-indexed before being joined, so any
 * keys (e.g. from named arguments) are discarded.
 *
 * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
 *        Definitions appended after the ones already configured.
 */
public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
    $copy = clone $this;

    $existing = array_values($copy->middlewareDefinitions);
    $appended = array_values($middlewareDefinitions);
    $copy->middlewareDefinitions = [...$existing, ...$appended];

    return $copy;
}