custom/plugins/SendcloudShipping/src/Entity/Queue/QueueEntityRepository.php line 203

Open in your IDE?
  1. <?php
  2. namespace Sendcloud\Shipping\Entity\Queue;
  3. use Doctrine\DBAL\Connection;
  4. use Doctrine\DBAL\DBALException;
  5. use Doctrine\DBAL\Exception\InvalidArgumentException;
  6. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\QueueItem;
  7. use Sendcloud\Shipping\Migration\Migration1572012872CreateQueuesTable;
  8. use Shopware\Core\Framework\Context;
  9. use Shopware\Core\Framework\DataAbstractionLayer\EntityCollection;
  10. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  11. use Shopware\Core\Framework\DataAbstractionLayer\Exception\InconsistentCriteriaIdsException;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  13. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsAnyFilter;
  14. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  15. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NotFilter;
  16. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  17. use Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting;
  18. /**
  19.  * Class QueueEntityRepository
  20.  *
  21.  * @package Sendcloud\Shipping\Entity\Queue
  22.  */
  23. class QueueEntityRepository
  24. {
  25.     /**
  26.      * @var EntityRepositoryInterface
  27.      */
  28.     private $baseRepository;
  29.     /**
  30.      * @var Connection
  31.      */
  32.     private $connection;
  33.     /**
  34.      * @var string
  35.      */
  36.     private $tableName;
  37.     /**
  38.      * QueueEntityRepository constructor.
  39.      *
  40.      * @param EntityRepositoryInterface $baseRepository
  41.      * @param Connection $connection
  42.      */
  43.     public function __construct(EntityRepositoryInterface $baseRepositoryConnection $connection)
  44.     {
  45.         $this->baseRepository $baseRepository;
  46.         $this->connection $connection;
  47.         $this->tableName Migration1572012872CreateQueuesTable::QUEUES_TABLE;
  48.     }
  49.     /**
  50.      * Removes queue item by type
  51.      *
  52.      * @param string $type
  53.      *
  54.      * @return int
  55.      * @throws DBALException
  56.      * @throws InvalidArgumentException
  57.      */
  58.     public function deleteByType(string $type): int
  59.     {
  60.         return $this->connection->delete($this->tableName, ['`type`' => $type]);
  61.     }
  62.     public function deleteOldItemsBy(
  63.         \DateTime $timeBefore,
  64.         array $filterBy = array(),
  65.         array $excludeTypes = array(),
  66.         $limit 1000
  67.     ): int {
  68.         $criteria $this->buildCriteria(null$filterBy, ['internalId' => FieldSorting::ASCENDING], $limit);
  69.         $criteria->addFilter(
  70.             new RangeFilter('createTimestamp', [
  71.                 RangeFilter::LTE => $timeBefore->getTimestamp()
  72.             ])
  73.         );
  74.         if (!empty($excludeTypes)) {
  75.             $criteria->addFilter(
  76.                 new NotFilter(NotFilter::CONNECTION_AND, [
  77.                     new EqualsAnyFilter('type'$excludeTypes),
  78.                 ])
  79.             );
  80.         }
  81.         $context Context::createDefaultContext();
  82.         $oldItems $this->baseRepository->searchIds($criteria$context);
  83.         if ($oldItems->getTotal() === 0) {
  84.             return 0;
  85.         }
  86.         $keys array_map(function ($id) {
  87.             return ['id' => $id];
  88.         }, $oldItems->getIds());
  89.         $this->baseRepository->delete($keys$context);
  90.         return $oldItems->getTotal();
  91.     }
  92.     /**
  93.      * Creates or update queue item
  94.      *
  95.      * @param string|null $id
  96.      * @param array $data
  97.      * @param array $additionalConditions
  98.      *
  99.      * @return string|null
  100.      * @throws InconsistentCriteriaIdsException
  101.      */
  102.     public function save(?string $id, array $data, array $additionalConditions): ?string
  103.     {
  104.         $context Context::createDefaultContext();
  105.         /** @var QueueEntity $queueEntity */
  106.         if ($id) {
  107.             $queueEntity $this->baseRepository->search($this->buildCriteria($id$additionalConditions), $context)->first();
  108.             if ($queueEntity) {
  109.                 $updateData array_merge(['id' => $queueEntity->getId()], $data);
  110.                 $this->baseRepository->update([$updateData], $context);
  111.                 return $queueEntity->getId();
  112.             }
  113.         }
  114.         $event $this->baseRepository->create([$data], $context)->getEventByEntityName(QueueEntity::class);
  115.         return $event $event->getIds()[0] : null;
  116.     }
  117.     /**
  118.      * Returns QueueEntity by its id
  119.      *
  120.      * @param string $id
  121.      *
  122.      * @return QueueEntity|null
  123.      * @throws InconsistentCriteriaIdsException
  124.      */
  125.     public function getById(string $id): ?QueueEntity
  126.     {
  127.         return $this->baseRepository->search(new Criteria([$id]), Context::createDefaultContext())->first();
  128.     }
  129.     /**
  130.      * Returns queue item with latest queueTimestamp
  131.      *
  132.      * @param string $type
  133.      *
  134.      * @return QueueEntity|null
  135.      * @throws InconsistentCriteriaIdsException
  136.      */
  137.     public function findLatestByType(string $type): ?QueueEntity
  138.     {
  139.         $filter = ['type' => $type];
  140.         $sortBy = ['queueTimestamp' => FieldSorting::DESCENDING];
  141.         return $this->baseRepository->search($this->buildCriteria(null$filter$sortBy), Context::createDefaultContext())->first();
  142.     }
  143.     /**
  144.      * Finds list of earliest queued queue items per queue.
  145.      *
  146.      * @param int $limit
  147.      *
  148.      * @return mixed[]
  149.      * @throws DBALException
  150.      */
  151.     public function findOldestQueuedEntities(int $limit 10): array
  152.     {
  153.         $runningQueuesQuery "SELECT DISTINCT queueName 
  154.                                FROM {$this->tableName} qi2 
  155.                                WHERE qi2.status='" QueueItem::IN_PROGRESS "'";
  156.         $query "SELECT * 
  157.                   FROM (
  158.                     SELECT queueName, min(internalId) AS internalId
  159.                     FROM {$this->tableName} AS t
  160.                     WHERE t.status='" QueueItem::QUEUED "' AND t.queueName NOT IN ({$runningQueuesQuery})
  161.                     GROUP BY queueName LIMIT {$limit}
  162.                   ) AS queueView
  163.                   INNER JOIN {$this->tableName} AS qi ON queueView.queueName=qi.queueName and queueView.internalId=qi.internalId";
  164.         return $this->connection->executeQuery($query)->fetchAll();
  165.     }
  166.     /**
  167.      * Returns all queue items which satisfy given condition
  168.      *
  169.      * @param array $filterBy
  170.      * @param array $sortBy
  171.      * @param int $start
  172.      * @param int $limit
  173.      *
  174.      * @return EntityCollection
  175.      * @throws InconsistentCriteriaIdsException
  176.      */
  177.     public function findAll(array $filterBy = [], array $sortBy = [], $start 0$limit 10): EntityCollection
  178.     {
  179.         return $this->baseRepository
  180.             ->search($this->buildCriteria(null$filterBy$sortBy$limit$start), Context::createDefaultContext())
  181.             ->getEntities();
  182.     }
  183.     /**
  184.      * Count all queue items which satisfy given condition
  185.      *
  186.      * @param array $filterBy
  187.      *
  188.      * @return int
  189.      */
  190.     public function countAll(array $filterBy = []): int
  191.     {
  192.         $criteria = new Criteria([]);
  193.         foreach ($filterBy as $key => $value) {
  194.             $criteria->addFilter(new EqualsFilter($key$value));
  195.         }
  196.         return $this->baseRepository
  197.             ->search($criteriaContext::createDefaultContext())
  198.             ->count();
  199.     }
  200.     /**
  201.      * Creates search criteria
  202.      *
  203.      * @param string|null $id
  204.      * @param array $additionalConditions
  205.      *
  206.      * @param array $sorting
  207.      * @param int $limit
  208.      * @param int $offset
  209.      *
  210.      * @return Criteria
  211.      * @throws InconsistentCriteriaIdsException
  212.      */
  213.     private function buildCriteria(
  214.         ?string $id,
  215.         array $additionalConditions,
  216.         array $sorting = [],
  217.         int $limit 50,
  218.         int $offset 0
  219.     ): Criteria
  220.     {
  221.         $ids $id ? [$id] : [];
  222.         $criteria = new Criteria($ids);
  223.         foreach ($additionalConditions as $key => $value) {
  224.             $criteria->addFilter(new EqualsFilter($key$value));
  225.         }
  226.         foreach ($sorting as $field => $direction) {
  227.             $criteria->addSorting(new FieldSorting($field$direction));
  228.         }
  229.         $criteria->setLimit($limit);
  230.         $criteria->setOffset($offset);
  231.         return $criteria;
  232.     }
  233. }