Final Class Yiisoft\Queue\Db\Adapter
| Inheritance | Yiisoft\Queue\Db\Adapter |
|---|---|
| Implements | Yiisoft\Queue\Adapter\AdapterInterface |
Public Properties
| Property | Type | Description | Defined By |
|---|---|---|---|
| $deleteReleased | boolean | Ability to delete released messages from table. | Yiisoft\Queue\Db\Adapter |
| $mutex | \Yiisoft\Mutex\MutexInterface | Mutex interface. | Yiisoft\Queue\Db\Adapter |
| $mutexTimeout | integer | Mutex timeout. | Yiisoft\Queue\Db\Adapter |
| $tableName | string | Table name. | Yiisoft\Queue\Db\Adapter |
Public Methods
| Method | Description | Defined By |
|---|---|---|
| __construct() | | Yiisoft\Queue\Db\Adapter |
| push() | | Yiisoft\Queue\Db\Adapter |
| run() | Listens to the queue and runs each job. | Yiisoft\Queue\Db\Adapter |
| runExisting() | | Yiisoft\Queue\Db\Adapter |
| status() | | Yiisoft\Queue\Db\Adapter |
| subscribe() | | Yiisoft\Queue\Db\Adapter |
| withChannel() | | Yiisoft\Queue\Db\Adapter |
Protected Methods
| Method | Description | Defined By |
|---|---|---|
| release() | | Yiisoft\Queue\Db\Adapter |
| reserve() | Takes one message from waiting list and reserves it for handling. | Yiisoft\Queue\Db\Adapter |
Property Details
Ability to delete released messages from table.
Method Details
| public mixed __construct ( \Yiisoft\Db\Connection\ConnectionInterface $db, \Yiisoft\Queue\Message\MessageSerializerInterface $serializer, \Yiisoft\Queue\Cli\LoopInterface $loop, \Yiisoft\Mutex\MutexFactoryInterface $mutexFactory, string $channel = QueueFactory::DEFAULT_CHANNEL_NAME ) | ||
| $db | \Yiisoft\Db\Connection\ConnectionInterface | |
| $serializer | \Yiisoft\Queue\Message\MessageSerializerInterface | |
| $loop | \Yiisoft\Queue\Cli\LoopInterface | |
| $mutexFactory | \Yiisoft\Mutex\MutexFactoryInterface | |
| $channel | string | |
/**
 * Stores the injected collaborators and creates the mutex for the given channel.
 *
 * @param ConnectionInterface $db Connection used to build the commands and queries of this adapter.
 * @param MessageSerializerInterface $serializer Converts messages to and from the stored `job` value.
 * @param LoopInterface $loop Consulted before each iteration of the run loop.
 * @param MutexFactoryInterface $mutexFactory Factory that creates the adapter mutex.
 * @param string $channel Name of the channel this adapter instance works with.
 */
public function __construct(
    private ConnectionInterface $db,
    private MessageSerializerInterface $serializer,
    private LoopInterface $loop,
    private MutexFactoryInterface $mutexFactory,
    private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
) {
    // The mutex name is the adapter class name immediately followed by the channel name.
    $mutexName = self::class . $this->channel;
    $this->mutex = $this->mutexFactory->create($mutexName);
}
| public \Yiisoft\Queue\Message\MessageInterface push ( \Yiisoft\Queue\Message\MessageInterface $message ) | ||
| $message | \Yiisoft\Queue\Message\MessageInterface | |
/**
 * Inserts the message as a new row of the queue table and wraps it in an ID envelope.
 *
 * Metadata keys `ttr`, `delay` and `priority` are copied into the row; when a key
 * is missing, 300, 0 and 1024 are used respectively.
 *
 * @param MessageInterface $message Message to store.
 *
 * @return MessageInterface The message wrapped in an IdEnvelope carrying the last insert ID.
 */
public function push(MessageInterface $message): MessageInterface
{
    $metadata = $message->getMetadata();

    $command = $this->db->createCommand();
    $row = [
        'channel' => $this->channel,
        'job' => $this->serializer->serialize($message),
        'pushed_at' => time(),
        'ttr' => $metadata['ttr'] ?? 300,
        'delay' => $metadata['delay'] ?? 0,
        'priority' => $metadata['priority'] ?? 1024,
    ];
    $command->insert($this->tableName, $row)->execute();

    $tableSchema = $this->db->getTableSchema($this->tableName);
    if ($tableSchema) {
        $key = $this->db->getLastInsertID($tableSchema->getSequenceName());
    } else {
        // No table schema could be obtained: the falsy schema value itself becomes the ID.
        $key = $tableSchema;
    }

    return new IdEnvelope($message, $key);
}
| protected void release ( array $payload ) | ||
| $payload | array | |
/**
 * Marks a handled message as finished.
 *
 * When `$deleteReleased` is enabled the row is deleted from the table; otherwise
 * the row is kept and its `done_at` column is set to the current Unix timestamp.
 *
 * @param array $payload Queue table row of the message; only its `id` key is read here.
 */
protected function release(array $payload): void
{
    $command = $this->db->createCommand();
    $condition = ['id' => $payload['id']];

    if ($this->deleteReleased) {
        $command->delete($this->tableName, $condition)->execute();

        return;
    }

    $command->update($this->tableName, ['done_at' => time()], $condition)->execute();
}
Takes one message from waiting list and reserves it for handling.
| protected array|null reserve ( ) | ||
| return | array|null | Payload |
|---|---|---|
| throws | Exception | If the lock could not be acquired within the timeout. |
/**
 * Takes one waiting message of the current channel and reserves it for handling.
 *
 * The whole operation runs while holding the adapter mutex, which is always
 * released before the method returns or throws.
 *
 * @throws \Exception If the mutex could not be acquired within `$mutexTimeout`.
 *
 * @return array|null The reserved row with updated `reserved_at` and `attempt`
 * values, or the non-array query result when no message is ready.
 */
protected function reserve(): array|null
{
    // TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () {
    if (!$this->mutex->acquire($this->mutexTimeout)) {
        throw new \Exception('Has not waited the lock.');
    }

    try {
        $this->moveExpired();

        // Pick the unreserved message of this channel whose delay has elapsed,
        // lowest priority value first, then lowest ID.
        $payload = (new Query($this->db))
            ->from($this->tableName)
            ->andWhere(['channel' => $this->channel, 'reserved_at' => null])
            ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
            ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
            ->limit(1)
            ->one();

        if (!is_array($payload)) {
            return $payload;
        }

        // Record the reservation time and count this handling attempt.
        $payload['reserved_at'] = time();
        $payload['attempt'] = (int) $payload['attempt'] + 1;

        $changes = [
            'reserved_at' => $payload['reserved_at'],
            'attempt' => $payload['attempt'],
        ];
        $this->db->createCommand()
            ->update($this->tableName, $changes, ['id' => $payload['id']])
            ->execute();

        // pgsql: the job column may arrive as a stream resource; read it into a string.
        if (is_resource($payload['job'])) {
            $payload['job'] = stream_get_contents($payload['job']);
        }

        return $payload;
    } finally {
        $this->mutex->release();
    }
    // TWK TODO ??? });
}
Listens to the queue and runs each job.
| public void run ( callable $handlerCallback, boolean $repeat, integer $timeout = 0 ) | ||
| $handlerCallback | callable | The handler which will handle messages. Returns false if it cannot continue handling messages. |
| $repeat | boolean | Whether to continue listening when the queue is empty. |
| $timeout | integer | |
/**
 * Listens to the queue and passes each reserved message to the handler.
 *
 * A message is released only when the handler returns a truthy value; the loop
 * moves on to the next message either way. The loop ends when the injected loop
 * object reports that it cannot continue, or when nothing could be reserved
 * and `$repeat` is false.
 *
 * @param callable $handlerCallback Receives the unserialized message.
 * @param bool $repeat Whether to keep polling when no message could be reserved.
 * @param int $timeout Seconds to sleep between polls that reserved nothing; values <= 0 skip the sleep.
 */
public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void
{
    while ($this->loop->canContinue()) {
        $payload = $this->reserve();

        if (!$payload) {
            if (!$repeat) {
                return;
            }

            if ($timeout > 0) {
                sleep($timeout);
            }

            continue;
        }

        $message = $this->serializer->unserialize($payload['job']);
        if ($handlerCallback($message)) {
            $this->release($payload);
        }
    }
}
| public void runExisting ( callable $handlerCallback ) | ||
| $handlerCallback | callable | |
/**
 * Handles the messages that can currently be reserved, then returns.
 *
 * Delegates to run() with repeating disabled, so the loop stops as soon as
 * no message could be reserved.
 *
 * @param callable $handlerCallback Handler passed through to run().
 */
public function runExisting(callable $handlerCallback): void
{
    $this->run($handlerCallback, repeat: false);
}
| public \Yiisoft\Queue\Enum\JobStatus status ( string|integer $id ) | ||
| $id | string|integer | |
/**
 * Reports the state of a message by looking up its row in the queue table.
 *
 * A missing row counts as done when released messages are deleted
 * (`$deleteReleased`); otherwise it is treated as an unknown ID.
 *
 * @param string|int $id Message ID; it is cast to an integer before the lookup.
 *
 * @throws InvalidArgumentException If no row has this ID and released messages are not deleted.
 *
 * @return JobStatus Waiting, reserved or done.
 */
public function status(string|int $id): JobStatus
{
    $id = (int) $id;

    $row = (new Query($this->db))
        ->from($this->tableName)
        ->where(['id' => $id])
        ->one();

    if ($row) {
        // Not reserved yet means the message is still waiting.
        if (!$row['reserved_at']) {
            return JobStatus::waiting();
        }

        // Reserved rows are done once `done_at` has been set.
        return $row['done_at'] ? JobStatus::done() : JobStatus::reserved();
    }

    if (!$this->deleteReleased) {
        throw new InvalidArgumentException("Unknown message ID: $id.");
    }

    return JobStatus::done();
}
| public void subscribe ( callable $handlerCallback ) | ||
| $handlerCallback | callable | |
/**
 * Keeps listening to the queue, polling again after a pause when nothing could be reserved.
 *
 * Delegates to run() with repeating enabled and a 5-second pause between polls
 * that reserved nothing.
 *
 * @param callable $handlerCallback Handler passed through to run().
 */
public function subscribe(callable $handlerCallback): void
{
    // TWK TODO timeout should not be hard coded
    $this->run($handlerCallback, repeat: true, timeout: 5);
}
| public self withChannel ( string $channel ) | ||
| $channel | string | |
/**
 * Returns an adapter bound to the given channel.
 *
 * The current instance is returned unchanged when it already uses that channel;
 * otherwise a clone is returned with its own channel name and its own mutex.
 *
 * @param string $channel Name of the channel the returned adapter works with.
 *
 * @return self This instance or a clone bound to `$channel`.
 */
public function withChannel(string $channel): self
{
    if ($channel === $this->channel) {
        return $this;
    }

    $new = clone $this;
    $new->channel = $channel;
    // The mutex name must use the NEW channel (same scheme as the constructor).
    // Building it from $this->channel would make the clone share the original channel's lock.
    $new->mutex = $this->mutexFactory->create(self::class . $channel);

    return $new;
}
Sign up or log in to comment.