0 follower

Final Class Yiisoft\Queue\AMQP\Adapter

Inheritance: Yiisoft\Queue\AMQP\Adapter
Implements: Yiisoft\Queue\Adapter\AdapterInterface

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( Yiisoft\Queue\AMQP\QueueProviderInterface $queueProvider, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer, \Yiisoft\Queue\Cli\LoopInterface $loop )
$queueProvider Yiisoft\Queue\AMQP\QueueProviderInterface
$serializer \Yiisoft\Queue\Message\MessageSerializerInterface
$loop \Yiisoft\Queue\Cli\LoopInterface

                /**
                 * Creates the adapter from its three collaborators.
                 *
                 * @param QueueProviderInterface $queueProvider Supplies the AMQP channel together with the queue,
                 * exchange and message settings. Not readonly: withChannel() and withQueueProvider() replace it
                 * on a cloned instance.
                 * @param MessageSerializerInterface $serializer Converts queue messages to and from the AMQP message body.
                 * @param LoopInterface $loop Consulted by subscribe() to decide whether to keep waiting for messages.
                 */
                public function __construct(
    private QueueProviderInterface $queueProvider,
    private readonly MessageSerializerInterface $serializer,
    private readonly LoopInterface $loop,
) {
}

            
getChannel() public method

public string getChannel ( )

                /**
                 * Returns the channel name of this adapter, i.e. the name stored in the provider's queue settings.
                 *
                 * @return string Queue name reported by the current queue provider.
                 */
                public function getChannel(): string
{
    $queueSettings = $this->queueProvider->getQueueSettings();

    return $queueSettings->getName();
}

            
getQueueProvider() public method

public Yiisoft\Queue\AMQP\QueueProviderInterface getQueueProvider ( )

                /**
                 * Returns the queue provider this adapter instance currently uses.
                 *
                 * @return QueueProviderInterface The provider passed to the constructor, or the one set on this
                 * copy by withChannel() / withQueueProvider().
                 */
                public function getQueueProvider(): QueueProviderInterface
{
    return $this->queueProvider;
}

            
push() public method

public \Yiisoft\Queue\Message\MessageInterface push ( \Yiisoft\Queue\Message\MessageInterface $message )
$message \Yiisoft\Queue\Message\MessageInterface

                /**
                 * Serializes the given message and publishes it through the provider's AMQP channel.
                 *
                 * One AMQPMessage object is created lazily from the provider's message properties and reused
                 * by later calls; only its body is replaced on each push.
                 *
                 * When exchange settings are configured, the message is published to that exchange with an
                 * empty routing key. Otherwise it is published with an empty exchange name and the queue name
                 * as the routing key.
                 *
                 * @param MessageInterface $message Message to publish.
                 *
                 * @return MessageInterface The same message instance that was passed in.
                 */
                public function push(MessageInterface $message): MessageInterface
{
    // Lazily build the reusable AMQP message on first use.
    if ($this->amqpMessage === null) {
        $this->amqpMessage = new AMQPMessage('', $this->queueProvider->getMessageProperties());
    }

    $envelope = $this->amqpMessage;
    $envelope->setBody($this->serializer->serialize($message));

    $exchange = $this->queueProvider->getExchangeSettings();
    $channel = $this->queueProvider->getChannel();

    $exchangeName = $exchange?->getName() ?? '';
    // Without an exchange the queue name doubles as the routing key.
    $routingKey = $exchange === null
        ? $this->queueProvider->getQueueSettings()->getName()
        : '';

    $channel->basic_publish($envelope, $exchangeName, $routingKey);

    return $message;
}

            
runExisting() public method

public void runExisting ( callable $handlerCallback )
$handlerCallback callable

                /**
                 * Hands the callback to an ExistingMessagesConsumer built from this adapter's
                 * queue provider and serializer.
                 *
                 * @param callable $handlerCallback Callback passed to the consumer's consume() method.
                 */
                public function runExisting(callable $handlerCallback): void
{
    $consumer = new ExistingMessagesConsumer($this->queueProvider, $this->serializer);
    $consumer->consume($handlerCallback);
}

            
status() public method

public never status ( integer|string $id )
$id integer|string

                /**
                 * Status lookup is not available for this adapter; the method always throws.
                 *
                 * @param int|string $id Message identifier (unused).
                 *
                 * @throws NotImplementedException Always.
                 */
                public function status(int|string $id): JobStatus
{
    $reason = 'Status check is not supported by the adapter ' . self::class . '.';

    throw new NotImplementedException($reason);
}

            
subscribe() public method

public void subscribe ( callable $handlerCallback )
$handlerCallback callable

                /**
                 * Registers a consumer on the queue and keeps waiting on the channel while the loop allows it.
                 *
                 * Each delivery is unserialized, passed to the callback and then acknowledged. If unserializing
                 * or handling throws, the delivery is not acknowledged, the consumer is cancelled (when the
                 * delivery carries a consumer tag) and the exception is rethrown.
                 *
                 * @param callable $handlerCallback Invoked with every unserialized message.
                 *
                 * @throws Throwable Whatever the serializer or the callback throws for a delivery.
                 */
                public function subscribe(callable $handlerCallback): void
{
    $channel = $this->queueProvider->getChannel();
    // The queue name is also used as the consumer tag.
    $queueName = $this->queueProvider->getQueueSettings()->getName();

    $onDelivery = function (AMQPMessage $delivery) use ($handlerCallback, $channel): void {
        try {
            $message = $this->serializer->unserialize($delivery->getBody());
            $handlerCallback($message);
            $channel->basic_ack($delivery->getDeliveryTag());
        } catch (Throwable $failure) {
            $tag = $delivery->getConsumerTag();
            if ($tag !== null) {
                $channel->basic_cancel($tag);
            }

            throw $failure;
        }
    };

    $channel->basic_consume(
        $queueName,
        $queueName,
        false, // no_local
        false, // no_ack: deliveries are acknowledged explicitly in the callback
        false, // exclusive
        true,  // nowait
        $onDelivery
    );

    while ($this->loop->canContinue()) {
        $channel->wait();
    }
}

            
withChannel() public method

public self withChannel ( \BackedEnum|string $channel )
$channel \BackedEnum|string

                /**
                 * Returns a copy of the adapter bound to another channel (queue) name.
                 *
                 * @param BackedEnum|string $channel Channel name, or a backed enum whose value is cast to string.
                 *
                 * @return self New instance; the current one is left untouched.
                 */
                public function withChannel(BackedEnum|string $channel): self
{
    $channelName = $channel instanceof BackedEnum ? (string) $channel->value : $channel;

    $copy = clone $this;
    $copy->queueProvider = $this->queueProvider->withChannelName($channelName);
    // Drop the cached AMQP message so the copy builds its own on the next push().
    $copy->amqpMessage = null;

    return $copy;
}

            
withQueueProvider() public method

public self withQueueProvider ( Yiisoft\Queue\AMQP\QueueProviderInterface $queueProvider )
$queueProvider Yiisoft\Queue\AMQP\QueueProviderInterface

                /**
                 * Returns a copy of the adapter that uses the given queue provider.
                 *
                 * @param QueueProviderInterface $queueProvider Provider the copy should publish to and consume from.
                 *
                 * @return self New instance; the current one is left untouched.
                 */
                public function withQueueProvider(QueueProviderInterface $queueProvider): self
{
    $new = clone $this;
    $new->queueProvider = $queueProvider;
    // push() caches an AMQPMessage built from the provider's message properties.
    // The shallow clone would keep sharing that object with the original and keep
    // the previous provider's properties, so reset it here as withChannel() does.
    $new->amqpMessage = null;

    return $new;
}