0 follower

Final Class Yiisoft\Queue\Db\Adapter

InheritanceYiisoft\Queue\Db\Adapter
ImplementsYiisoft\Queue\Adapter\AdapterInterface

Public Properties

Hide inherited properties

Property Type Description Defined By
$deleteReleased boolean Ability to delete released messages from table. Yiisoft\Queue\Db\Adapter
$mutex \Yiisoft\Mutex\MutexInterface Mutex interface. Yiisoft\Queue\Db\Adapter
$mutexTimeout integer Mutex timeout. Yiisoft\Queue\Db\Adapter
$tableName string Table name. Yiisoft\Queue\Db\Adapter

Protected Methods

Hide inherited methods

Method Description Defined By
release() Yiisoft\Queue\Db\Adapter
reserve() Takes one message from waiting list and reserves it for handling. Yiisoft\Queue\Db\Adapter

Property Details

Hide inherited properties

$deleteReleased public property

Ability to delete released messages from table.

public boolean $deleteReleased true
$mutex public property

Mutex interface.

public \Yiisoft\Mutex\MutexInterface $mutex null
$mutexTimeout public property

Mutex timeout.

$tableName public property

Table name.

public string $tableName '{{%queue}}'

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( \Yiisoft\Db\Connection\ConnectionInterface $db, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer, \Yiisoft\Queue\Cli\LoopInterface $loop, \Yiisoft\Mutex\MutexFactoryInterface $mutexFactory, string $channel QueueFactory::DEFAULT_CHANNEL_NAME )
$db \Yiisoft\Db\Connection\ConnectionInterface
$serializer \Yiisoft\Queue\Message\MessageSerializerInterface
$loop \Yiisoft\Queue\Cli\LoopInterface
$mutexFactory \Yiisoft\Mutex\MutexFactoryInterface
$channel string

                public function __construct(
    private ConnectionInterface $db,
    private MessageSerializerInterface $serializer,
    private LoopInterface $loop,
    private MutexFactoryInterface $mutexFactory,
    private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
) {
    $this->mutex = $this->mutexFactory->create(self::class . $this->channel);
}

            
push() public method

public \Yiisoft\Queue\Message\MessageInterface push ( \Yiisoft\Queue\Message\MessageInterface $message )
$message \Yiisoft\Queue\Message\MessageInterface

                public function push(MessageInterface $message): MessageInterface
{
    $metadata = $message->getMetadata();
    $this->db->createCommand()->insert($this->tableName, [
        'channel' => $this->channel,
        'job' => $this->serializer->serialize($message),
        'pushed_at' => time(),
        'ttr' => $metadata['ttr'] ?? 300,
        'delay' => $metadata['delay'] ?? 0,
        'priority' => $metadata['priority'] ?? 1024,
    ])->execute();
    $tableSchema = $this->db->getTableSchema($this->tableName);
    $key = $tableSchema ? $this->db->getLastInsertID($tableSchema->getSequenceName()) : $tableSchema;
    return new IdEnvelope($message, $key);
}

            
release() protected method

protected void release ( array $payload )
$payload array

                protected function release($payload): void
{
    if ($this->deleteReleased) {
        $this->db->createCommand()->delete(
            $this->tableName,
            ['id' => $payload['id']]
        )->execute();
    } else {
        $this->db->createCommand()->update(
            $this->tableName,
            ['done_at' => time()],
            ['id' => $payload['id']]
        )->execute();
    }
}

            
reserve() protected method

Takes one message from waiting list and reserves it for handling.

protected array|null reserve ( )
return array|null

Payload

throws Exception

in case it hasn't waited the lock

                protected function reserve(): array|null
{
    // TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () {
    if (!$this->mutex->acquire($this->mutexTimeout)) {
        throw new \Exception('Has not waited the lock.');
    }
    try {
        $this->moveExpired();
        // Reserve one message
        $payload = (new Query($this->db))
        ->from($this->tableName)
        ->andWhere(['channel' => $this->channel, 'reserved_at' => null])
        ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
        ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
        ->limit(1)
        ->one();
        if (is_array($payload)) {
            $payload['reserved_at'] = time();
            $payload['attempt'] = (int) $payload['attempt'] + 1;
            $this->db->createCommand()->update($this->tableName, [
                'reserved_at' => $payload['reserved_at'],
                'attempt' => $payload['attempt'],
            ], [
                'id' => $payload['id'],
            ])->execute();
            // pgsql
            if (is_resource($payload['job'])) {
                $payload['job'] = stream_get_contents($payload['job']);
            }
        }
    } finally {
        $this->mutex->release();
    }
    return $payload;
    // TWK TODO ??? });
}

            
run() public method

Listens queue and runs each job.

public void run ( callable $handlerCallback, boolean $repeat, integer $timeout 0 )
$handlerCallback callable

The handler which will handle messages. Returns false if it cannot continue handling messages

$repeat boolean

Whether to continue listening when queue is empty.

$timeout integer

                public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void
{
    while ($this->loop->canContinue()) {
        if ($payload = $this->reserve()) {
            if ($handlerCallback($this->serializer->unserialize($payload['job']))) {
                $this->release($payload);
            }
            continue;
        }
        if (!$repeat) {
            break;
        }
        if ($timeout > 0) {
            sleep($timeout);
        }
    }
}

            
runExisting() public method

public void runExisting ( callable $handlerCallback )
$handlerCallback callable

                public function runExisting(callable $handlerCallback): void
{
    $this->run($handlerCallback, false);
}

            
status() public method

public \Yiisoft\Queue\Enum\JobStatus status ( string|integer $id )
$id string|integer

                public function status(string|int $id): JobStatus
{
    $id = (int) $id;
    $payload = (new Query($this->db))
    ->from($this->tableName)
    ->where(['id' => $id])
    ->one();
    if (!$payload) {
        if ($this->deleteReleased) {
            return JobStatus::done();
        }
        throw new InvalidArgumentException("Unknown message ID: $id.");
    }
    if (!$payload['reserved_at']) {
        return JobStatus::waiting();
    }
    if (!$payload['done_at']) {
        return JobStatus::reserved();
    }
    return JobStatus::done();
}

            
subscribe() public method

public void subscribe ( callable $handlerCallback )
$handlerCallback callable

                public function subscribe(callable $handlerCallback): void
{
    $this->run($handlerCallback, true, 5); // TWK TODO timeout should not be hard coded
}

            
withChannel() public method

public self withChannel ( string $channel )
$channel string

                public function withChannel(string $channel): self
{
    if ($channel === $this->channel) {
        return $this;
    }
    $new = clone $this;
    $new->channel = $channel;
    $new->mutex = $this->mutexFactory->create(self::class . $this->channel);
    return $new;
}