0 follower

Final Class Yiisoft\Queue\Adapter\SynchronousAdapter

InheritanceYiisoft\Queue\Adapter\SynchronousAdapter
ImplementsYiisoft\Queue\Adapter\AdapterInterface

Method Details

Hide inherited methods

__construct() public method

public mixed __construct ( Yiisoft\Queue\Worker\WorkerInterface $worker, Yiisoft\Queue\QueueInterface $queue, string|\BackedEnum $channel QueueInterface::DEFAULT_CHANNEL )
$worker Yiisoft\Queue\Worker\WorkerInterface
$queue Yiisoft\Queue\QueueInterface
$channel string|\BackedEnum

                public function __construct(
    private readonly WorkerInterface $worker,
    private readonly QueueInterface $queue,
    string|BackedEnum $channel = QueueInterface::DEFAULT_CHANNEL,
) {
    $this->channel = ChannelNormalizer::normalize($channel);
}

            
__destruct() public method

public mixed __destruct ( )

                public function __destruct()
{
    $this->runExisting(function (MessageInterface $message): bool {
        $this->worker->process($message, $this->queue);
        return true;
    });
}

            
getChannel() public method

public string getChannel ( )

                public function getChannel(): string
{
    return $this->channel;
}

            
push() public method

public Yiisoft\Queue\Message\MessageInterface push ( Yiisoft\Queue\Message\MessageInterface $message )
$message Yiisoft\Queue\Message\MessageInterface

                public function push(MessageInterface $message): MessageInterface
{
    $key = count($this->messages) + $this->current;
    $this->messages[] = $message;
    return new IdEnvelope($message, $key);
}

            
runExisting() public method

public void runExisting ( callable $handlerCallback )
$handlerCallback callable

                public function runExisting(callable $handlerCallback): void
{
    $result = true;
    while (isset($this->messages[$this->current]) && $result === true) {
        $result = $handlerCallback($this->messages[$this->current]);
        unset($this->messages[$this->current]);
        $this->current++;
    }
}

            
status() public method

public \Yiisoft\Queue\JobStatus status ( string|integer $id )
$id string|integer

                public function status(string|int $id): JobStatus
{
    $id = (int) $id;
    if ($id < 0) {
        throw new InvalidArgumentException('This adapter IDs start with 0.');
    }
    if ($id < $this->current) {
        return JobStatus::DONE;
    }
    if (isset($this->messages[$id])) {
        return JobStatus::WAITING;
    }
    throw new InvalidArgumentException('There is no message with the given ID.');
}

            
subscribe() public method

public void subscribe ( callable $handlerCallback )
$handlerCallback callable

                public function subscribe(callable $handlerCallback): void
{
    $this->runExisting($handlerCallback);
}

            
withChannel() public method

public self withChannel ( string|\BackedEnum $channel )
$channel string|\BackedEnum

                public function withChannel(string|BackedEnum $channel): self
{
    $channel = ChannelNormalizer::normalize($channel);
    if ($channel === $this->channel) {
        return $this;
    }
    $new = clone $this;
    $new->channel = $channel;
    $new->messages = [];
    return $new;
}