Final Class Yiisoft\Queue\AMQP\Adapter
| Inheritance | Yiisoft\Queue\AMQP\Adapter |
|---|---|
| Implements | Yiisoft\Queue\Adapter\AdapterInterface |
Public Methods
Method Details
| public mixed __construct ( Yiisoft\Queue\AMQP\QueueProviderInterface $queueProvider, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer, \Yiisoft\Queue\Cli\LoopInterface $loop ) | ||
| $queueProvider | Yiisoft\Queue\AMQP\QueueProviderInterface | |
| $serializer | \Yiisoft\Queue\Message\MessageSerializerInterface | |
| $loop | \Yiisoft\Queue\Cli\LoopInterface | |
/**
 * Wires the adapter with its collaborators.
 *
 * @param QueueProviderInterface $queueProvider Source of the AMQP channel plus queue, exchange and
 * message settings. Not readonly: the `with*()` methods assign a different provider on a clone.
 * @param MessageSerializerInterface $serializer Converts queue messages to and from the AMQP message body.
 * @param LoopInterface $loop Consulted by {@see subscribe()} to decide whether to keep waiting for messages.
 */
public function __construct(
    private QueueProviderInterface $queueProvider,
    private readonly MessageSerializerInterface $serializer,
    private readonly LoopInterface $loop,
) {
}
| public string getChannel ( ) |
/**
 * Returns the channel name of this adapter.
 *
 * The name is taken from the queue settings exposed by the current queue provider.
 */
public function getChannel(): string
{
    $queueSettings = $this->queueProvider->getQueueSettings();

    return $queueSettings->getName();
}
| public Yiisoft\Queue\AMQP\QueueProviderInterface getQueueProvider ( ) |
/**
 * Returns the queue provider this adapter currently works with.
 */
public function getQueueProvider(): QueueProviderInterface
{
    return $this->queueProvider;
}
| public \Yiisoft\Queue\Message\MessageInterface push ( \Yiisoft\Queue\Message\MessageInterface $message ) | ||
| $message | \Yiisoft\Queue\Message\MessageInterface | |
/**
 * Serializes the message and publishes it through the provider's AMQP channel.
 *
 * The AMQP envelope is created once (with the provider's message properties) and reused
 * for subsequent pushes; only its body is replaced on every call.
 *
 * Routing: when the provider has exchange settings, the message goes to that exchange with
 * an empty routing key; otherwise it goes to the default exchange (`''`) with the queue name
 * as the routing key.
 *
 * @param MessageInterface $message Message to publish.
 *
 * @return MessageInterface The same message instance that was passed in.
 */
public function push(MessageInterface $message): MessageInterface
{
    // Lazily build the reusable envelope on first use.
    if ($this->amqpMessage === null) {
        $this->amqpMessage = new AMQPMessage('', $this->queueProvider->getMessageProperties());
    }

    $envelope = $this->amqpMessage;
    $envelope->setBody($this->serializer->serialize($message));

    $exchangeSettings = $this->queueProvider->getExchangeSettings();
    $channel = $this->queueProvider->getChannel();

    $exchangeName = $exchangeSettings?->getName() ?? '';
    $routingKey = '';
    if (!$exchangeSettings) {
        // No exchange configured: address the queue directly by its name.
        $routingKey = $this->queueProvider->getQueueSettings()->getName();
    }

    $channel->basic_publish($envelope, $exchangeName, $routingKey);

    return $message;
}
| public void runExisting ( callable $handlerCallback ) | ||
| $handlerCallback | callable | |
/**
 * Hands the callback to an {@see ExistingMessagesConsumer} built from the current
 * queue provider and serializer.
 *
 * NOTE(review): judging by the consumer's name, this processes the messages already waiting
 * in the queue and then returns — confirm against ExistingMessagesConsumer.
 *
 * @param callable $handlerCallback Callback passed to the consumer's `consume()` method.
 */
public function runExisting(callable $handlerCallback): void
{
    $consumer = new ExistingMessagesConsumer($this->queueProvider, $this->serializer);
    $consumer->consume($handlerCallback);
}
| public never status ( integer|string $id ) | ||
| $id | integer|string | |
/**
 * Status lookup is not available in this adapter; the method always throws.
 *
 * @param int|string $id Message identifier (unused).
 *
 * @throws NotImplementedException Always.
 */
public function status(int|string $id): JobStatus
{
    $adapterClass = self::class;

    throw new NotImplementedException('Status check is not supported by the adapter ' . $adapterClass . '.');
}
| public void subscribe ( callable $handlerCallback ) | ||
| $handlerCallback | callable | |
/**
 * Registers a consumer on the queue and keeps waiting for deliveries while the loop allows it.
 *
 * Each delivery is unserialized and passed to the handler; the delivery is acknowledged only
 * after the handler returns. If unserialization, the handler, or the acknowledgement throws,
 * the consumer is cancelled (when its tag is known) and the exception is rethrown.
 *
 * @param callable $handlerCallback Receives the unserialized message.
 *
 * @throws Throwable Whatever was thrown while processing a delivery.
 */
public function subscribe(callable $handlerCallback): void
{
    $channel = $this->queueProvider->getChannel();
    $queueName = $this->queueProvider->getQueueSettings()->getName();

    $onDelivery = function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
        try {
            $message = $this->serializer->unserialize($amqpMessage->getBody());
            $handlerCallback($message);
            $channel->basic_ack($amqpMessage->getDeliveryTag());
        } catch (Throwable $exception) {
            // Stop receiving further deliveries before propagating the failure.
            $consumerTag = $amqpMessage->getConsumerTag();
            if ($consumerTag !== null) {
                $channel->basic_cancel($consumerTag);
            }

            throw $exception;
        }
    };

    $channel->basic_consume(
        $queueName,
        $queueName,  // the queue name doubles as the consumer tag
        false,       // no_local
        false,       // no_ack: deliveries are acknowledged manually in the callback
        false,       // exclusive
        true,        // nowait
        $onDelivery
    );

    while ($this->loop->canContinue()) {
        $channel->wait();
    }
}
| public self withChannel ( \BackedEnum|string $channel ) | ||
| $channel | \BackedEnum|string | |
/**
 * Returns a copy of the adapter bound to another channel name.
 *
 * The copy gets a queue provider derived via `withChannelName()` and starts without a cached
 * AMQP envelope, so its next {@see push()} builds one from the new provider's message properties.
 *
 * @param BackedEnum|string $channel Channel name, or a backed enum whose value is cast to string.
 */
public function withChannel(BackedEnum|string $channel): self
{
    $name = $channel instanceof BackedEnum ? (string) $channel->value : $channel;

    $copy = clone $this;
    $copy->queueProvider = $this->queueProvider->withChannelName($name);
    $copy->amqpMessage = null;

    return $copy;
}
| public self withQueueProvider ( Yiisoft\Queue\AMQP\QueueProviderInterface $queueProvider ) | ||
| $queueProvider | Yiisoft\Queue\AMQP\QueueProviderInterface | |
/**
 * Returns a copy of the adapter that uses the given queue provider.
 *
 * The cached AMQP envelope is dropped on the copy. `clone` is shallow, so without the reset
 * the copy would keep (and share with the original) an envelope created from the previous
 * provider's message properties. Clearing it makes the next {@see push()} rebuild the envelope
 * from the new provider, matching what {@see withChannel()} does.
 *
 * @param QueueProviderInterface $queueProvider Provider the copy should use.
 */
public function withQueueProvider(QueueProviderInterface $queueProvider): self
{
    $new = clone $this;
    $new->queueProvider = $queueProvider;
    // Envelope properties come from the provider, so a stale cache must not survive the swap.
    $new->amqpMessage = null;

    return $new;
}
Sign up or log in to comment.