Class Yiisoft\Queue\Redis\QueueProvider
| Inheritance | Yiisoft\Queue\Redis\QueueProvider |
|---|---|
| Implements | Yiisoft\Queue\Redis\QueueProviderInterface |
Public Methods
Constants
| Constant | Value | Description | Defined By |
|---|---|---|---|
| DEFAULT_CHANNEL_NAME | 'yii-queue' | | Yiisoft\Queue\Redis\QueueProvider |
Method Details
| public mixed __construct ( \Redis $redis, string $channelName = self::DEFAULT_CHANNEL_NAME ) | ||
| $redis | \Redis | |
| $channelName | string | |
| throws | \RedisException | |
|---|---|---|
/**
 * Stores the Redis connection and the channel name on the instance.
 *
 * @param \Redis $redis Redis connection used by every queue operation of this provider.
 * @param string $channelName Channel name; it prefixes the Redis keys built by this class
 * (for example `<channelName>.messages`). Defaults to {@see self::DEFAULT_CHANNEL_NAME}.
 */
public function __construct(
    private \Redis $redis,
    private string $channelName = self::DEFAULT_CHANNEL_NAME,
) {
}
| public void delete ( string $id ) | ||
| $id | string | |
| throws | \RedisException | |
|---|---|---|
/**
 * Removes every record of a message from the channel's bookkeeping structures.
 *
 * The id is removed from the `reserved` and `delayed` sorted sets and from the
 * `messages` and `attempts` hashes, in that order. The `waiting` list is not touched.
 *
 * @param string $id Message id to remove.
 *
 * @throws \RedisException
 */
public function delete(string $id): void
{
    $this->checkConnection();

    $prefix = $this->channelName;

    // Sorted sets first ...
    $this->redis->zrem($prefix . '.reserved', $id);
    $this->redis->zrem($prefix . '.delayed', $id);

    // ... then the hashes holding the payload and the attempt counter.
    $this->redis->hdel($prefix . '.messages', $id);
    $this->redis->hdel($prefix . '.attempts', $id);
}
| public boolean existInReserved ( integer $id ) | ||
| $id | integer | |
| throws | \RedisException | |
|---|---|---|
/**
 * Tells whether the `<channel>.attempts` hash contains a field for the given message id.
 *
 * @param int $id Message id to look up.
 *
 * @throws \RedisException
 *
 * @return bool True only when HEXISTS returned boolean true; any other result yields false.
 */
public function existInReserved(int $id): bool
{
    $this->checkConnection();

    $field = (string) $id;

    // A non-boolean reply from the client is treated as "not found".
    return $this->redis->hexists("{$this->channelName}.attempts", $field) === true;
}
| public boolean existInWaiting ( integer $id ) | ||
| $id | integer | |
| throws | \RedisException | |
|---|---|---|
/**
 * Tells whether the `<channel>.messages` hash contains a field for the given message id.
 *
 * @param int $id Message id to look up.
 *
 * @throws \RedisException
 *
 * @return bool True only when HEXISTS returned boolean true; any other result yields false.
 */
public function existInWaiting(int $id): bool
{
    $this->checkConnection();

    $field = (string) $id;

    // A non-boolean reply from the client is treated as "not found".
    return $this->redis->hexists("{$this->channelName}.messages", $field) === true;
}
| public string getChannelName ( ) |
/**
 * Returns the channel name this provider was constructed with.
 *
 * @return string The channel name used as the prefix of this provider's Redis keys.
 */
public function getChannelName(): string
{
return $this->channelName;
}
| public integer getId ( ) | ||
| throws | \RedisException | |
|---|---|---|
/**
 * Produces the next message id by incrementing the `<channel>.message_id` counter.
 *
 * @throws \RedisException
 * @throws \RuntimeException When INCR does not return an integer.
 *
 * @return int The value of the counter after the increment.
 */
public function getId(): int
{
    $this->checkConnection();

    $nextId = $this->redis->incr("{$this->channelName}.message_id");

    if (!is_int($nextId)) {
        throw new \RuntimeException('Unable to get message id.');
    }

    return $nextId;
}
| public integer pushMessage ( string $message, array $metadata = [] ) | ||
| $message | string | |
| $metadata | array | |
| throws | \RedisException | |
|---|---|---|
/**
 * Stores a message and schedules it for delivery.
 *
 * The payload is written to the `<channel>.messages` hash under a freshly generated id.
 * With a positive integer `delay` in $metadata the id goes into the `<channel>.delayed`
 * sorted set scored with `time() + delay`; otherwise it is pushed onto the
 * `<channel>.waiting` list.
 *
 * @param string $message Message payload to store.
 * @param array $metadata Optional metadata; only the `delay` key is read here, and only
 * when it holds an integer (any other value is treated as no delay).
 *
 * @throws \RedisException
 *
 * @return int The id assigned to the message.
 */
public function pushMessage(string $message, array $metadata = []): int
{
    $this->checkConnection();

    $messageId = $this->getId();
    $this->redis->hset("{$this->channelName}.messages", (string) $messageId, $message);

    // Missing, null or non-integer delays all collapse to "no delay".
    $delay = $metadata['delay'] ?? 0;
    if (!is_int($delay)) {
        $delay = 0;
    }

    if ($delay <= 0) {
        $this->redis->lpush("{$this->channelName}.waiting", $messageId);

        return $messageId;
    }

    $this->redis->zadd("{$this->channelName}.delayed", time() + $delay, $messageId);

    return $messageId;
}
| public Yiisoft\Queue\Redis\Reserve|null reserve ( integer $timeout = 0 ) | ||
| $timeout | integer | |
| throws | \RedisException | |
|---|---|---|
/**
 * Pops the next message id from the waiting list and returns it with its payload.
 *
 * Steps:
 *  1. Try to take the `<channel>.moving_lock` key (SET NX with a 1 second expiry) and, while
 *     holding it, run moveExpired() for the `delayed` and `reserved` sets.
 *  2. BRPOP an id from `<channel>.waiting`, passing $timeout through.
 *  3. Load the payload from `<channel>.messages` and increment the id's counter in
 *     `<channel>.attempts`.
 *
 * @param int $timeout Timeout handed to BRPOP on the waiting list.
 *
 * @throws \RedisException
 *
 * @return Reserve|null The reserved message, or null when no id was popped, the popped id is
 * not a string, or no string payload is stored for it.
 */
public function reserve(int $timeout = 0): ?Reserve
{
    $this->checkConnection();

    $lockKey = "$this->channelName.moving_lock";

    // Moves delayed and reserved jobs into waiting list with lock for one second.
    if ($this->redis->set($lockKey, 'true', ['NX', 'EX', 1])) {
        try {
            $this->moveExpired("$this->channelName.delayed");
            $this->moveExpired("$this->channelName.reserved");
        } finally {
            // Release the lock only when this call acquired it. Deleting it unconditionally
            // would remove a lock held by another worker that won the SET NX race.
            $this->redis->del($lockKey);
        }
    }

    $result = $this->redis->brpop("$this->channelName.waiting", $timeout);
    if (null === $result || !isset($result[1])) {
        return null;
    }

    // The code reads the popped id from index 1 of the BRPOP reply.
    $id = $result[1];
    if (!is_string($id)) {
        return null;
    }

    $payload = $this->redis->hget("$this->channelName.messages", $id);
    if (!is_string($payload)) {
        return null;
    }

    // NOTE(review): ZREM takes members only, so this removes both the member equal to the
    // current timestamp and $id from the reserved set. It looks like a ZADD with a score may
    // have been intended - confirm before changing, since that alters redelivery behavior.
    $this->redis->zRem("$this->channelName.reserved", time(), $id);
    $this->redis->hincrby("$this->channelName.attempts", $id, 1);

    return new Reserve((int) $id, $payload);
}
| public Yiisoft\Queue\Redis\QueueProviderInterface withChannelName ( string $channelName ) | ||
| $channelName | string | |
/**
 * Returns a provider bound to the given channel name.
 *
 * @param string $channelName Channel name the returned provider should use.
 *
 * @return QueueProviderInterface This instance when the name is unchanged, otherwise a new
 * provider that shares the same Redis connection.
 */
public function withChannelName(string $channelName): QueueProviderInterface
{
    return $channelName === $this->channelName
        ? $this
        : new self($this->redis, $channelName);
}
Sign up or log in to comment.