From cbda24d77e31389f0311c841c901cc1bf1ec45f4 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Tue, 23 May 2023 01:31:25 +0100 Subject: [PATCH] Consolidate worker data under AsyncPoolWorkerEntry instead of having a bunch of arrays... this improves the system integrity and makes it less obnoxious to look at --- src/scheduler/AsyncPool.php | 65 ++++++++++---------------- src/scheduler/AsyncPoolWorkerEntry.php | 20 +++++++- 2 files changed, 43 insertions(+), 42 deletions(-) diff --git a/src/scheduler/AsyncPool.php b/src/scheduler/AsyncPool.php index 6a696fd1b..85a27390a 100644 --- a/src/scheduler/AsyncPool.php +++ b/src/scheduler/AsyncPool.php @@ -44,22 +44,11 @@ use const PHP_INT_MAX; class AsyncPool{ private const WORKER_START_OPTIONS = NativeThread::INHERIT_INI | NativeThread::INHERIT_COMMENTS; - /** - * @var \SplQueue[]|AsyncTask[][] - * @phpstan-var array> - */ - private array $taskQueues = []; - /** * @var AsyncPoolWorkerEntry[] * @phpstan-var array */ private array $workers = []; - /** - * @var int[] - * @phpstan-var array - */ - private array $workerLastUsed = []; /** * @var \Closure[] @@ -129,23 +118,21 @@ class AsyncPool{ * Fetches the worker with the specified ID, starting it if it does not exist, and firing any registered worker * start hooks. */ - private function getWorker(int $worker) : AsyncWorker{ - if(!isset($this->workers[$worker])){ - $sleeperEntry = $this->eventLoop->addNotifier(function() use ($worker) : void{ - $this->collectTasksFromWorker($worker); + private function getWorker(int $workerId) : AsyncPoolWorkerEntry{ + if(!isset($this->workers[$workerId])){ + $sleeperEntry = $this->eventLoop->addNotifier(function() use ($workerId) : void{ + $this->collectTasksFromWorker($workerId); }); - $this->workers[$worker] = new AsyncPoolWorkerEntry(new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId()); - $this->workers[$worker]->worker->setClassLoaders([$this->classLoader]); - $this->workers[$worker]->worker->start(self::WORKER_START_OPTIONS); - - $this->taskQueues[$worker] = new \SplQueue(); + $this->workers[$workerId] = new AsyncPoolWorkerEntry(new AsyncWorker($this->logger, $workerId, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId()); + $this->workers[$workerId]->worker->setClassLoaders([$this->classLoader]); + $this->workers[$workerId]->worker->start(self::WORKER_START_OPTIONS); foreach($this->workerStartHooks as $hook){ - $hook($worker); + $hook($workerId); } } - return $this->workers[$worker]->worker; + return $this->workers[$workerId]; } /** @@ -162,9 +149,7 @@ class AsyncPool{ $task->progressUpdates = new ThreadSafeArray(); $task->setSubmitted(); - $this->getWorker($worker)->stack($task); - $this->taskQueues[$worker]->enqueue($task); - $this->workerLastUsed[$worker] = time(); + $this->getWorker($worker)->submit($task); } /** @@ -177,8 +162,8 @@ class AsyncPool{ public function selectWorker() : int{ $worker = null; $minUsage = PHP_INT_MAX; - foreach($this->taskQueues as $i => $queue){ - if(($usage = $queue->count()) < $minUsage){ + foreach($this->workers as $i => $entry){ + if(($usage = $entry->tasks->count()) < $minUsage){ $worker = $i; $minUsage = $usage; if($usage === 0){ @@ -221,13 +206,13 @@ class AsyncPool{ * @return bool whether there are tasks left to be collected */ public function collectTasks() : bool{ - foreach($this->taskQueues as $worker => $queue){ - $this->collectTasksFromWorker($worker); + foreach($this->workers as $workerId => $entry){ + $this->collectTasksFromWorker($workerId); } //we check this in a second loop, because task collection could have caused new tasks to be added to the queues - foreach($this->taskQueues as $queue){ - if(!$queue->isEmpty()){ + foreach($this->workers as $entry){ + if(!$entry->tasks->isEmpty()){ return true; } } @@ -235,10 +220,10 @@ class AsyncPool{ } public function collectTasksFromWorker(int $worker) : bool{ - if(!isset($this->taskQueues[$worker])){ + if(!isset($this->workers[$worker])){ throw new \InvalidArgumentException("No such worker $worker"); } - $queue = $this->taskQueues[$worker]; + $queue = $this->workers[$worker]->tasks; $more = false; while(!$queue->isEmpty()){ /** @var AsyncTask $task */ @@ -279,17 +264,17 @@ class AsyncPool{ * @phpstan-return array */ public function getTaskQueueSizes() : array{ - return array_map(function(\SplQueue $queue) : int{ return $queue->count(); }, $this->taskQueues); + return array_map(function(AsyncPoolWorkerEntry $entry) : int{ return $entry->tasks->count(); }, $this->workers); } public function shutdownUnusedWorkers() : int{ $ret = 0; $time = time(); - foreach($this->taskQueues as $i => $queue){ - if((!isset($this->workerLastUsed[$i]) || $this->workerLastUsed[$i] + 300 < $time) && $queue->isEmpty()){ - $this->workers[$i]->worker->quit(); - $this->eventLoop->removeNotifier($this->workers[$i]->sleeperNotifierId); - unset($this->workers[$i], $this->taskQueues[$i], $this->workerLastUsed[$i]); + foreach($this->workers as $i => $entry){ + if($entry->lastUsed + 300 < $time && $entry->tasks->isEmpty()){ + $entry->worker->quit(); + $this->eventLoop->removeNotifier($entry->sleeperNotifierId); + unset($this->workers[$i]); $ret++; } } @@ -310,7 +295,5 @@ class AsyncPool{ $this->eventLoop->removeNotifier($worker->sleeperNotifierId); } $this->workers = []; - $this->taskQueues = []; - $this->workerLastUsed = []; } } diff --git a/src/scheduler/AsyncPoolWorkerEntry.php b/src/scheduler/AsyncPoolWorkerEntry.php index ce2412620..6f6fdc5d3 100644 --- a/src/scheduler/AsyncPoolWorkerEntry.php +++ b/src/scheduler/AsyncPoolWorkerEntry.php @@ -23,10 +23,28 @@ declare(strict_types=1); namespace pocketmine\scheduler; +use function time; + final class AsyncPoolWorkerEntry{ + public int $lastUsed; + /** + * @var \SplQueue|AsyncTask[] + * @phpstan-var \SplQueue + */ + public \SplQueue $tasks; + public function __construct( public readonly AsyncWorker $worker, public readonly int $sleeperNotifierId - ){} + ){ + $this->lastUsed = time(); + $this->tasks = new \SplQueue(); + } + + public function submit(AsyncTask $task) : void{ + $this->tasks->enqueue($task); + $this->lastUsed = time(); + $this->worker->stack($task); + } }