Final Class Yiisoft\Queue\Worker\Worker
| Inheritance | Yiisoft\Queue\Worker\Worker |
|---|---|
| Implements | Yiisoft\Queue\Worker\WorkerInterface |
Public Methods
| Method | Description | Defined By |
|---|---|---|
| __construct() | | Yiisoft\Queue\Worker\Worker |
| process() | | Yiisoft\Queue\Worker\Worker |
Method Details
| public mixed __construct ( array $handlers, \Psr\Log\LoggerInterface $logger, \Yiisoft\Injector\Injector $injector, \Psr\Container\ContainerInterface $container, Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher, Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher $failureMiddlewareDispatcher ) | ||
| $handlers | array | |
| $logger | \Psr\Log\LoggerInterface | |
| $injector | \Yiisoft\Injector\Injector | |
| $container | \Psr\Container\ContainerInterface | |
| $consumeMiddlewareDispatcher | Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher | |
| $failureMiddlewareDispatcher | Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher | |
/**
 * Stores the worker's collaborators as private read-only promoted properties.
 *
 * @param array $handlers Handler definitions. NOTE(review): presumably looked up by handler name
 * when a message is processed — the lookup code is not shown here, confirm the expected array shape.
 * @param LoggerInterface $logger Logger that receives the info/error records written while processing.
 * @param Injector $injector Injector used to invoke the resolved handler with the message.
 * @param ContainerInterface $container Container. NOTE(review): not referenced by the code shown here;
 * presumably used when resolving handlers — confirm.
 * @param ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher Dispatcher the consume request is sent through.
 * @param FailureMiddlewareDispatcher $failureMiddlewareDispatcher Dispatcher the failure-handling request is
 * sent through when consuming throws.
 */
public function __construct(
private readonly array $handlers,
private readonly LoggerInterface $logger,
private readonly Injector $injector,
private readonly ContainerInterface $container,
private readonly ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
private readonly FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
) {
}
| public Yiisoft\Queue\Message\MessageInterface process ( Yiisoft\Queue\Message\MessageInterface $message, Yiisoft\Queue\QueueInterface $queue ) | ||
| $message | Yiisoft\Queue\Message\MessageInterface | |
| $queue | Yiisoft\Queue\QueueInterface | |
| throws | Throwable | |
|---|---|---|
/**
 * Runs a message through the consume middleware stack and returns the resulting message.
 *
 * The handler is resolved by the message's handler name and invoked through the injector.
 * If anything thrown while consuming is caught, a failure-handling request is dispatched
 * through the failure middleware stack; when that succeeds, the original error message is
 * logged at info level and the message produced by failure handling is returned.
 *
 * @param MessageInterface $message Message to process.
 * @param QueueInterface $queue Queue passed along in the consume request.
 *
 * @throws RuntimeException When no handler is found for the message's handler name. This is
 * thrown before the consume stack runs, so it is not routed through failure handling.
 * @throws JobFailureException When failure handling itself throws; wraps that error.
 * @throws Throwable
 *
 * @return MessageInterface Message returned by the consume stack or by failure handling.
 */
public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
{
    $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
    $this->logger->info('Processing message #{message}.', ['message' => $messageId]);

    $handlerName = $message->getHandlerName();
    $callable = $this->getHandler($handlerName);
    if ($callable === null) {
        throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $handlerName));
    }

    $consumeRequest = new ConsumeRequest($message, $queue);
    $invokeHandler = function (MessageInterface $message) use ($callable): mixed {
        return $this->injector->invoke($callable, [$message]);
    };

    try {
        $consumeHandler = $this->createConsumeHandler($invokeHandler);
        $processed = $this->consumeMiddlewareDispatcher->dispatch($consumeRequest, $consumeHandler);

        return $processed->getMessage();
    } catch (Throwable $consumeError) {
        // Built outside the inner try: an error raised here propagates unwrapped.
        $failureRequest = new FailureHandlingRequest(
            $consumeRequest->getMessage(),
            $consumeError,
            $consumeRequest->getQueue(),
        );

        try {
            $failureHandler = $this->createFailureHandler();
            $recovered = $this->failureMiddlewareDispatcher->dispatch($failureRequest, $failureHandler);
            // Failure handling succeeded, so the original error is only recorded at info level.
            $this->logger->info($consumeError->getMessage());

            return $recovered->getMessage();
        } catch (Throwable $failureError) {
            // The wrapper receives the message originally passed to process().
            $jobFailure = new JobFailureException($message, $failureError);
            $this->logger->error($jobFailure->getMessage());

            throw $jobFailure;
        }
    }
}
Sign up or log in to comment.