0 follower

Final Class Yiisoft\Queue\AMQP\ExistingMessagesConsumer

Inheritance: Yiisoft\Queue\AMQP\ExistingMessagesConsumer

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( Yiisoft\Queue\AMQP\QueueProviderInterface $queueProvider, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer )
$queueProvider Yiisoft\Queue\AMQP\QueueProviderInterface
$serializer \Yiisoft\Queue\Message\MessageSerializerInterface

                /**
 * @param QueueProviderInterface $queueProvider Supplies the channel and queue settings used by consume().
 * @param MessageSerializerInterface $serializer Turns a raw AMQP message body back into a message object.
 */
public function __construct(
    private readonly QueueProviderInterface $queueProvider,
    private readonly MessageSerializerInterface $serializer,
) {}

            
consume() public method

public void consume ( callable $callback )
$callback callable

                /**
 * Registers a consumer on the provider's channel and polls it for as long as
 * each poll results in a message that the callback handled successfully.
 *
 * For every delivered message the body is unserialized and passed to
 * $callback. A truthy result acknowledges the message; a falsy result
 * negatively acknowledges it with requeue. The consumer is cancelled and the
 * channel is closed when the method finishes, whether normally or by exception.
 *
 * @param callable $callback Receives the unserialized message and returns
 *                           whether it was handled.
 *
 * @throws Throwable Anything thrown while unserializing or handling a message
 *                   is rethrown after the message has been nack-ed with requeue.
 */
public function consume(callable $callback): void
{
    $channel = $this->queueProvider->getChannel();
    $consumerTag = uniqid(more_entropy: true);

    try {
        // NOTE(review): the four positional booleans presumably map to php-amqplib's
        // no_local, no_ack, exclusive and nowait flags — confirm against the channel type.
        $channel->basic_consume(
            $this->queueProvider->getQueueSettings()->getName(),
            $consumerTag,
            false,
            false,
            false,
            false,
            function (AMQPMessage $amqpMessage) use ($callback, $channel): void {
                $deliveryTag = $amqpMessage->getDeliveryTag();

                try {
                    $message = $this->serializer->unserialize($amqpMessage->getBody());
                    $this->messageConsumed = $callback($message);
                } catch (Throwable $exception) {
                    // Hand the message back to the broker before propagating the failure.
                    $this->messageConsumed = false;
                    $channel->basic_nack($deliveryTag, false, true);

                    throw $exception;
                }

                if ($this->messageConsumed) {
                    $channel->basic_ack($deliveryTag);
                } else {
                    $channel->basic_nack($deliveryTag, false, true);
                }
            }
        );

        do {
            // Reset before every poll: the flag only becomes true again if the
            // delivery callback ran and the message was handled.
            $this->messageConsumed = false;
            // NOTE(review): second argument presumably requests a non-blocking wait — confirm.
            $channel->wait(null, true);
        } while ($this->messageConsumed === true);
    } finally {
        try {
            $channel->basic_cancel($consumerTag);
        } finally {
            // Close the channel even when cancelling the consumer itself fails
            // (e.g. the connection is already broken), so it is never leaked.
            $this->queueProvider->channelClose();
        }
    }
}