/**
 * Reconfigures the request's adapter and forwards the request to the next push handler.
 *
 * The adapter's queue provider is swapped for one derived from it through the
 * `withMessageProperties()`, `withExchangeSettings()` and `withQueueSettings()` chain,
 * using the values produced by this class's `getMessageProperties()`,
 * `getExchangeSettings()` and `getQueueSettings()` helpers.
 *
 * @param PushRequest $request Push request whose adapter is inspected and replaced.
 * @param MessageHandlerPushInterface $handler Next handler; receives the request carrying the reconfigured adapter.
 *
 * @throws InvalidArgumentException If the request's adapter is not an instance of {@see Adapter},
 * or if its queue provider has no exchange settings.
 *
 * @return PushRequest Whatever the next handler returns.
 */
public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
{
    $adapter = $request->getAdapter();
    if (!$adapter instanceof Adapter) {
        throw new InvalidArgumentException(
            sprintf(
                'This middleware works only with the %s. %s given.',
                Adapter::class,
                get_debug_type($adapter),
            )
        );
    }

    // Fetch the provider and its exchange settings exactly once, so the value that is
    // null-checked below is the very same value handed to the helpers afterwards.
    $queueProvider = $adapter->getQueueProvider();
    $originalExchangeSettings = $queueProvider->getExchangeSettings();
    if ($originalExchangeSettings === null) {
        throw new InvalidArgumentException('Message cannot be delayed to a queue without an exchange. Exchange is mandatory.');
    }

    $exchangeSettings = $this->getExchangeSettings($originalExchangeSettings);
    $queueSettings = $this->getQueueSettings($queueProvider->getQueueSettings(), $originalExchangeSettings);

    // Message properties are computed from the provider as it was before any of the overrides below.
    $configuredProvider = $queueProvider
        ->withMessageProperties($this->getMessageProperties($queueProvider))
        ->withExchangeSettings($exchangeSettings)
        ->withQueueSettings($queueSettings);

    return $handler->handlePush(
        $request->withAdapter($adapter->withQueueProvider($configuredProvider))
    );
}
Sign up or log in to comment.