src/App/RabbitMq/Processing/ProcessingSubscriber.php line 31

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace App\RabbitMq\Processing;
  3. use App\Entity\AbstractDelayed;
  4. use DateTime;
  5. use Doctrine\ORM\EntityManagerInterface;
  6. use Doctrine\Persistence\ManagerRegistry;
  7. use InvalidArgumentException;
  8. use Psr\Log\LoggerInterface;
  9. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  10. class ProcessingSubscriber implements EventSubscriberInterface
  11. {
  12.     private ManagerRegistry $doctrine;
  13.     private LoggerInterface $logger;
  14.     public function __construct(ManagerRegistry $doctrineLoggerInterface $logger)
  15.     {
  16.         $this->doctrine $doctrine;
  17.         $this->logger $logger;
  18.     }
  19.     public function onPushProcess(PushProcessingEvent $event): void
  20.     {
  21.         $this->onProcess($eventtrue);
  22.     }
  23.     public function onPopProcess(PopProcessingEvent $event): void
  24.     {
  25.         $this->onProcess($eventfalse);
  26.     }
  27.     private function onProcess(ProcessingEvent $eventbool $isPush): void
  28.     {
  29.         $entity $event->getEntity();
  30.         if (!($entity instanceof AbstractDelayed)) {
  31.             return;
  32.         }
  33.         $this->logger->debug(($isPush 'Pushing ' 'Popping ') . ' task ' $event->getService() . ':' $event->getMethod());
  34.         $this->delayedTasks($entity$event->getEntityClass(), $isPush);
  35.     }
  36.     private function delayedTasks(AbstractDelayed $entitystring $entityClassbool $increment): void
  37.     {
  38.         if (null !== $entity->getId()) {
  39.             $delayedTasksInc = ($increment 0) * 1;
  40.             /** @var EntityManagerInterface $manager */
  41.             $manager $this->doctrine->getManager();
  42.             $qb $manager->createQueryBuilder();
  43.             $qb->update($entityClass'e')
  44.                 ->set('e.delayedTasks''e.delayedTasks + :delayedTasksProcess')
  45.                 ->where('e.id = :entity_id')
  46.                 ->setParameter('delayedTasksProcess'$delayedTasksInc)
  47.                 ->setParameter('entity_id'$entity->getId())
  48.                 ->getQuery()->execute();
  49.             // this value will not be saved to database, but will allow us to see if tasks were scheduled
  50.             $entity->incrementTasks($delayedTasksInc);
  51.             if (=== $entity->getDelayedTasks() + $entity->getTasksIncrement()) {
  52.                 $entity->setLastModified(new DateTime());
  53.             }
  54.         } else {
  55.             $increment
  56.                 $entity->addDelayedTask()
  57.                 : $entity->deductDelayedTask();
  58.         }
  59.     }
  60.     public function forceDelayedTasks(AbstractDelayed $entitystring $entityNamebool $incrementstring $iKnow): void
  61.     {
  62.         // this thing was designed to track how many RabbitMQ tasks are left to process in
  63.         // \App\RabbitMq\TasksProcessor so overriding delayedTasks need to be done with
  64.         // EXTREME CAUTION
  65.         if ($iKnow !== 'Yes, i am sure i want to do this') {
  66.             throw new InvalidArgumentException('You should not do this then');
  67.         }
  68.         $this->delayedTasks($entity$entityName$increment);
  69.     }
  70.     /**
  71.      * @return array<string, array<int|string, array<int|string, int|string>|int|string>|string>
  72.      */
  73.     public static function getSubscribedEvents(): array
  74.     {
  75.         return [
  76.             PushProcessingEvent::class => 'onPushProcess',
  77.             PopProcessingEvent::class => 'onPopProcess',
  78.         ];
  79.     }
  80. }