0 follower

Final Class Yiisoft\Queue\Redis\Adapter

InheritanceYiisoft\Queue\Redis\Adapter
ImplementsYiisoft\Queue\Adapter\AdapterInterface

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( Yiisoft\Queue\Redis\QueueProviderInterface $provider, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer, \Yiisoft\Queue\Cli\LoopInterface $loop, integer $timeout 3 )
$provider Yiisoft\Queue\Redis\QueueProviderInterface
$serializer \Yiisoft\Queue\Message\MessageSerializerInterface
$loop \Yiisoft\Queue\Cli\LoopInterface
$timeout integer

                public function __construct(
    private QueueProviderInterface $provider,
    private MessageSerializerInterface $serializer,
    private LoopInterface $loop,
    private int $timeout = 3
) {
}

            
getChannel() public method

public string getChannel ( )

                public function getChannel(): string
{
    return $this->provider->getChannelName();
}

            
push() public method

public \Yiisoft\Queue\Message\MessageInterface push ( \Yiisoft\Queue\Message\MessageInterface $message )
$message \Yiisoft\Queue\Message\MessageInterface

                public function push(MessageInterface $message): MessageInterface
{
    $payload = $this->serializer->serialize($message);
    $id = $this->provider->pushMessage($payload, $message->getMetadata());
    return new IdEnvelope($message, $id);
}

            
runExisting() public method

public void runExisting ( callable $handlerCallback )
$handlerCallback callable

                public function runExisting(callable $handlerCallback): void
{
    $result = true;
    while ($result) {
        $message = $this->reserve();
        if (null === $message) {
            break;
        }
        $result = $handlerCallback($message);
        if ($result) {
            $this->provider->delete((string) $message->getId());
        }
    }
}

            
status() public method

public \Yiisoft\Queue\JobStatus status ( integer|string $id )
$id integer|string

                public function status(int|string $id): JobStatus
{
    $id = (int) $id;
    if ($id <= 0) {
        throw new \InvalidArgumentException('This adapter IDs start with 1.');
    }
    if ($this->provider->existInReserved($id)) {
        return JobStatus::RESERVED;
    }
    if ($this->provider->existInWaiting($id)) {
        return JobStatus::WAITING;
    }
    return JobStatus::DONE;
}

            
subscribe() public method

public void subscribe ( callable $handlerCallback )
$handlerCallback callable

                public function subscribe(callable $handlerCallback): void
{
    while ($this->loop->canContinue()) {
        $message = $this->reserve();
        if (null === $message) {
            continue;
        }
        $result = $handlerCallback($message);
        if ($result) {
            $this->provider->delete((string) $message->getId());
        }
    }
}

            
withChannel() public method

public \Yiisoft\Queue\Adapter\AdapterInterface withChannel ( \BackedEnum|string $channel )
$channel \BackedEnum|string

                public function withChannel(BackedEnum|string $channel): AdapterInterface
{
    $adapter = clone $this;
    $channelName = is_string($channel) ? $channel : (string) $channel->value;
    $adapter->provider = $this->provider->withChannelName($channelName);
    return $adapter;
}