diff --git a/src/pocketmine/scheduler/AsyncPool.php b/src/pocketmine/scheduler/AsyncPool.php index 70e71a5c1..b5633b57c 100644 --- a/src/pocketmine/scheduler/AsyncPool.php +++ b/src/pocketmine/scheduler/AsyncPool.php @@ -39,17 +39,11 @@ class AsyncPool{ /** @var int */ private $workerMemoryLimit; - /** @var AsyncTask[] */ - private $tasks = []; - /** @var int[] */ - private $taskWorkers = []; - /** @var int */ - private $nextTaskId = 1; + /** @var \SplQueue[]|AsyncTask[][] */ + private $taskQueues = []; /** @var AsyncWorker[] */ private $workers = []; - /** @var int[] */ - private $workerUsage = []; /** @var \Closure[] */ private $workerStartHooks = []; @@ -124,11 +118,13 @@ class AsyncPool{ */ private function getWorker(int $worker) : AsyncWorker{ if(!isset($this->workers[$worker])){ - $this->workerUsage[$worker] = 0; + $this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit); $this->workers[$worker]->setClassLoader($this->classLoader); $this->workers[$worker]->start(self::WORKER_START_OPTIONS); + $this->taskQueues[$worker] = new \SplQueue(); + foreach($this->workerStartHooks as $hook){ $hook($worker); } @@ -147,18 +143,15 @@ class AsyncPool{ if($worker < 0 or $worker >= $this->size){ throw new \InvalidArgumentException("Invalid worker $worker"); } - if($task->getTaskId() !== null){ + if($task->isSubmitted()){ throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once"); } $task->progressUpdates = new \Threaded; - $task->setTaskId($this->nextTaskId++); - - $this->tasks[$task->getTaskId()] = $task; + $task->setSubmitted(); $this->getWorker($worker)->stack($task); - $this->workerUsage[$worker]++; - $this->taskWorkers[$task->getTaskId()] = $worker; + $this->taskQueues[$worker]->enqueue($task); } /** @@ -173,8 +166,8 @@ class AsyncPool{ public function selectWorker() : int{ $worker = null; $minUsage = PHP_INT_MAX; - foreach($this->workerUsage as $i => $usage){ - if($usage < $minUsage){ + foreach($this->taskQueues as $i => $queue){ + if(($usage = $queue->count()) < $minUsage){ $worker = $i; $minUsage = $usage; if($usage === 0){ @@ -205,7 +198,7 @@ class AsyncPool{ * @return int */ public function submitTask(AsyncTask $task) : int{ - if($task->getTaskId() !== null){ + if($task->isSubmitted()){ throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once"); } @@ -214,77 +207,58 @@ class AsyncPool{ return $worker; } - /** - * Removes a completed or crashed task from the pool. - * - * @param AsyncTask $task - * @param bool $force - */ - private function removeTask(AsyncTask $task, bool $force = false) : void{ - if(isset($this->taskWorkers[$task->getTaskId()])){ - if(!$force and ($task->isRunning() or !$task->isGarbage())){ - return; - } - $this->workerUsage[$this->taskWorkers[$task->getTaskId()]]--; - } - - unset($this->tasks[$task->getTaskId()]); - unset($this->taskWorkers[$task->getTaskId()]); - } - - /** - * Collects garbage from running workers. - */ - private function collectWorkers() : void{ - foreach($this->workers as $worker){ - $worker->collect(); - } - } - /** * Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate. * * @throws \ReflectionException */ public function collectTasks() : void{ - foreach($this->tasks as $task){ - $task->checkProgressUpdates(); - if($task->isGarbage() and !$task->isRunning() and !$task->isCrashed()){ - if(!$task->hasCancelledRun()){ - try{ - /* - * It's possible for a task to submit a progress update and then finish before the progress - * update is detected by the parent thread, so here we consume any missed updates. - * - * When this happens, it's possible for a progress update to arrive between the previous - * checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be - * lost. Thus, it's necessary to do one last check here to make sure all progress updates have - * been consumed before completing. - */ - $task->checkProgressUpdates(); - $task->onCompletion(); - }catch(\Throwable $e){ - $this->logger->critical("Could not execute completion of asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": " . $e->getMessage()); - $this->logger->logException($e); - } - } + foreach($this->taskQueues as $worker => $queue){ + $doGC = false; + while(!$queue->isEmpty()){ + /** @var AsyncTask $task */ + $task = $queue->bottom(); + $task->checkProgressUpdates(); + if(!$task->isRunning() and $task->isGarbage()){ //make sure the task actually executed before trying to collect + $doGC = true; + $queue->dequeue(); - $this->removeTask($task); - }elseif($task->isCrashed()){ - $this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed"); - $this->removeTask($task, true); + if($task->isCrashed()){ + $this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed"); + }elseif(!$task->hasCancelledRun()){ + try{ + /* + * It's possible for a task to submit a progress update and then finish before the progress + * update is detected by the parent thread, so here we consume any missed updates. + * + * When this happens, it's possible for a progress update to arrive between the previous + * checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be + * lost. Thus, it's necessary to do one last check here to make sure all progress updates have + * been consumed before completing. + */ + $task->checkProgressUpdates(); + $task->onCompletion(); + }catch(\Throwable $e){ + $this->logger->critical("Could not execute completion of asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": " . $e->getMessage()); + $this->logger->logException($e); + } + } + }else{ + break; //current task is still running, skip to next worker + } + } + if($doGC){ + $this->workers[$worker]->collect(); } } - - $this->collectWorkers(); } public function shutdownUnusedWorkers() : int{ $ret = 0; - foreach($this->workerUsage as $i => $usage){ - if($usage === 0){ + foreach($this->taskQueues as $i => $queue){ + if($queue->isEmpty()){ $this->workers[$i]->quit(); - unset($this->workers[$i], $this->workerUsage[$i]); + unset($this->workers[$i], $this->taskQueues[$i]); $ret++; } } @@ -301,24 +275,21 @@ class AsyncPool{ foreach($this->workers as $worker){ /** @var AsyncTask $task */ while(($task = $worker->unstack()) !== null){ - //cancelRun() is not strictly necessary here, but it might be used to inform plugins of the task state - //(i.e. it never executed). - $task->cancelRun(); - $this->removeTask($task, true); + //NOOP: the below loop will deal with marking tasks as garbage } } - foreach($this->tasks as $task){ - $task->cancelRun(); - $this->removeTask($task, true); + foreach($this->taskQueues as $queue){ + while(!$queue->isEmpty()){ + /** @var AsyncTask $task */ + $task = $queue->dequeue(); + $task->cancelRun(); + } } - $this->taskWorkers = []; - $this->tasks = []; - foreach($this->workers as $worker){ $worker->quit(); } $this->workers = []; - $this->workerUsage = []; + $this->taskQueues = []; } } diff --git a/src/pocketmine/scheduler/AsyncTask.php b/src/pocketmine/scheduler/AsyncTask.php index 8360fd312..36231235a 100644 --- a/src/pocketmine/scheduler/AsyncTask.php +++ b/src/pocketmine/scheduler/AsyncTask.php @@ -59,8 +59,8 @@ abstract class AsyncTask extends Collectable{ private $result = null; private $serialized = false; private $cancelRun = false; - /** @var int|null */ - private $taskId = null; + /** @var bool */ + private $submitted = false; private $crashed = false; @@ -114,15 +114,15 @@ abstract class AsyncTask extends Collectable{ $this->serialized = $serialize; } - public function setTaskId(int $taskId) : void{ - $this->taskId = $taskId; + public function setSubmitted() : void{ + $this->submitted = true; } /** - * @return int|null + * @return bool */ - public function getTaskId() : ?int{ - return $this->taskId; + public function isSubmitted() : bool{ + return $this->submitted; } /**