<?php declare(strict_types=1);
namespace App\RabbitMq;
use App\RabbitMq\Processing\PushProcessingEvent;
use \DomainException;
use Sindrive\RabbitMqTaskBundle\Service\TaskHandler;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use const \SORT_NUMERIC;
/**
 * Buffers processing tasks in memory and publishes them to RabbitMQ as a
 * single serialized batch.
 *
 * Tasks arrive either through direct addTask() calls or through
 * PushProcessingEvent, to which this class subscribes.
 */
class TasksPool implements EventSubscriberInterface
{
    /**
     * Buffered tasks, indexed as [priority][entityClass][entityId][] => task record.
     *
     * @var mixed[][][][][]
     */
    private array $tasks = [];

    /**
     * True once send() has completed and no task has been added since.
     */
    private bool $sent = false;

    private TaskHandler $rabbitmqProducer;

    public function __construct(TaskHandler $producer)
    {
        $this->rabbitmqProducer = $producer;
    }

    /**
     * Adds one task to the buffer and resets the "sent" flag.
     *
     * Tasks are grouped by priority, then entity class, then entity id; several
     * tasks for the same entity are kept in insertion order.
     *
     * @param mixed[] $arguments
     *
     * @throws DomainException when $entityId is lower than 1
     */
    public function addTask(
        string $service,
        string $method,
        string $entityClass,
        int $entityId,
        array $arguments = [],
        bool $isImageHandler = false,
        int $priority = 255
    ): void
    {
        // Open question: how should null/string/negative ids be handled in RMQ?
        // Until that is decided, only positive integer ids are accepted.
        if ($entityId < 1) {
            throw new DomainException('Entity without an id is not supported');
        }

        $task = [
            'service' => $service,
            'method' => $method,
            'arguments' => $arguments,
            'imageHandler' => $isImageHandler,
        ];

        $this->tasks[$priority][$entityClass][$entityId][] = $task;
        $this->sent = false;
    }

    /**
     * Event listener: copies the event payload into the buffer via addTask().
     *
     * $eventName is accepted as part of the listener signature but is not used.
     */
    public function onProcess(PushProcessingEvent $event, string $eventName): void
    {
        $service = $event->getService();
        $method = $event->getMethod();
        $entityClass = $event->getEntityClass();
        $entityId = $event->getEntityId();
        $arguments = $event->getArguments();
        $isImageHandler = $event->isImageHandler();
        $priority = $event->getPriority();

        $this->addTask($service, $method, $entityClass, $entityId, $arguments, $isImageHandler, $priority);
    }

    /**
     * Publishes every buffered task as one serialized message and marks the
     * pool as sent.
     *
     * The batch is ordered by descending priority key before serialization.
     * With an empty buffer nothing is published and only the flag is set.
     *
     * Note: the buffer is emptied before the producer is called, so if
     * sendTask() throws, the batch is no longer held by this pool and the
     * "sent" flag is not updated.
     */
    public function send(): void
    {
        if ($this->tasks === []) {
            $this->sent = true;

            return;
        }

        // Empty the buffer before handing the batch to the producer: sending a
        // RabbitMQ task can call this method again, and a buffer that is still
        // populated would pile up a queue of duplicate tasks.
        $batch = $this->tasks;
        $this->tasks = [];

        krsort($batch, SORT_NUMERIC);
        $payload = serialize($batch);

        $this->rabbitmqProducer->sendTask(TasksProcessor::QUEUE, $payload);
        $this->sent = true;
    }

    /**
     * Tells whether send() has completed since the most recent addTask() call.
     */
    public function isSent(): bool
    {
        return $this->sent;
    }

    /**
     * Maps PushProcessingEvent to the onProcess() listener.
     *
     * @return array<string, array<int|string, array<int|string, int|string>|int|string>|string>
     */
    public static function getSubscribedEvents(): array
    {
        return [PushProcessingEvent::class => 'onProcess'];
    }
}