custom/plugins/SendcloudShipping/src/Core/Infrastructure/TaskExecution/Queue.php line 290

Open in your IDE?
  1. <?php
  2. namespace Sendcloud\Shipping\Core\Infrastructure\TaskExecution;
  3. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Required\Configuration;
  4. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Required\TaskQueueStorage;
  5. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Exposed\TaskRunnerWakeup as TaskRunnerWakeupInterface;
  6. use Sendcloud\Shipping\Core\Infrastructure\ServiceRegister;
  7. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException;
  8. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueItemSaveException;
  9. use \Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueStorageUnavailableException;
  10. use Sendcloud\Shipping\Core\Infrastructure\Utility\Exceptions\HttpAuthenticationException;
  11. use Sendcloud\Shipping\Core\Infrastructure\Utility\TimeProvider;
  12. /**
  13.  * Class Queue
  14.  * @package Sendcloud\Shipping\Core\Infrastructure\TaskExecution
  15.  */
  16. class Queue
  17. {
  18.     const CLASS_NAME __CLASS__;
  19.     /** Maximum failure retries count */
  20.     const MAX_RETRIES 5;
  21.     /** @var TaskQueueStorage */
  22.     private $storage;
  23.     /** @var TimeProvider */
  24.     private $timeProvider;
  25.     /** @var TaskRunnerWakeupInterface */
  26.     private $taskRunnerWakeup;
  27.     /** @var Configuration */
  28.     private $configService;
  29.     /**
  30.      * Enqueues queue item to a given queue and stores changes
  31.      *
  32.      * @param string $queueName Name of a queue where queue item should be queued
  33.      * @param Task $task Task to enqueue
  34.      * @param string $context Task execution context. If integration supports multiple accounts (middleware integration) context
  35.      * based on account id should be provided. Failing to do this will result in global task context and unpredictable task execution
  36.      *
  37.      * @return \Sendcloud\Shipping\Core\Infrastructure\TaskExecution\QueueItem Created queue item
  38.      * @throws \Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueStorageUnavailableException
  39.      */
  40.     public function enqueue($queueNameTask $task$context '')
  41.     {
  42.         $queueItem = new QueueItem($task);
  43.         $queueItem->setStatus(QueueItem::QUEUED);
  44.         $queueItem->setQueueName($queueName);
  45.         $queueItem->setContext($context);
  46.         $queueItem->setQueueTimestamp($this->getTimeProvider()->getCurrentLocalTime()->getTimestamp());
  47.         try {
  48.             $this->save($queueItem);
  49.             $this->getTaskRunnerWakeup()->wakeup();
  50.         } catch (QueueItemSaveException $exception) {
  51.             throw new QueueStorageUnavailableException('Unable to enqueue task. Queue storage failed to save item.'0$exception);
  52.         }
  53.         return $queueItem;
  54.     }
  55.     /**
  56.      * Starts task execution, puts queue item in "in_progress" state and stores queue item changes
  57.      *
  58.      * @param QueueItem $queueItem Queue item to start
  59.      *
  60.      * @throws HttpAuthenticationException
  61.      * @throws QueueStorageUnavailableException
  62.      * @throws QueueItemDeserializationException
  63.      */
  64.     public function start(QueueItem $queueItem)
  65.     {
  66.         if ($queueItem->getStatus() !== QueueItem::QUEUED) {
  67.             $this->throwIllegalTransitionException($queueItem->getStatus(), QueueItem::IN_PROGRESS);
  68.         }
  69.         $lastUpdateTimestamp $queueItem->getLastUpdateTimestamp();
  70.         $queueItem->setStatus(QueueItem::IN_PROGRESS);
  71.         $queueItem->setStartTimestamp($this->getTimeProvider()->getCurrentLocalTime()->getTimestamp());
  72.         $queueItem->setLastUpdateTimestamp($queueItem->getStartTimestamp());
  73.         try {
  74.             $this->save(
  75.                 $queueItem,
  76.                 array('status' => QueueItem::QUEUED'lastUpdateTimestamp' => $lastUpdateTimestamp)
  77.             );
  78.             $queueItem->getTask()->execute();
  79.         } catch (HttpAuthenticationException $exception) {
  80.             throw $exception;
  81.         } catch (QueueItemSaveException $exception) {
  82.             throw new QueueStorageUnavailableException('Unable to start task. Queue storage failed to save item.'0$exception);
  83.         }
  84.     }
  85.     /**
  86.      * Puts queue item in finished status and stores changes
  87.      *
  88.      * @param QueueItem $queueItem Queue item to finish
  89.      *
  90.      * @throws \Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueStorageUnavailableException
  91.      */
  92.     public function finish(QueueItem $queueItem)
  93.     {
  94.         if ($queueItem->getStatus() !== QueueItem::IN_PROGRESS) {
  95.             $this->throwIllegalTransitionException($queueItem->getStatus(), QueueItem::COMPLETED);
  96.         }
  97.         $queueItem->setStatus(QueueItem::COMPLETED);
  98.         $queueItem->setFinishTimestamp($this->getTimeProvider()->getCurrentLocalTime()->getTimestamp());
  99.         $queueItem->setProgressBasePoints(10000);
  100.         try {
  101.             $this->save(
  102.                 $queueItem,
  103.                 array('status' => QueueItem::IN_PROGRESS'lastUpdateTimestamp' => $queueItem->getLastUpdateTimestamp())
  104.             );
  105.         } catch (QueueItemSaveException $exception) {
  106.             throw new QueueStorageUnavailableException('Unable to finish task. Queue storage failed to save item.'0$exception);
  107.         }
  108.     }
  109.     /**
  110.      * Returns queue item back to queue and sets updates last execution progress to current progress value.
  111.      *
  112.      * @param QueueItem $queueItem Queue item to requeue
  113.      *
  114.      * @throws QueueStorageUnavailableException
  115.      */
  116.     public function requeue(QueueItem $queueItem)
  117.     {
  118.         if ($queueItem->getStatus() !== QueueItem::IN_PROGRESS) {
  119.             $this->throwIllegalTransitionException($queueItem->getStatus(), QueueItem::QUEUED);
  120.         }
  121.         $lastExecutionProgress $queueItem->getLastExecutionProgressBasePoints();
  122.         $queueItem->setStatus(QueueItem::QUEUED);
  123.         $queueItem->setStartTimestamp(null);
  124.         $queueItem->setLastExecutionProgressBasePoints($queueItem->getProgressBasePoints());
  125.         try {
  126.             $this->save(
  127.                 $queueItem,
  128.                 array(
  129.                     'status' => QueueItem::IN_PROGRESS,
  130.                     'lastExecutionProgress' => $lastExecutionProgress,
  131.                     'lastUpdateTimestamp' => $queueItem->getLastUpdateTimestamp()
  132.                 )
  133.             );
  134.         } catch (QueueItemSaveException $exception) {
  135.             throw new QueueStorageUnavailableException('Unable to requeue task. Queue storage failed to save item.'0$exception);
  136.         }
  137.     }
  138.     /**
  139.      * Returns queue item back to queue and increments retries count. When max retries count is reached puts item in failed status
  140.      *
  141.      * @param QueueItem $queueItem Queue item to fail
  142.      * @param string $failureDescription Verbal description of failure
  143.      *
  144.      * @throws \BadMethodCallException Queue item must be in "in_progress" status for fail method
  145.      * @throws QueueStorageUnavailableException
  146.      */
  147.     public function fail(QueueItem $queueItem$failureDescription)
  148.     {
  149.         if ($queueItem->getStatus() !== QueueItem::IN_PROGRESS) {
  150.             $this->throwIllegalTransitionException($queueItem->getStatus(), QueueItem::FAILED);
  151.         }
  152.         $lastExecutionProgress $queueItem->getLastExecutionProgressBasePoints();
  153.         $queueItem->setRetries($queueItem->getRetries() + 1);
  154.         $queueItem->setFailureDescription($failureDescription);
  155.         if ($queueItem->getRetries() > $this->getMaxRetries()) {
  156.             $queueItem->setStatus(QueueItem::FAILED);
  157.             $queueItem->setFailTimestamp($this->getTimeProvider()->getCurrentLocalTime()->getTimestamp());
  158.         } else {
  159.             $queueItem->setStatus(QueueItem::QUEUED);
  160.             $queueItem->setStartTimestamp(null);
  161.         }
  162.         try {
  163.             $this->save(
  164.                 $queueItem,
  165.                 array(
  166.                     'status' => QueueItem::IN_PROGRESS,
  167.                     'lastExecutionProgress' => $lastExecutionProgress,
  168.                     'lastUpdateTimestamp' => $queueItem->getLastUpdateTimestamp()
  169.                 )
  170.             );
  171.         } catch (QueueItemSaveException $exception) {
  172.             throw new QueueStorageUnavailableException('Unable to fail task. Queue storage failed to save item.'0$exception);
  173.         }
  174.     }
  175.     /**
  176.      * Updates queue item progress
  177.      *
  178.      * @param QueueItem $queueItem
  179.      * @param int $progress
  180.      *
  181.      * @throws QueueStorageUnavailableException
  182.      */
  183.     public function updateProgress(QueueItem $queueItem$progress)
  184.     {
  185.         if ($queueItem->getStatus() !== QueueItem::IN_PROGRESS) {
  186.             throw new \BadMethodCallException('Progress reported for not started queue item.');
  187.         }
  188.         $lastExecutionProgress $queueItem->getLastExecutionProgressBasePoints();
  189.         $lastUpdateTimestamp $queueItem->getLastUpdateTimestamp();
  190.         $queueItem->setProgressBasePoints($progress);
  191.         $queueItem->setLastUpdateTimestamp($this->getTimeProvider()->getCurrentLocalTime()->getTimestamp());
  192.         try {
  193.             $this->save($queueItem, array(
  194.                 'status' => QueueItem::IN_PROGRESS,
  195.                 'lastExecutionProgress' => $lastExecutionProgress,
  196.                 'lastUpdateTimestamp' => $lastUpdateTimestamp
  197.             ));
  198.         } catch (QueueItemSaveException $exception) {
  199.             throw new QueueStorageUnavailableException('Unable to update task progress. Queue storage failed to save item.'0$exception);
  200.         }
  201.     }
  202.     /**
  203.      * Keeps passed queue item alive by setting last update timestamp
  204.      *
  205.      * @param QueueItem $queueItem
  206.      *
  207.      * @throws QueueStorageUnavailableException
  208.      */
  209.     public function keepAlive(QueueItem $queueItem)
  210.     {
  211.         $lastExecutionProgress $queueItem->getLastExecutionProgressBasePoints();
  212.         $lastUpdateTimestamp $queueItem->getLastUpdateTimestamp();
  213.         $queueItem->setLastUpdateTimestamp($this->getTimeProvider()->getCurrentLocalTime()->getTimestamp());
  214.         try {
  215.             $this->save($queueItem, array(
  216.                 'status' => QueueItem::IN_PROGRESS,
  217.                 'lastExecutionProgress' => $lastExecutionProgress,
  218.                 'lastUpdateTimestamp' => $lastUpdateTimestamp
  219.             ));
  220.         } catch (QueueItemSaveException $exception) {
  221.             throw new QueueStorageUnavailableException('Unable to keep task alive. Queue storage failed to save item.'0$exception);
  222.         }
  223.     }
  224.     /**
  225.      * Finds queue item by id
  226.      *
  227.      * @param int $id Id of a queue item to find
  228.      *
  229.      * @return QueueItem|null Found queue item or null when queue item does not exist
  230.      */
  231.     public function find($id)
  232.     {
  233.         return $this->getStorage()->find($id);
  234.     }
  235.     /**
  236.      * Finds latest queue item by type
  237.      *
  238.      * @param string $type Type of a queue item to find
  239.      * @param string $context Task scope restriction, default is global scope
  240.      *
  241.      * @return QueueItem|null Found queue item or null when queue item does not exist
  242.      */
  243.     public function findLatestByType($type$context '')
  244.     {
  245.         return $this->getStorage()->findLatestByType($type$context);
  246.     }
  247.     /**
  248.      * Finds queue items with status "in_progress"
  249.      *
  250.      * @return QueueItem[] Running queue items
  251.      */
  252.     public function findRunningItems()
  253.     {
  254.         return $this->getStorage()->findAll(array('status' => QueueItem::IN_PROGRESS));
  255.     }
  256.     /**
  257.      * Finds list of earliest queued queue items per queue. Only queues that doesn't have running tasks are taken in consideration.
  258.      *
  259.      * @param int $limit Result set limit. By default max 10 earliest queue items will be returned
  260.      *
  261.      * @return \Sendcloud\Shipping\Core\Infrastructure\TaskExecution\QueueItem[] Found queue item list
  262.      */
  263.     public function findOldestQueuedItems($limit 10)
  264.     {
  265.         return $this->getStorage()->findOldestQueuedItems($limit);
  266.     }
  267.     /**
  268.      * Creates or updates given queue item using storage service. If queue item id is not set, new queue item will be created
  269.      * otherwise update will be performed.
  270.      *
  271.      * @param QueueItem $queueItem Item to save
  272.      * @param array $additionalWhere List of key/value pairs to set in where clause when saving queue item
  273.      *
  274.      * @return int Id of saved queue item
  275.      * @throws QueueItemSaveException if save fails
  276.      */
  277.     private function save(QueueItem $queueItem, array $additionalWhere = array())
  278.     {
  279.         $id $this->getStorage()->save($queueItem$additionalWhere);
  280.         $queueItem->setId($id);
  281.         return $id;
  282.     }
  283.     /**
  284.      * @return TaskQueueStorage
  285.      */
  286.     private function getStorage()
  287.     {
  288.         if (empty($this->storage)) {
  289.             $this->storage ServiceRegister::getService(TaskQueueStorage::CLASS_NAME);
  290.         }
  291.         return $this->storage;
  292.     }
  293.     /**
  294.      * @return TimeProvider
  295.      */
  296.     private function getTimeProvider()
  297.     {
  298.         if (empty($this->timeProvider)) {
  299.             $this->timeProvider ServiceRegister::getService(TimeProvider::CLASS_NAME);
  300.         }
  301.         return $this->timeProvider;
  302.     }
  303.     /**
  304.      * @return TaskRunnerWakeupInterface
  305.      */
  306.     private function getTaskRunnerWakeup()
  307.     {
  308.         if (empty($this->taskRunnerWakeup)) {
  309.             $this->taskRunnerWakeup ServiceRegister::getService(TaskRunnerWakeupInterface::CLASS_NAME);
  310.         }
  311.         return $this->taskRunnerWakeup;
  312.     }
  313.     /**
  314.      * @return Configuration
  315.      */
  316.     private function getConfigService()
  317.     {
  318.         if (empty($this->configService)) {
  319.             $this->configService ServiceRegister::getService(Configuration::CLASS_NAME);
  320.         }
  321.         return $this->configService;
  322.     }
  323.     /**
  324.      * Prepares exception message and throws exception
  325.      *
  326.      * @param string $fromStatus
  327.      * @param string $toStatus
  328.      * @throws \BadMethodCallException
  329.      */
  330.     private function throwIllegalTransitionException($fromStatus$toStatus)
  331.     {
  332.         throw new \BadMethodCallException(sprintf(
  333.             'Illegal queue item state transition from "%s" to "%s"',
  334.             $fromStatus,
  335.             $toStatus
  336.         ));
  337.     }
  338.     /**
  339.      * Returns maximum number of retries
  340.      *
  341.      * @return int
  342.      */
  343.     private function getMaxRetries()
  344.     {
  345.         $configurationValue $this->getConfigService()->getMaxTaskExecutionRetries();
  346.         return !is_null($configurationValue) ? $configurationValue self::MAX_RETRIES;
  347.     }
  348. }