Final Class Yiisoft\Queue\Queue
| Inheritance | Yiisoft\Queue\Queue |
|---|---|
| Implements | Yiisoft\Queue\QueueInterface |
Public Methods
Method Details
| public mixed __clone ( ) |
public function __clone()
{
$finalPushHandler = $this->createFinalPushHandler();
$this->baseDispatcher = $this->baseDispatcher->withFinishHandler($finalPushHandler);
$this->dispatcher = $this->dispatcher->withFinishHandler($finalPushHandler);
}
| public mixed __construct ( Yiisoft\Queue\Worker\WorkerInterface $worker, Yiisoft\Queue\Cli\LoopInterface $loop, \Psr\Log\LoggerInterface $logger, Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig $middlewareConfig, Yiisoft\Queue\Adapter\AdapterInterface|null $adapter = null, string|\BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, mixed $middlewareDefinitions ) | ||
| $worker | Yiisoft\Queue\Worker\WorkerInterface |
The worker that processes messages. |
| $loop | Yiisoft\Queue\Cli\LoopInterface |
The loop for controlling message processing. |
| $logger | \Psr\Log\LoggerInterface |
The logger for debug and informational messages. |
| $middlewareConfig | Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig |
The push middleware configuration: factory and common middleware definitions. |
| $adapter | Yiisoft\Queue\Adapter\AdapterInterface|null |
The message adapter ( |
| $name | string|\BackedEnum |
The queue name. |
| $middlewareDefinitions | mixed |
Queue-specific middleware definitions. |
public function __construct(
private readonly WorkerInterface $worker,
private readonly LoopInterface $loop,
private readonly LoggerInterface $logger,
PushMiddlewareConfig $middlewareConfig,
private readonly ?AdapterInterface $adapter = null,
string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE,
mixed ...$middlewareDefinitions,
) {
$this->name = StringNormalizer::normalize($name);
$this->baseDispatcher = new PushMiddlewareDispatcher(
$middlewareConfig->middlewareFactory,
$middlewareConfig->commonMiddlewareDefinitions,
$this->createFinalPushHandler(),
);
$this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
}
| public void listen ( ) |
public function listen(): void
{
if ($this->isSynchronous()) {
$this->logger->info('Cannot listen without an adapter. Queue is in synchronous mode.');
return;
}
$this->logger->info('Start listening to the queue.');
$this->adapter->subscribe(fn(MessageInterface $message) => $this->handle($message));
$this->logger->info('Finish listening to the queue.');
}
| public Yiisoft\Queue\Message\MessageInterface push ( Yiisoft\Queue\Message\MessageInterface $message ) | ||
| $message | Yiisoft\Queue\Message\MessageInterface | |
public function push(MessageInterface $message): MessageInterface
{
$this->logger->debug(
'Preparing to push message with message type "{messageType}".',
['messageType' => $message->getType()],
);
$message = $this->dispatcher->dispatch($message);
if ($this->isSynchronous()) {
$this->logger->info(
'Processed message with message type "{messageType}" synchronously.',
['messageType' => $message->getType()],
);
return $message;
}
$messageId = IdEnvelope::fromMessage($message)->getId();
if ($messageId === null) {
$this->logger->info(
'Pushed message with message type "{messageType}" to the queue. ID doesn\'t assigned.',
['messageType' => $message->getType()],
);
} else {
$this->logger->info(
'Pushed message with message type "{messageType}" to the queue. Assigned ID #{id}.',
['messageType' => $message->getType(), 'id' => $messageId],
);
}
return $message;
}
| public integer run ( integer $max = 0 ) | ||
| $max | integer | |
public function run(int $max = 0): int
{
if ($this->isSynchronous()) {
$this->logger->debug(
'Queue is in synchronous mode (no adapter). Messages are processed on push. run() does nothing.',
);
return 0;
}
$this->logger->debug('Start processing queue messages.');
$count = 0;
$handlerCallback = function (MessageInterface $message) use (&$max, &$count): bool {
if (($max > 0 && $max <= $count) || !$this->handle($message)) {
return false;
}
$count++;
return true;
};
$this->adapter->runExisting($handlerCallback);
$this->logger->info(
'Processed {count} queue messages.',
['count' => $count],
);
return $count;
}
| public \Yiisoft\Queue\MessageStatus status ( string|integer $id ) | ||
| $id | string|integer | |
public function status(string|int $id): MessageStatus
{
if ($this->isSynchronous()) {
return MessageStatus::NOT_FOUND;
}
return $this->adapter->status($id);
}
| public self withMiddlewares ( mixed $middlewareDefinitions ) | ||
| $middlewareDefinitions | mixed | |
public function withMiddlewares(mixed ...$middlewareDefinitions): self
{
$instance = clone $this;
$instance->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
return $instance;
}
| public self withMiddlewaresAdded ( mixed $middlewareDefinitions ) | ||
| $middlewareDefinitions | mixed | |
public function withMiddlewaresAdded(mixed ...$middlewareDefinitions): self
{
$instance = clone $this;
$instance->setMiddlewaresAndPrepareDispatcher([...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]);
return $instance;
}
Signup or Login in order to comment.