Final Class Yiisoft\Queue\AMQP\ExistingMessagesConsumer
| Inheritance | Yiisoft\Queue\AMQP\ExistingMessagesConsumer |
|---|
Public Methods
| Method | Description | Defined By |
|---|---|---|
| __construct() | Yiisoft\Queue\AMQP\ExistingMessagesConsumer | |
| consume() | Yiisoft\Queue\AMQP\ExistingMessagesConsumer |
Method Details
| public mixed __construct ( Yiisoft\Queue\AMQP\QueueProviderInterface $queueProvider, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer ) | ||
| $queueProvider | Yiisoft\Queue\AMQP\QueueProviderInterface | |
| $serializer | \Yiisoft\Queue\Message\MessageSerializerInterface | |
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly MessageSerializerInterface $serializer
) {
}
| public void consume ( callable $callback ) | ||
| $callback | callable | |
public function consume(callable $callback): void
{
$channel = $this->queueProvider->getChannel();
$consumerTag = uniqid(more_entropy: true);
try {
$channel->basic_consume(
$this->queueProvider->getQueueSettings()->getName(),
$consumerTag,
false,
false,
false,
false,
function (AMQPMessage $amqpMessage) use ($callback, $channel): void {
try {
$message = $this->serializer->unserialize($amqpMessage->getBody());
if ($this->messageConsumed = $callback($message)) {
$channel->basic_ack($amqpMessage->getDeliveryTag());
} else {
$channel->basic_nack($amqpMessage->getDeliveryTag(), false, true);
}
} catch (Throwable $exception) {
$this->messageConsumed = false;
$channel->basic_nack($amqpMessage->getDeliveryTag(), false, true);
throw $exception;
}
}
);
do {
$this->messageConsumed = false;
$channel->wait(null, true);
} while ($this->messageConsumed === true);
} finally {
$channel->basic_cancel($consumerTag);
$this->queueProvider->channelClose();
}
}
Signup or Login in order to comment.