custom/plugins/SendcloudShipping/src/Service/Infrastructure/TaskQueueStorageService.php line 136

Open in your IDE?
  1. <?php
  2. namespace Sendcloud\Shipping\Service\Infrastructure;
  3. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Required\TaskQueueStorage;
  4. use Sendcloud\Shipping\Core\Infrastructure\Logger\Logger;
  5. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException;
  6. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueItemSaveException;
  7. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\QueueItem;
  8. use Sendcloud\Shipping\Entity\Queue\QueueEntity;
  9. use Sendcloud\Shipping\Entity\Queue\QueueEntityRepository;
  10. /**
  11.  * Class TaskQueueStorageService
  12.  *
  13.  * @package Sendcloud\Shipping\Service\Infrastructure
  14.  */
  15. class TaskQueueStorageService implements TaskQueueStorage
  16. {
  17.     /**
  18.      * @var QueueEntityRepository
  19.      */
  20.     private $queueEntityRepository;
  21.     /**
  22.      * TaskQueueStorageService constructor.
  23.      *
  24.      * @param QueueEntityRepository $queueEntityRepository
  25.      */
  26.     public function __construct(QueueEntityRepository $queueEntityRepository)
  27.     {
  28.         $this->queueEntityRepository $queueEntityRepository;
  29.     }
  30.     /**
  31.      * @param QueueItem $queueItem
  32.      * @param array $additionalWhere
  33.      *
  34.      * @return int|void
  35.      * @throws QueueItemSaveException
  36.      */
  37.     public function save(QueueItem $queueItem, array $additionalWhere = [])
  38.     {
  39.         try {
  40.             return $this->queueEntityRepository->save($queueItem->getId(), $this->toArray($queueItem), $additionalWhere);
  41.         } catch (\Exception $exception) {
  42.             throw new QueueItemSaveException('Failed to save queue item into database');
  43.         }
  44.     }
  45.     /**
  46.      * Finds queue item by ID.
  47.      *
  48.      * @param int $id ID of a queue item to find.
  49.      * @return QueueItem|null
  50.      *   Found queue item or null when queue item does not exist.
  51.      */
  52.     public function find($id): ?QueueItem
  53.     {
  54.         try {
  55.             $queueEntity $this->queueEntityRepository->getById($id);
  56.             return $queueEntity $this->fromDatabaseEntity($queueEntity) : null;
  57.         } catch (\Exception $exception) {
  58.             Logger::logError("Failed to fetch queue item by id from database: {$exception->getMessage()}");
  59.             return null;
  60.         }
  61.     }
  62.     /**
  63.      * Finds latest queue item by type across all queues
  64.      *
  65.      * @param string $type Type of a queue item to find
  66.      * @param string $context Task context restriction if provided search will be limited
  67.      *      to given task context. Leave empty for search across all task contexts
  68.      *
  69.      * @return QueueItem|null
  70.      */
  71.     public function findLatestByType($type$context ''): ?QueueItem
  72.     {
  73.         try {
  74.             $queueEntity $this->queueEntityRepository->findLatestByType($type);
  75.             return $queueEntity $this->fromDatabaseEntity($queueEntity) : null;
  76.         } catch (\Exception $exception) {
  77.             Logger::logError("Failed to fetch latest queue item by type from database: {$exception->getMessage()}");
  78.             return null;
  79.         }
  80.     }
  81.     /**
  82.      * Finds list of earliest queued queue items per queue.
  83.      *
  84.      * Following list of criteria for searching must be satisfied:
  85.      *  - Queue must be without already running queue items
  86.      *  - For one queue only one (oldest queued) item should be returned
  87.      *
  88.      * @param int $limit Result set limit. By default 10 earliest queue items will be returned.
  89.      *
  90.      * @return QueueItem[]
  91.      *   Found queue item list.
  92.      */
  93.     public function findOldestQueuedItems($limit 10): array
  94.     {
  95.         try {
  96.             $items = [];
  97.             $rawItems $this->queueEntityRepository->findOldestQueuedEntities($limit);
  98.             foreach ($rawItems as $rawItem) {
  99.                 $items[] = $this->fromArray($rawItem);
  100.             }
  101.             return  $items;
  102.         } catch (\Exception $exception) {
  103.             Logger::logError("Failed to find queueItems in database. Search parameters: {$exception->getMessage()}"'Integration');
  104.             return [];
  105.         }
  106.     }
  107.     /**
  108.      * Finds all queue items from all queues
  109.      *
  110.      * @param array $filterBy List of simple search filters, where key is queue item property and
  111.      *      value is condition value for that property. Leave empty for unfiltered result.
  112.      * @param array $sortBy List of sorting options where key is queue item property and value
  113.      *      sort direction ("ASC" or "DESC"). Leave empty for default sorting.
  114.      * @param int $start From which record index result set should start.
  115.      * @param int $limit Max number of records that should be returned (default is 10).
  116.      *
  117.      * @return QueueItem[]
  118.      *   Found queue item list
  119.      */
  120.     public function findAll(array $filterBy = [], array $sortBy = [], $start 0$limit 10): array
  121.     {
  122.         try {
  123.             $items = [];
  124.             $collection $this->queueEntityRepository->findAll($filterBy$sortBy$start$limit);
  125.             /** @var QueueEntity $queueEntity */
  126.             foreach ($collection as $queueEntity) {
  127.                 $items[] = $this->fromDatabaseEntity($queueEntity);
  128.             }
  129.             return  $items;
  130.         } catch (\Exception $exception) {
  131.             Logger::logError("Failed to find queueItems in database. Search parameters: {$exception->getMessage()}"'Integration');
  132.             return [];
  133.         }
  134.     }
  135.     /**
  136.      * Deletes queue items by provided type.
  137.      *
  138.      * @param string $type Type of a queue item to find.
  139.      * @param string $context Task context restriction if provided search will be limited to given task context. Leave
  140.      *                        empty for search across all task contexts.
  141.      *
  142.      * @return bool True on success, otherwise false.
  143.      */
  144.     public function deleteByType($type$context ''): bool
  145.     {
  146.         $deletedRaws 0;
  147.         try {
  148.             $deletedRaws $this->queueEntityRepository->deleteByType($type);
  149.         } catch (\Exception $exception) {
  150.             Logger::logError("Failed to delete completed queue items: {$exception->getMessage()}"'Integration');
  151.         }
  152.         return $deletedRaws 0;
  153.     }
  154.     public function deleteOldItemsBy(
  155.         \DateTime $timeBefore,
  156.         array $filterBy = array(),
  157.         array $excludeTypes = array(),
  158.         $limit 1000
  159.     ): int {
  160.         $deletedRaws 0;
  161.         try {
  162.             $deletedRaws $this->queueEntityRepository->deleteOldItemsBy($timeBefore$filterBy$excludeTypes$limit);
  163.         } catch (\Exception $exception) {
  164.             Logger::logError(
  165.                 "Failed to delete old queue items: {$exception->getMessage()}",
  166.                 'Integration',
  167.                 [
  168.                     'TimeBefore' => $timeBefore->format(DATE_ATOM),
  169.                     'FilterBy' => $filterBy,
  170.                     'ExcludeTypes' => $excludeTypes,
  171.                     'Limit' => $limit,
  172.                 ]
  173.             );
  174.         }
  175.         return $deletedRaws;
  176.     }
  177.     /**
  178.      * @param array $filterBy
  179.      * @return int
  180.      */
  181.     public function countAll(array $filterBy = []): int
  182.     {
  183.         try {
  184.             return $this->queueEntityRepository->countAll($filterBy);
  185.         } catch (\Exception $exception) {
  186.             return 0;
  187.         }
  188.     }
  189.     /**
  190.      * @param QueueItem $queueItem
  191.      *
  192.      * @return array
  193.      * @throws QueueItemDeserializationException
  194.      */
  195.     public function toArray(QueueItem $queueItem): array
  196.     {
  197.         return [
  198.             'status' =>  $queueItem->getStatus(),
  199.             'type' => $queueItem->getTaskType(),
  200.             'queueName' => $queueItem->getQueueName(),
  201.             'progress' => $queueItem->getProgressBasePoints(),
  202.             'lastExecutionProgress' => $queueItem->getLastExecutionProgressBasePoints(),
  203.             'retries' => $queueItem->getRetries(),
  204.             'failureDescription' => $queueItem->getFailureDescription(),
  205.             'serializedTask' => $queueItem->getSerializedTask(),
  206.             'createTimestamp' => $queueItem->getCreateTimestamp(),
  207.             'queueTimestamp' => $queueItem->getQueueTimestamp(),
  208.             'lastUpdateTimestamp' => $queueItem->getLastUpdateTimestamp(),
  209.             'startTimestamp' => $queueItem->getStartTimestamp(),
  210.             'finishTimestamp' => $queueItem->getFinishTimestamp(),
  211.             'failTimestamp' => $queueItem->getFailTimestamp(),
  212.         ];
  213.     }
  214.     /**
  215.      * Creates QueueItem entity from entity model
  216.      *
  217.      * @param QueueEntity $queueEntity
  218.      *
  219.      * @return QueueItem
  220.      */
  221.     private function fromDatabaseEntity(QueueEntity $queueEntity): QueueItem
  222.     {
  223.         $queueItem = new QueueItem();
  224.         $queueItem->setId($queueEntity->getId());
  225.         $queueItem->setStatus($queueEntity->get('status'));
  226.         $queueItem->setQueueName($queueEntity->get('queueName'));
  227.         $queueItem->setProgressBasePoints($queueEntity->get('progress'));
  228.         $queueItem->setLastExecutionProgressBasePoints($queueEntity->get('lastExecutionProgress'));
  229.         $queueItem->setRetries($queueEntity->get('retries'));
  230.         $queueItem->setFailureDescription($queueEntity->get('failureDescription'));
  231.         $queueItem->setSerializedTask($queueEntity->get('serializedTask'));
  232.         $queueItem->setCreateTimestamp($queueEntity->get('createTimestamp'));
  233.         $queueItem->setQueueTimestamp($queueEntity->get('queueTimestamp'));
  234.         $queueItem->setLastUpdateTimestamp($queueEntity->get('lastUpdateTimestamp'));
  235.         $queueItem->setStartTimestamp($queueEntity->get('startTimestamp'));
  236.         $queueItem->setFinishTimestamp($queueEntity->get('finishTimestamp'));
  237.         $queueItem->setFailTimestamp($queueEntity->get('failTimestamp'));
  238.         return $queueItem;
  239.     }
  240.     /**
  241.      * Creates QueueItem entity from entity model
  242.      *
  243.      * @param array $rawItem
  244.      *
  245.      * @return QueueItem
  246.      */
  247.     private function fromArray(array $rawItem): QueueItem
  248.     {
  249.         $queueItem = new QueueItem();
  250.         $queueItem->setId(bin2hex($rawItem['id']));
  251.         $queueItem->setStatus($rawItem['status']);
  252.         $queueItem->setQueueName($rawItem['queueName']);
  253.         $queueItem->setProgressBasePoints((int)$rawItem['progress']);
  254.         $queueItem->setLastExecutionProgressBasePoints((int)$rawItem['lastExecutionProgress']);
  255.         $queueItem->setRetries((int)$rawItem['retries']);
  256.         $queueItem->setFailureDescription($rawItem['failureDescription']);
  257.         $queueItem->setSerializedTask($rawItem['serializedTask']);
  258.         $queueItem->setCreateTimestamp((int)$rawItem['createTimestamp']);
  259.         $queueItem->setQueueTimestamp((int)$rawItem['queueTimestamp']);
  260.         $queueItem->setLastUpdateTimestamp((int)$rawItem['lastUpdateTimestamp']);
  261.         $queueItem->setStartTimestamp((int)$rawItem['startTimestamp']);
  262.         $queueItem->setFinishTimestamp((int)$rawItem['finishTimestamp']);
  263.         $queueItem->setFailTimestamp((int)$rawItem['failTimestamp']);
  264.         return $queueItem;
  265.     }
  266. }