From 29f6ed3f688e527bb7a4454fb24b3d18627a603e Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Wed, 2 Dec 2020 19:34:34 +0000 Subject: [PATCH] Use Snooze to improve AsyncTask collection times regardless of how long an async task takes to run, it will take a multiple of 50ms to get the result processed. This delay causes issues in some cases for stuff like generation, which causes locking of adjacent chunks, and async packet compression, which experiences elevated latency because of this problem. This is not an ideal solution for packet compression since it will cause the sleeper handler to get hammered, but since it's already getting hammered by every packet from RakLib, I don't think that's a big problem. --- src/Server.php | 2 +- src/scheduler/AsyncPool.php | 87 ++++++++++++++--------- src/scheduler/AsyncTask.php | 1 + src/scheduler/AsyncWorker.php | 11 ++- tests/phpstan/configs/l8-baseline.neon | 5 ++ tests/phpunit/scheduler/AsyncPoolTest.php | 3 +- 6 files changed, 73 insertions(+), 36 deletions(-) diff --git a/src/Server.php b/src/Server.php index 5ed46004d..0e2084762 100644 --- a/src/Server.php +++ b/src/Server.php @@ -873,7 +873,7 @@ class Server{ $poolSize = max(1, (int) $poolSize); } - $this->asyncPool = new AsyncPool($poolSize, max(-1, (int) $this->configGroup->getProperty("memory.async-worker-hard-limit", 256)), $this->autoloader, $this->logger); + $this->asyncPool = new AsyncPool($poolSize, max(-1, (int) $this->configGroup->getProperty("memory.async-worker-hard-limit", 256)), $this->autoloader, $this->logger, $this->tickSleeper); $netCompressionThreshold = -1; if($this->configGroup->getProperty("network.batch-threshold", 256) >= 0){ diff --git a/src/scheduler/AsyncPool.php b/src/scheduler/AsyncPool.php index f7606c7c3..9eb7126ef 100644 --- a/src/scheduler/AsyncPool.php +++ b/src/scheduler/AsyncPool.php @@ -23,6 +23,8 @@ declare(strict_types=1); namespace pocketmine\scheduler; +use pocketmine\snooze\SleeperHandler; +use pocketmine\snooze\SleeperNotifier; use pocketmine\utils\Utils; use function array_keys; use function array_map; @@ -73,11 +75,15 @@ class AsyncPool{ */ private $workerStartHooks = []; - public function __construct(int $size, int $workerMemoryLimit, \ClassLoader $classLoader, \ThreadedLogger $logger){ + /** @var SleeperHandler */ + private $eventLoop; + + public function __construct(int $size, int $workerMemoryLimit, \ClassLoader $classLoader, \ThreadedLogger $logger, SleeperHandler $eventLoop){ $this->size = $size; $this->workerMemoryLimit = $workerMemoryLimit; $this->classLoader = $classLoader; $this->logger = $logger; + $this->eventLoop = $eventLoop; } /** @@ -136,8 +142,11 @@ class AsyncPool{ */ private function getWorker(int $worker) : AsyncWorker{ if(!isset($this->workers[$worker])){ - - $this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit); + $notifier = new SleeperNotifier(); + $this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit, $notifier); + $this->eventLoop->addNotifier($notifier, function() use ($worker) : void{ + $this->collectTasksFromWorker($worker); + }); $this->workers[$worker]->setClassLoader($this->classLoader); $this->workers[$worker]->start(self::WORKER_START_OPTIONS); @@ -226,39 +235,49 @@ class AsyncPool{ public function collectTasks() : bool{ $more = false; foreach($this->taskQueues as $worker => $queue){ - $doGC = false; - while(!$queue->isEmpty()){ - /** @var AsyncTask $task */ - $task = $queue->bottom(); - $task->checkProgressUpdates(); - if($task->isFinished()){ //make sure the task actually executed before trying to collect - $doGC = true; - $queue->dequeue(); + $more = $this->collectTasksFromWorker($worker) || $more; + } + return $more; + } - if($task->isCrashed()){ - $this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed"); - $task->onError(); - }elseif(!$task->hasCancelledRun()){ - /* - * It's possible for a task to submit a progress update and then finish before the progress - * update is detected by the parent thread, so here we consume any missed updates. - * - * When this happens, it's possible for a progress update to arrive between the previous - * checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be - * lost. Thus, it's necessary to do one last check here to make sure all progress updates have - * been consumed before completing. - */ - $task->checkProgressUpdates(); - $task->onCompletion(); - } - }else{ - $more = true; - break; //current task is still running, skip to next worker + public function collectTasksFromWorker(int $worker) : bool{ + if(!isset($this->taskQueues[$worker])){ + throw new \InvalidArgumentException("No such worker $worker"); + } + $queue = $this->taskQueues[$worker]; + $doGC = false; + $more = false; + while(!$queue->isEmpty()){ + /** @var AsyncTask $task */ + $task = $queue->bottom(); + $task->checkProgressUpdates(); + if($task->isFinished()){ //make sure the task actually executed before trying to collect + $doGC = true; + $queue->dequeue(); + + if($task->isCrashed()){ + $this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed"); + $task->onError(); + }elseif(!$task->hasCancelledRun()){ + /* + * It's possible for a task to submit a progress update and then finish before the progress + * update is detected by the parent thread, so here we consume any missed updates. + * + * When this happens, it's possible for a progress update to arrive between the previous + * checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be + * lost. Thus, it's necessary to do one last check here to make sure all progress updates have + * been consumed before completing. + */ + $task->checkProgressUpdates(); + $task->onCompletion(); } + }else{ + $more = true; + break; //current task is still running, skip to next worker } - if($doGC){ - $this->workers[$worker]->collect(); - } + } + if($doGC){ + $this->workers[$worker]->collect(); } return $more; } @@ -279,6 +298,7 @@ class AsyncPool{ foreach($this->taskQueues as $i => $queue){ if((!isset($this->workerLastUsed[$i]) or $this->workerLastUsed[$i] + 300 < $time) and $queue->isEmpty()){ $this->workers[$i]->quit(); + $this->eventLoop->removeNotifier($this->workers[$i]->getNotifier()); unset($this->workers[$i], $this->taskQueues[$i], $this->workerLastUsed[$i]); $ret++; } @@ -309,6 +329,7 @@ class AsyncPool{ foreach($this->workers as $worker){ $worker->quit(); + $this->eventLoop->removeNotifier($worker->getNotifier()); } $this->workers = []; $this->taskQueues = []; diff --git a/src/scheduler/AsyncTask.php b/src/scheduler/AsyncTask.php index cd79e6f40..b58f1fddb 100644 --- a/src/scheduler/AsyncTask.php +++ b/src/scheduler/AsyncTask.php @@ -89,6 +89,7 @@ abstract class AsyncTask extends \Threaded{ } $this->finished = true; + $this->worker->getNotifier()->wakeupSleeper(); } public function isCrashed() : bool{ diff --git a/src/scheduler/AsyncWorker.php b/src/scheduler/AsyncWorker.php index 51bdfd06e..eedb7ea5c 100644 --- a/src/scheduler/AsyncWorker.php +++ b/src/scheduler/AsyncWorker.php @@ -23,6 +23,7 @@ declare(strict_types=1); namespace pocketmine\scheduler; +use pocketmine\snooze\SleeperNotifier; use pocketmine\thread\Worker; use function gc_enable; use function ini_set; @@ -39,10 +40,18 @@ class AsyncWorker extends Worker{ /** @var int */ private $memoryLimit; - public function __construct(\ThreadedLogger $logger, int $id, int $memoryLimit){ + /** @var SleeperNotifier */ + private $notifier; + + public function __construct(\ThreadedLogger $logger, int $id, int $memoryLimit, SleeperNotifier $notifier){ $this->logger = $logger; $this->id = $id; $this->memoryLimit = $memoryLimit; + $this->notifier = $notifier; + } + + public function getNotifier() : SleeperNotifier{ + return $this->notifier; } protected function onRun() : void{ diff --git a/tests/phpstan/configs/l8-baseline.neon b/tests/phpstan/configs/l8-baseline.neon index ef7fd7b38..e7f7642c4 100644 --- a/tests/phpstan/configs/l8-baseline.neon +++ b/tests/phpstan/configs/l8-baseline.neon @@ -460,6 +460,11 @@ parameters: count: 1 path: ../../../src/scheduler/AsyncTask.php + - + message: "#^Cannot call method getNotifier\\(\\) on pocketmine\\\\scheduler\\\\AsyncWorker\\|null\\.$#" + count: 1 + path: ../../../src/scheduler/AsyncTask.php + - message: "#^Cannot call method count\\(\\) on ArrayObject\\\\>\\|null\\.$#" count: 1 diff --git a/tests/phpunit/scheduler/AsyncPoolTest.php b/tests/phpunit/scheduler/AsyncPoolTest.php index f2a521ead..ab2bc5a29 100644 --- a/tests/phpunit/scheduler/AsyncPoolTest.php +++ b/tests/phpunit/scheduler/AsyncPoolTest.php @@ -24,6 +24,7 @@ declare(strict_types=1); namespace pocketmine\scheduler; use PHPUnit\Framework\TestCase; +use pocketmine\snooze\SleeperHandler; use pocketmine\utils\MainLogger; use pocketmine\utils\Terminal; use function define; @@ -44,7 +45,7 @@ class AsyncPoolTest extends TestCase{ Terminal::init(); @define('pocketmine\\COMPOSER_AUTOLOADER_PATH', dirname(__DIR__, 3) . '/vendor/autoload.php'); $this->mainLogger = new MainLogger(tempnam(sys_get_temp_dir(), "pmlog")); - $this->pool = new AsyncPool(2, 1024, new \BaseClassLoader(), $this->mainLogger); + $this->pool = new AsyncPool(2, 1024, new \BaseClassLoader(), $this->mainLogger, new SleeperHandler()); } public function tearDown() : void{