src/App/RabbitMq/TasksPool.php line 57

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace App\RabbitMq;
  3. use App\RabbitMq\Processing\PushProcessingEvent;
  4. use \DomainException;
  5. use Sindrive\RabbitMqTaskBundle\Service\TaskHandler;
  6. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  7. use const \SORT_NUMERIC;
  8. class TasksPool implements EventSubscriberInterface
  9. {
  10.     /**
  11.      * @var mixed[][][][][]
  12.      */
  13.     private array $tasks = [];
  14.     private bool $sent false;
  15.     private TaskHandler $rabbitmqProducer;
  16.     public function __construct(TaskHandler $producer)
  17.     {
  18.         $this->rabbitmqProducer $producer;
  19.     }
  20.     /**
  21.      * @param mixed[] $arguments
  22.      */
  23.     public function addTask(
  24.         string $service,
  25.         string $method,
  26.         string $entityClass,
  27.         int $entityId,
  28.         array $arguments = [],
  29.         bool $isImageHandler false,
  30.         int $priority 255
  31.     ): void
  32.     {
  33.         if ($entityId 1) {
  34.             throw new DomainException('Entity without an id is not supported');
  35.             // how do you want to handle null/string/negative IDs in RMQ?
  36.         }
  37.         $this->sent false;
  38.         $this->tasks[$priority][$entityClass][$entityId][] = [
  39.             'service' => $service,
  40.             'method' => $method,
  41.             'arguments' => $arguments,
  42.             'imageHandler' => $isImageHandler,
  43.         ];
  44.     }
  45.     public function onProcess(PushProcessingEvent $eventstring $eventName): void
  46.     {
  47.         $this->addTask(
  48.             $event->getService(),
  49.             $event->getMethod(),
  50.             $event->getEntityClass(),
  51.             $event->getEntityId(),
  52.             $event->getArguments(),
  53.             $event->isImageHandler(),
  54.             $event->getPriority()
  55.         );
  56.     }
  57.     public function send(): void
  58.     {
  59.         if (count($this->tasks) > 0) {
  60.             $tasks $this->tasks;
  61.             krsort($tasksSORT_NUMERIC);
  62.             // need to clean up tasks before sending out a RabbitMQ task, because it can call this method and we will
  63.             // pile up a queue of duplicate tasks
  64.             $this->tasks = [];
  65.             $this->rabbitmqProducer->sendTask(TasksProcessor::QUEUEserialize($tasks));
  66.         }
  67.         $this->sent true;
  68.     }
  69.     public function isSent(): bool
  70.     {
  71.         return $this->sent;
  72.     }
  73.     /**
  74.      * @return array<string, array<int|string, array<int|string, int|string>|int|string>|string>
  75.      */
  76.     public static function getSubscribedEvents(): array
  77.     {
  78.         return [
  79.             PushProcessingEvent::class => 'onProcess',
  80.         ];
  81.     }
  82. }