0 follower

Final Class Yiisoft\Queue\Worker\Worker

Inheritance: Yiisoft\Queue\Worker\Worker
Implements: Yiisoft\Queue\Worker\WorkerInterface

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( array $handlers, \Psr\Log\LoggerInterface $logger, \Yiisoft\Injector\Injector $injector, \Psr\Container\ContainerInterface $container, Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher, Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher $failureMiddlewareDispatcher )
$handlers array
$logger \Psr\Log\LoggerInterface
$injector \Yiisoft\Injector\Injector
$container \Psr\Container\ContainerInterface
$consumeMiddlewareDispatcher Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher
$failureMiddlewareDispatcher Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher

                /**
                 * Stores the worker's collaborators as private readonly promoted properties.
                 *
                 * The constructor body is intentionally empty: all assignment happens through
                 * constructor property promotion.
                 *
                 * @param array $handlers Handler definitions. NOTE(review): presumably keyed by handler name,
                 * since process() resolves a handler via getHandler($name), which is not visible here — confirm.
                 * @param LoggerInterface $logger Logger that process() writes info and error records to.
                 * @param Injector $injector Used by process() to invoke the resolved handler with the message.
                 * @param ContainerInterface $container Not referenced in the visible code. NOTE(review): presumably
                 * used when resolving handlers — confirm.
                 * @param ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher Dispatcher that process() sends
                 * the consume request through.
                 * @param FailureMiddlewareDispatcher $failureMiddlewareDispatcher Dispatcher that process() sends
                 * the failure-handling request through when consuming throws.
                 */
                public function __construct(
    private readonly array $handlers,
    private readonly LoggerInterface $logger,
    private readonly Injector $injector,
    private readonly ContainerInterface $container,
    private readonly ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
    private readonly FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
) {
}

            
process() public method

public Yiisoft\Queue\Message\MessageInterface process ( Yiisoft\Queue\Message\MessageInterface $message, Yiisoft\Queue\QueueInterface $queue )
$message Yiisoft\Queue\Message\MessageInterface
$queue Yiisoft\Queue\QueueInterface
throws Throwable

                /**
                 * Processes a message by invoking its handler through the consume middleware dispatcher.
                 *
                 * When anything thrown during consumption is caught, a failure-handling request built from
                 * the consume request and the caught throwable is sent through the failure middleware
                 * dispatcher. If that succeeds, the original error message is logged at info level and the
                 * message from the failure-handling result is returned. If failure handling itself throws,
                 * the throwable is wrapped in a JobFailureException, logged at error level and rethrown.
                 *
                 * @param MessageInterface $message Message to process.
                 * @param QueueInterface $queue Queue that is attached to the consume request.
                 *
                 * @throws RuntimeException When no handler is found for the message's handler name.
                 * @throws JobFailureException When failure handling throws.
                 * @throws Throwable
                 *
                 * @return MessageInterface Message taken from the consume result or, after a handled failure,
                 * from the failure-handling result.
                 */
                public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
                {
                    $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
                    $this->logger->info('Processing message #{message}.', ['message' => $messageId]);

                    $handlerName = $message->getHandlerName();
                    $handler = $this->getHandler($handlerName);
                    if ($handler === null) {
                        $reason = sprintf('Queue handler with name "%s" does not exist', $handlerName);

                        throw new RuntimeException($reason);
                    }

                    $consumeRequest = new ConsumeRequest($message, $queue);
                    $invokeHandler = function (MessageInterface $message) use ($handler): mixed {
                        return $this->injector->invoke($handler, [$message]);
                    };

                    try {
                        $consumeHandler = $this->createConsumeHandler($invokeHandler);
                        $consumed = $this->consumeMiddlewareDispatcher->dispatch($consumeRequest, $consumeHandler);

                        return $consumed->getMessage();
                    } catch (Throwable $consumeError) {
                        $failureRequest = new FailureHandlingRequest(
                            $consumeRequest->getMessage(),
                            $consumeError,
                            $consumeRequest->getQueue(),
                        );

                        try {
                            $failureHandler = $this->createFailureHandler();
                            $handled = $this->failureMiddlewareDispatcher->dispatch($failureRequest, $failureHandler);
                            // Logging stays inside the try so that a throwing logger is also reported as a job failure.
                            $this->logger->info($consumeError->getMessage());

                            return $handled->getMessage();
                        } catch (Throwable $handlingError) {
                            $jobFailure = new JobFailureException($message, $handlingError);
                            $this->logger->error($jobFailure->getMessage());

                            throw $jobFailure;
                        }
                    }
                }