<?php declare(strict_types=1);
namespace App\RabbitMq\Processing;
use App\Entity\AbstractDelayed;
use DateTime;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\Persistence\ManagerRegistry;
use InvalidArgumentException;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class ProcessingSubscriber implements EventSubscriberInterface
{
private ManagerRegistry $doctrine;
private LoggerInterface $logger;
public function __construct(ManagerRegistry $doctrine, LoggerInterface $logger)
{
$this->doctrine = $doctrine;
$this->logger = $logger;
}
public function onPushProcess(PushProcessingEvent $event): void
{
$this->onProcess($event, true);
}
public function onPopProcess(PopProcessingEvent $event): void
{
$this->onProcess($event, false);
}
private function onProcess(ProcessingEvent $event, bool $isPush): void
{
$entity = $event->getEntity();
if (!($entity instanceof AbstractDelayed)) {
return;
}
$this->logger->debug(($isPush ? 'Pushing ' : 'Popping ') . ' task ' . $event->getService() . ':' . $event->getMethod());
$this->delayedTasks($entity, $event->getEntityClass(), $isPush);
}
private function delayedTasks(AbstractDelayed $entity, string $entityClass, bool $increment): void
{
if (null !== $entity->getId()) {
$delayedTasksInc = ($increment ? 1 : 0) * 2 - 1;
/** @var EntityManagerInterface $manager */
$manager = $this->doctrine->getManager();
$qb = $manager->createQueryBuilder();
$qb->update($entityClass, 'e')
->set('e.delayedTasks', 'e.delayedTasks + :delayedTasksProcess')
->where('e.id = :entity_id')
->setParameter('delayedTasksProcess', $delayedTasksInc)
->setParameter('entity_id', $entity->getId())
->getQuery()->execute();
// this value will not be saved to database, but will allow us to see if tasks were scheduled
$entity->incrementTasks($delayedTasksInc);
if (0 === $entity->getDelayedTasks() + $entity->getTasksIncrement()) {
$entity->setLastModified(new DateTime());
}
} else {
$increment
? $entity->addDelayedTask()
: $entity->deductDelayedTask();
}
}
public function forceDelayedTasks(AbstractDelayed $entity, string $entityName, bool $increment, string $iKnow): void
{
// this thing was designed to track how many RabbitMQ tasks are left to process in
// \App\RabbitMq\TasksProcessor so overriding delayedTasks need to be done with
// EXTREME CAUTION
if ($iKnow !== 'Yes, i am sure i want to do this') {
throw new InvalidArgumentException('You should not do this then');
}
$this->delayedTasks($entity, $entityName, $increment);
}
/**
* @return array<string, array<int|string, array<int|string, int|string>|int|string>|string>
*/
public static function getSubscribedEvents(): array
{
return [
PushProcessingEvent::class => 'onPushProcess',
PopProcessingEvent::class => 'onPopProcess',
];
}
}