0 follower

Final Class Yiisoft\Queue\Worker\Worker

Inheritance: Yiisoft\Queue\Worker\Worker
Implements: Yiisoft\Queue\Worker\WorkerInterface

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( array $handlers, \Psr\Log\LoggerInterface $logger, \Yiisoft\Injector\Injector $injector, \Psr\Container\ContainerInterface $container, Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher, Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher $failureMiddlewareDispatcher, Yiisoft\Queue\Middleware\CallableFactory $callableFactory )
$handlers array
$logger \Psr\Log\LoggerInterface
$injector \Yiisoft\Injector\Injector
$container \Psr\Container\ContainerInterface
$consumeMiddlewareDispatcher Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher
$failureMiddlewareDispatcher Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher
$callableFactory Yiisoft\Queue\Middleware\CallableFactory

                /**
                 * Stores the handler definitions and collaborating services used by the worker.
                 *
                 * Every argument is a promoted private readonly property; the constructor body is empty.
                 *
                 * @param array $handlers Handler definitions keyed by a non-empty string; each value is an array,
                 *     callable, object, string or null. Presumably the key is the message type looked up in
                 *     process() via getHandler() — confirm against getHandler().
                 * @param LoggerInterface $logger Receives the info/error records written by process().
                 * @param Injector $injector Invokes the resolved handler with the message in process().
                 * @param ContainerInterface $container Not used by the code visible here; presumably used when
                 *     resolving handler definitions — confirm.
                 * @param ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher Dispatches the consume request
                 *     in process().
                 * @param FailureMiddlewareDispatcher $failureMiddlewareDispatcher Dispatches the failure-handling
                 *     request when consumption throws in process().
                 * @param CallableFactory $callableFactory Not used by the code visible here; presumably turns
                 *     handler definitions into callables — confirm.
                 */
                public function __construct(
    /** @var array<non-empty-string, array|callable|object|string|null> */
    private readonly array $handlers,
    private readonly LoggerInterface $logger,
    private readonly Injector $injector,
    private readonly ContainerInterface $container,
    private readonly ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
    private readonly FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
    private readonly CallableFactory $callableFactory,
) {}

            
process() public method

public Yiisoft\Queue\Message\MessageInterface process ( Yiisoft\Queue\Message\MessageInterface $message, Yiisoft\Queue\QueueInterface $queue )
$message Yiisoft\Queue\Message\MessageInterface
$queue Yiisoft\Queue\QueueInterface
throws Throwable

                /**
                 * Processes one queue message.
                 *
                 * Resolves the handler for the message type, runs it through the consume middleware
                 * dispatcher and returns the resulting message. If that throws, the error is passed to
                 * the failure middleware dispatcher; when the failure dispatch completes, its resulting
                 * message is returned instead.
                 *
                 * @param MessageInterface $message Message to process.
                 * @param QueueInterface $queue Queue placed into the consume request together with the message.
                 *
                 * @throws RuntimeException When the handler for the message type is null or its
                 *     configuration is rejected with InvalidCallableConfigurationException.
                 * @throws MessageFailureException When anything throws while the failure is being handled.
                 * @throws Throwable
                 *
                 * @return MessageInterface Message taken from the consume result, or from the failure-handling result.
                 */
                public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
{
    $id = IdEnvelope::fromMessage($message)->getId();
    if ($id !== null) {
        $this->logger->info('Processing message #{message}.', ['message' => $id]);
    } else {
        $this->logger->info('Processing message without ID.');
    }

    $type = $message->getType();

    try {
        $handler = $this->getHandler($type);
    } catch (InvalidCallableConfigurationException $configurationError) {
        // An invalid handler definition is reported the same way as a missing one,
        // keeping the original error as the previous exception.
        throw new RuntimeException(
            sprintf('Queue handler for message type "%s" does not exist.', $type),
            0,
            $configurationError,
        );
    }

    if ($handler === null) {
        throw new RuntimeException(
            sprintf('Queue handler for message type "%s" does not exist.', $type),
        );
    }

    $consumeRequest = new ConsumeRequest($message, $queue);
    $invokeHandler = function (MessageInterface $message) use ($handler): mixed {
        return $this->injector->invoke($handler, [$message]);
    };

    try {
        $consumeHandler = $this->createConsumeHandler($invokeHandler);
        $consumeResult = $this->consumeMiddlewareDispatcher->dispatch($consumeRequest, $consumeHandler);

        return $consumeResult->getMessage();
    } catch (Throwable $consumeError) {
        $failureRequest = new FailureHandlingRequest(
            $consumeRequest->getMessage(),
            $consumeError,
            $consumeRequest->getQueue(),
        );

        try {
            $failureHandler = $this->createFailureHandler();
            $failureResult = $this->failureMiddlewareDispatcher->dispatch($failureRequest, $failureHandler);
            // The failure was handled: record the original error at info level only.
            $this->logger->info($consumeError->getMessage());

            return $failureResult->getMessage();
        } catch (Throwable $failureError) {
            // Failure handling itself threw: wrap that error together with the incoming message.
            $wrapped = new MessageFailureException($message, $failureError);
            $this->logger->error($wrapped->getMessage());

            throw $wrapped;
        }
    }
}