*/ protected $queue; /** * @var TaskHandler[] */ protected $tasks = []; /** @var int */ private $ids = 1; /** @var int */ protected $currentTick = 0; public function __construct(\Logger $logger, ?string $owner = null){ $this->logger = $logger; $this->owner = $owner; $this->queue = new ReversePriorityQueue(); } /** * @param Task $task * * @return null|TaskHandler */ public function scheduleTask(Task $task){ return $this->addTask($task, -1, -1); } /** * @param Task $task * @param int $delay * * @return null|TaskHandler */ public function scheduleDelayedTask(Task $task, int $delay){ return $this->addTask($task, $delay, -1); } /** * @param Task $task * @param int $period * * @return null|TaskHandler */ public function scheduleRepeatingTask(Task $task, int $period){ return $this->addTask($task, -1, $period); } /** * @param Task $task * @param int $delay * @param int $period * * @return null|TaskHandler */ public function scheduleDelayedRepeatingTask(Task $task, int $delay, int $period){ return $this->addTask($task, $delay, $period); } /** * @param int $taskId */ public function cancelTask(int $taskId){ if($taskId !== null and isset($this->tasks[$taskId])){ try{ $this->tasks[$taskId]->cancel(); }catch(\Throwable $e){ $this->logger->critical("Task " . $this->tasks[$taskId]->getTaskName() . " threw an exception when trying to cancel: " . $e->getMessage()); $this->logger->logException($e); } unset($this->tasks[$taskId]); } } public function cancelAllTasks(){ foreach($this->tasks as $id => $task){ $this->cancelTask($id); } $this->tasks = []; while(!$this->queue->isEmpty()){ $this->queue->extract(); } $this->ids = 1; } /** * @param int $taskId * * @return bool */ public function isQueued(int $taskId) : bool{ return isset($this->tasks[$taskId]); } /** * @param Task $task * @param int $delay * @param int $period * * @return null|TaskHandler * * @throws \InvalidStateException */ private function addTask(Task $task, int $delay, int $period){ if($this->shutdown){ throw new \InvalidStateException("Tried to schedule task after scheduler shutdown"); } if($delay <= 0){ $delay = -1; } if($period <= -1){ $period = -1; }elseif($period < 1){ $period = 1; } return $this->handle(new TaskHandler($task, $this->nextId(), $delay, $period, $this->owner)); } private function handle(TaskHandler $handler) : TaskHandler{ if($handler->isDelayed()){ $nextRun = $this->currentTick + $handler->getDelay(); }else{ $nextRun = $this->currentTick; } $handler->setNextRun($nextRun); $this->tasks[$handler->getTaskId()] = $handler; $this->queue->insert($handler, $nextRun); return $handler; } public function shutdown() : void{ $this->shutdown = true; $this->cancelAllTasks(); } /** * @param int $currentTick */ public function mainThreadHeartbeat(int $currentTick){ $this->currentTick = $currentTick; while($this->isReady($this->currentTick)){ /** @var TaskHandler $task */ $task = $this->queue->extract(); if($task->isCancelled()){ unset($this->tasks[$task->getTaskId()]); continue; }else{ try{ $task->run($this->currentTick); }catch(\Throwable $e){ $this->logger->critical("Could not execute task " . $task->getTaskName() . ": " . $e->getMessage()); $this->logger->logException($e); } } if($task->isRepeating()){ $task->setNextRun($this->currentTick + $task->getPeriod()); $this->queue->insert($task, $this->currentTick + $task->getPeriod()); }else{ $task->remove(); unset($this->tasks[$task->getTaskId()]); } } } private function isReady(int $currentTick) : bool{ return count($this->tasks) > 0 and $this->queue->current()->getNextRun() <= $currentTick; } /** * @return int */ private function nextId() : int{ return $this->ids++; } }