custom/plugins/SendcloudShipping/src/Core/Infrastructure/TaskExecution/TaskRunner.php line 97

Open in your IDE?
  1. <?php
  2. namespace Sendcloud\Shipping\Core\Infrastructure\TaskExecution;
  3. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Required\AsyncProcessStarter;
  4. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Required\Configuration;
  5. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Exposed\TaskRunnerStatusStorage as TaskRunnerStatusStorageInterface;
  6. use Sendcloud\Shipping\Core\Infrastructure\Interfaces\Exposed\TaskRunnerWakeup as TaskRunnerWakeupInterface;
  7. use Sendcloud\Shipping\Core\Infrastructure\Logger\Logger;
  8. use Sendcloud\Shipping\Core\Infrastructure\ServiceRegister;
  9. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\ProcessStarterSaveException;
  10. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException;
  11. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\QueueStorageUnavailableException;
  12. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusChangeException;
  13. use Sendcloud\Shipping\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusStorageUnavailableException;
  14. use Sendcloud\Shipping\Core\Infrastructure\Utility\TimeProvider;
  15. /**
  16.  * Class TaskRunner
  17.  * @package Sendcloud\Shipping\Core\Infrastructure\TaskExecution
  18.  */
  19. class TaskRunner
  20. {
  21.     const CLASS_NAME __CLASS__;
  22.     /** Automatic task runner wakeup delay in seconds */
  23.     const WAKEUP_DELAY 5;
  24.     /** @var string Runner guid */
  25.     protected $guid;
  26.     /** @var AsyncProcessStarter */
  27.     private $asyncProcessStarter;
  28.     /** @var Queue */
  29.     private $queue;
  30.     /** @var TaskRunnerStatusStorageInterface */
  31.     private $runnerStorage;
  32.     /** @var Configuration */
  33.     private $configurationService;
  34.     /** @var TimeProvider */
  35.     private $timeProvider;
  36.     /** @var TaskRunnerWakeupInterface */
  37.     private $taskWakeup;
  38.     /**
  39.      * Sets task runner guid
  40.      *
  41.      * @param string $guid Runner guid to set
  42.      */
  43.     public function setGuid($guid)
  44.     {
  45.         $this->guid $guid;
  46.     }
  47.     /**
  48.      * Starts task runner lifecycle
  49.      */
  50.     public function run()
  51.     {
  52.         try {
  53.             $this->logDebug(array('Message' => 'Task runner: lifecycle started.'));
  54.             if ($this->isCurrentRunnerAlive()) {
  55.                 $this->failOrRequeueExpiredTasks();
  56.                 $this->startOldestQueuedItems();
  57.             }
  58.             $this->wakeup();
  59.             $this->logDebug(array('Message' => 'Task runner: lifecycle ended.'));
  60.         } catch (\Exception $ex) {
  61.             $this->logWarning(array(
  62.                 'Message' => 'Fail to run task runner. Unexpected error occurred.',
  63.                 'ExceptionMessage' => $ex->getMessage(),
  64.                 'ExceptionTrace' => $ex->getTraceAsString()
  65.             ));
  66.         }
  67.     }
  68.     /**
  69.      * Fails or re-queues expired tasks
  70.      *
  71.      * @return void
  72.      * @throws QueueItemDeserializationException
  73.      * @throws QueueStorageUnavailableException
  74.      * @throws TaskRunnerStatusStorageUnavailableException
  75.      */
  76.     private function failOrRequeueExpiredTasks()
  77.     {
  78.         $this->logDebug(array('Message' => 'Task runner: expired tasks cleanup started.'));
  79.         $runningItems $this->getQueue()->findRunningItems();
  80.         if (!$this->isCurrentRunnerAlive()) {
  81.             return;
  82.         }
  83.         foreach ($runningItems as $runningItem) {
  84.             if ($this->isItemExpired($runningItem) && $this->isCurrentRunnerAlive()) {
  85.                 $this->logMessageFor($runningItem'Task runner: Expired task detected.');
  86.                 $this->getConfigurationService()->setContext($runningItem->getContext());
  87.                 if ($runningItem->getProgressBasePoints() > $runningItem->getLastExecutionProgressBasePoints()) {
  88.                     $this->logMessageFor($runningItem'Task runner: Task requeue for execution continuation.');
  89.                     $this->getQueue()->requeue($runningItem);
  90.                 } else {
  91.                     $runningItem->reconfigureTask();
  92.                     $this->getQueue()->fail(
  93.                         $runningItem,
  94.                         sprintf('Task %s failed due to extended inactivity period.'$this->getItemDescription($runningItem))
  95.                     );
  96.                 }
  97.             }
  98.         }
  99.     }
  100.     /**
  101.      * Starts oldest queue item from all queues respecting following list of criteria:
  102.      *      - Queue must be without already running queue items
  103.      *      - For one queue only one (oldest queued) item should be started
  104.      *      - Number of running tasks must NOT be greater than maximal allowed by integration configuration
  105.      *
  106.      * @return void
  107.      * @throws ProcessStarterSaveException
  108.      * @throws TaskRunnerStatusStorageUnavailableException
  109.      * @throws QueueItemDeserializationException
  110.      */
  111.     private function startOldestQueuedItems()
  112.     {
  113.         $this->logDebug(array('Message' => 'Task runner: available task detection started.'));
  114.         // Calculate how many queue items can be started
  115.         $maxRunningTasks $this->getConfigurationService()->getMaxStartedTasksLimit();
  116.         $alreadyRunningItems $this->getQueue()->findRunningItems();
  117.         $numberOfAvailableSlotsForTaskRunning $maxRunningTasks count($alreadyRunningItems);
  118.         if ($numberOfAvailableSlotsForTaskRunning <= 0) {
  119.             $this->logDebug(array('Message' => 'Task runner: max number of active tasks reached.'));
  120.             return;
  121.         }
  122.         $items $this->getQueue()->findOldestQueuedItems($numberOfAvailableSlotsForTaskRunning);
  123.         if (!$this->isCurrentRunnerAlive()) {
  124.             return;
  125.         }
  126.         foreach ($items as $item) {
  127.             if (!$this->isCurrentRunnerAlive()) {
  128.                 return;
  129.             }
  130.             $this->logMessageFor($item'Task runner: Starting async task execution.');
  131.             $this->getAsyncProcessStarter()->start(new QueueItemStarter($item->getId()));
  132.         }
  133.     }
  134.     /**
  135.      * @throws TaskRunnerStatusChangeException
  136.      * @throws TaskRunnerStatusStorageUnavailableException
  137.      */
  138.     private function wakeup()
  139.     {
  140.         $this->logDebug(array('Message' => 'Task runner: starting self deactivation.'));
  141.         $this->getTimeProvider()->sleep($this->getWakeupDelay());
  142.         $this->getRunnerStorage()->setStatus(TaskRunnerStatus::createNullStatus());
  143.         $this->logDebug(array('Message' => 'Task runner: sending task runner wakeup signal.'));
  144.         $this->getTaskWakeup()->wakeup();
  145.     }
  146.     /**
  147.      * @return bool
  148.      * @throws TaskRunnerStatusStorageUnavailableException
  149.      */
  150.     private function isCurrentRunnerAlive()
  151.     {
  152.         $runnerStatus $this->getRunnerStorage()->getStatus();
  153.         $runnerExpired $runnerStatus->isExpired();
  154.         $runnerGuidIsCorrect $this->guid === $runnerStatus->getGuid();
  155.         if ($runnerExpired) {
  156.             $this->logWarning(array('Message' => 'Task runner: Task runner started but it is expired.'));
  157.         }
  158.         if (!$runnerGuidIsCorrect) {
  159.             $this->logWarning(array('Message' => 'Task runner: Task runner started but it is not active anymore.'));
  160.         }
  161.         return !$runnerExpired && $runnerGuidIsCorrect;
  162.     }
  163.     /**
  164.      * @param QueueItem $item
  165.      *
  166.      * @return bool
  167.      * @throws QueueItemDeserializationException
  168.      */
  169.     private function isItemExpired(QueueItem $item)
  170.     {
  171.         $currentTimestamp $this->getTimeProvider()->getCurrentLocalTime()->getTimestamp();
  172.         $maxTaskInactivityPeriod $item->getTask()->getMaxInactivityPeriod();
  173.         return ($item->getLastUpdateTimestamp() + $maxTaskInactivityPeriod) < $currentTimestamp;
  174.     }
  175.     /**
  176.      * @param QueueItem $item
  177.      *
  178.      * @return string
  179.      * @throws QueueItemDeserializationException
  180.      */
  181.     private function getItemDescription(QueueItem $item)
  182.     {
  183.         return "{$item->getId()}({$item->getTaskType()})";
  184.     }
  185.     /**
  186.      * @return AsyncProcessStarter
  187.      */
  188.     private function getAsyncProcessStarter()
  189.     {
  190.         if (empty($this->asyncProcessStarter)) {
  191.             $this->asyncProcessStarter ServiceRegister::getService(AsyncProcessStarter::CLASS_NAME);
  192.         }
  193.         return $this->asyncProcessStarter;
  194.     }
  195.     /**
  196.      * @return Queue
  197.      */
  198.     private function getQueue()
  199.     {
  200.         if (empty($this->queue)) {
  201.             $this->queue ServiceRegister::getService(Queue::CLASS_NAME);
  202.         }
  203.         return $this->queue;
  204.     }
  205.     /**
  206.      * @return TaskRunnerStatusStorageInterface
  207.      */
  208.     private function getRunnerStorage()
  209.     {
  210.         if (empty($this->runnerStorage)) {
  211.             $this->runnerStorage ServiceRegister::getService(TaskRunnerStatusStorageInterface::CLASS_NAME);
  212.         }
  213.         return $this->runnerStorage;
  214.     }
  215.     /**
  216.      * @return Configuration
  217.      */
  218.     private function getConfigurationService()
  219.     {
  220.         if (empty($this->configurationService)) {
  221.             $this->configurationService ServiceRegister::getService(Configuration::CLASS_NAME);
  222.         }
  223.         return $this->configurationService;
  224.     }
  225.     /**
  226.      * @return TimeProvider
  227.      */
  228.     private function getTimeProvider()
  229.     {
  230.         if (empty($this->timeProvider)) {
  231.             $this->timeProvider ServiceRegister::getService(TimeProvider::CLASS_NAME);
  232.         }
  233.         return $this->timeProvider;
  234.     }
  235.     /**
  236.      * @return TaskRunnerWakeupInterface
  237.      */
  238.     private function getTaskWakeup()
  239.     {
  240.         if (empty($this->taskWakeup)) {
  241.             $this->taskWakeup ServiceRegister::getService(TaskRunnerWakeupInterface::CLASS_NAME);
  242.         }
  243.         return $this->taskWakeup;
  244.     }
  245.     /**
  246.      * Returns wakeup delay in seconds
  247.      *
  248.      * @return int
  249.      */
  250.     private function getWakeupDelay()
  251.     {
  252.         $configurationValue $this->getConfigurationService()->getTaskRunnerWakeupDelay();
  253.         return !is_null($configurationValue) ? $configurationValue self::WAKEUP_DELAY;
  254.     }
  255.     /**
  256.      * Logs message and queue item details
  257.      *
  258.      * @param QueueItem $queueItem
  259.      * @param string $message
  260.      *
  261.      * @throws QueueItemDeserializationException
  262.      */
  263.     private function logMessageFor(QueueItem $queueItem$message)
  264.     {
  265.         $this->logDebug(array(
  266.             'RunnerGuid' => $this->guid,
  267.             'Message' => $message,
  268.             'TaskId' => $queueItem->getId(),
  269.             'TaskType' => $queueItem->getTaskType(),
  270.             'TaskRetries' => $queueItem->getRetries(),
  271.             'TaskProgressBasePoints' => $queueItem->getProgressBasePoints(),
  272.             'TaskLastExecutionProgressBasePoints' => $queueItem->getLastExecutionProgressBasePoints(),
  273.         ));
  274.     }
  275.     /**
  276.      * Helper methods to encapsulate debug level logging
  277.      *
  278.      * @param array $debugContent
  279.      */
  280.     private function logDebug(array $debugContent)
  281.     {
  282.         $debugContent['RunnerGuid'] = $this->guid;
  283.         Logger::logDebug($debugContent['Message'], 'Core'$debugContent);
  284.     }
  285.     /**
  286.      * Helper methods to encapsulate warning level logging
  287.      *
  288.      * @param array $debugContent
  289.      */
  290.     private function logWarning(array $debugContent)
  291.     {
  292.         $debugContent['RunnerGuid'] = $this->guid;
  293.         Logger::logWarning($debugContent['Message'], 'Core'$debugContent);
  294.     }
  295. }