From d03f36ebeef8a7e3bd72d42935b9c21d6e4c3ab0 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Wed, 30 May 2018 12:20:10 +0100 Subject: [PATCH] First look at splitting up AsyncPool and ServerScheduler This commit contains quite a few breaking changes with respect to how AsyncTasks are handled. This is necessary to allow separation of the ServerScheduler and the AsyncPool, because in the future the ServerScheduler may be removed and instead there will be isolated per-plugin sync-task schedulers - but we cannot have every plugin with its own worker pool for memory usage reasons if nothing else. The following things have changed: - ServerScheduler: scheduleAsyncTask(), scheduleAsyncTaskToWorker(), getAsyncTaskPoolSize(), increaseAsyncTaskPoolSize() and similar methods have all been removed. Additionally the static \$WORKERS field has been removed. - Server: added API method getAsyncPool(). This grants you direct access to the server's AsyncPool. Calls to getScheduler()->scheduleAsyncTask() and scheduleAsyncTaskToWorker() should be replaced with getAsyncPool()->submitTask() and submitTaskToWorker() respectively. --- src/pocketmine/MemoryManager.php | 12 ++--- src/pocketmine/Player.php | 2 +- src/pocketmine/Server.php | 32 +++++++++--- .../command/defaults/TimingsCommand.php | 2 +- src/pocketmine/level/Level.php | 18 +++---- src/pocketmine/scheduler/AsyncPool.php | 21 ++++---- src/pocketmine/scheduler/ServerScheduler.php | 52 ------------------- src/pocketmine/updater/AutoUpdater.php | 2 +- src/pocketmine/utils/Config.php | 2 +- tests/plugins/PocketMine-TesterPlugin | 2 +- 10 files changed, 54 insertions(+), 91 deletions(-) diff --git a/src/pocketmine/MemoryManager.php b/src/pocketmine/MemoryManager.php index cbf214ce8..ade465a1a 100644 --- a/src/pocketmine/MemoryManager.php +++ b/src/pocketmine/MemoryManager.php @@ -243,9 +243,9 @@ class MemoryManager{ Timings::$garbageCollectorTimer->startTiming(); if($this->garbageCollectionAsync){ - $size = $this->server->getScheduler()->getAsyncTaskPoolSize(); - for($i = 0; $i < $size; ++$i){ - $this->server->getScheduler()->scheduleAsyncTaskToWorker(new GarbageCollectionTask(), $i); + $pool = $this->server->getAsyncPool(); + for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){ + $pool->submitTaskToWorker(new GarbageCollectionTask(), $i); } } @@ -268,9 +268,9 @@ class MemoryManager{ self::dumpMemory($this->server, $outputFolder, $maxNesting, $maxStringSize); if($this->dumpWorkers){ - $scheduler = $this->server->getScheduler(); - for($i = 0, $size = $scheduler->getAsyncTaskPoolSize(); $i < $size; ++$i){ - $scheduler->scheduleAsyncTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i); + $pool = $this->server->getAsyncPool(); + for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){ + $pool->submitTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i); } } } diff --git a/src/pocketmine/Player.php b/src/pocketmine/Player.php index 67bf3c78d..04e010b42 100644 --- a/src/pocketmine/Player.php +++ b/src/pocketmine/Player.php @@ -1916,7 +1916,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{ } if(!$packet->skipVerification){ - $this->server->getScheduler()->scheduleAsyncTask(new VerifyLoginTask($this, $packet)); + $this->server->getAsyncPool()->submitTask(new VerifyLoginTask($this, $packet)); }else{ $this->onVerifyCompleted($packet, null, true); } diff --git a/src/pocketmine/Server.php b/src/pocketmine/Server.php index f90556ee1..f60675df0 100644 --- a/src/pocketmine/Server.php +++ b/src/pocketmine/Server.php @@ -87,6 +87,7 @@ use pocketmine\plugin\PluginLoadOrder; use pocketmine\plugin\PluginManager; use pocketmine\plugin\ScriptPluginLoader; use pocketmine\resourcepacks\ResourcePackManager; +use pocketmine\scheduler\AsyncPool; use pocketmine\scheduler\FileWriteTask; use pocketmine\scheduler\SendUsageTask; use pocketmine\scheduler\ServerScheduler; @@ -150,6 +151,8 @@ class Server{ /** @var ServerScheduler */ private $scheduler = null; + /** @var AsyncPool */ + private $asyncPool; /** * Counts the ticks since the server start @@ -657,6 +660,10 @@ class Server{ return $this->scheduler; } + public function getAsyncPool() : AsyncPool{ + return $this->asyncPool; + } + /** * @return int */ @@ -824,7 +831,7 @@ class Server{ $nbt = new BigEndianNBTStream(); try{ if($async){ - $this->getScheduler()->scheduleAsyncTask(new FileWriteTask($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData()))); + $this->asyncPool->submitTask(new FileWriteTask($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData()))); }else{ file_put_contents($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData())); } @@ -1518,8 +1525,10 @@ class Server{ $this->logger->info($this->getLanguage()->translateString("pocketmine.server.start", [TextFormat::AQUA . $this->getVersion() . TextFormat::RESET])); + $this->scheduler = new ServerScheduler(); + if(($poolSize = $this->getProperty("settings.async-workers", "auto")) === "auto"){ - $poolSize = ServerScheduler::$WORKERS; + $poolSize = 2; $processors = Utils::getCoreCount() - 2; if($processors > 0){ @@ -1529,7 +1538,7 @@ class Server{ $poolSize = (int) $poolSize; } - ServerScheduler::$WORKERS = $poolSize; + $this->asyncPool = new AsyncPool($this, $poolSize); if($this->getProperty("network.batch-threshold", 256) >= 0){ Network::$BATCH_THRESHOLD = (int) $this->getProperty("network.batch-threshold", 256); @@ -1551,7 +1560,6 @@ class Server{ $this->doTitleTick = ((bool) $this->getProperty("console.title-tick", true)) && Terminal::hasFormattingCodes(); - $this->scheduler = new ServerScheduler(); if($this->getConfigBool("enable-rcon", false)){ try{ @@ -1903,7 +1911,7 @@ class Server{ if(!$forceSync and !$immediate and $this->networkCompressionAsync){ $task = new CompressBatchedTask($pk, $targets); - $this->getScheduler()->scheduleAsyncTask($task); + $this->asyncPool->submitTask($task); }else{ $this->broadcastPacketsCallback($pk, $targets, $immediate); } @@ -2068,8 +2076,12 @@ class Server{ HandlerList::unregisterAll(); if($this->scheduler instanceof ServerScheduler){ - $this->getLogger()->debug("Shutting down task scheduler"); - $this->scheduler->shutdown(); + $this->getLogger()->debug("Stopping all tasks"); + $this->scheduler->cancelAllTasks(); + } + if($this->asyncPool instanceof AsyncPool){ + $this->getLogger()->debug("Shutting down async task worker pool"); + $this->asyncPool->shutdown(); } if($this->properties !== null and $this->properties->hasChanged()){ @@ -2427,7 +2439,7 @@ class Server{ public function sendUsage($type = SendUsageTask::TYPE_STATUS){ if((bool) $this->getProperty("anonymous-statistics.enabled", true)){ - $this->scheduler->scheduleAsyncTask(new SendUsageTask($this, $type, $this->uniquePlayers)); + $this->asyncPool->submitTask(new SendUsageTask($this, $type, $this->uniquePlayers)); } $this->uniquePlayers = []; } @@ -2525,6 +2537,10 @@ class Server{ $this->scheduler->mainThreadHeartbeat($this->tickCounter); Timings::$schedulerTimer->stopTiming(); + Timings::$schedulerAsyncTimer->startTiming(); + $this->asyncPool->collectTasks(); + Timings::$schedulerAsyncTimer->stopTiming(); + $this->checkTickUpdates($this->tickCounter, $tickTime); foreach($this->players as $player){ diff --git a/src/pocketmine/command/defaults/TimingsCommand.php b/src/pocketmine/command/defaults/TimingsCommand.php index ca461268a..3ca5d920f 100644 --- a/src/pocketmine/command/defaults/TimingsCommand.php +++ b/src/pocketmine/command/defaults/TimingsCommand.php @@ -104,7 +104,7 @@ class TimingsCommand extends VanillaCommand{ ]; fclose($fileTimings); - $sender->getServer()->getScheduler()->scheduleAsyncTask(new class([ + $sender->getServer()->getAsyncPool()->submitTask(new class([ ["page" => "http://paste.ubuntu.com", "extraOpts" => [ CURLOPT_HTTPHEADER => ["User-Agent: " . $sender->getServer()->getName() . " " . $sender->getServer()->getPocketMineVersion()], CURLOPT_POST => 1, diff --git a/src/pocketmine/level/Level.php b/src/pocketmine/level/Level.php index a72206288..d40b5e604 100644 --- a/src/pocketmine/level/Level.php +++ b/src/pocketmine/level/Level.php @@ -387,16 +387,16 @@ class Level implements ChunkManager, Metadatable{ } public function registerGenerator(){ - $size = $this->server->getScheduler()->getAsyncTaskPoolSize(); - for($i = 0; $i < $size; ++$i){ - $this->server->getScheduler()->scheduleAsyncTaskToWorker(new GeneratorRegisterTask($this, $this->generatorInstance), $i); + $pool = $this->server->getAsyncPool(); + for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){ + $pool->submitTaskToWorker(new GeneratorRegisterTask($this, $this->generatorInstance), $i); } } public function unregisterGenerator(){ - $size = $this->server->getScheduler()->getAsyncTaskPoolSize(); - for($i = 0; $i < $size; ++$i){ - $this->server->getScheduler()->scheduleAsyncTaskToWorker(new GeneratorUnregisterTask($this), $i); + $pool = $this->server->getAsyncPool(); + for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){ + $pool->submitTaskToWorker(new GeneratorUnregisterTask($this), $i); } } @@ -2486,7 +2486,7 @@ class Level implements ChunkManager, Metadatable{ throw new ChunkException("Invalid Chunk sent"); } - $this->server->getScheduler()->scheduleAsyncTask(new ChunkRequestTask($this, $chunk)); + $this->server->getAsyncPool()->submitTask(new ChunkRequestTask($this, $chunk)); $this->timings->syncChunkSendPrepareTimer->stopTiming(); } @@ -2668,7 +2668,7 @@ class Level implements ChunkManager, Metadatable{ $this->server->getPluginManager()->callEvent(new ChunkLoadEvent($this, $chunk, !$chunk->isGenerated())); if(!$chunk->isLightPopulated() and $chunk->isPopulated() and $this->getServer()->getProperty("chunk-ticking.light-updates", false)){ - $this->getServer()->getScheduler()->scheduleAsyncTask(new LightPopulationTask($this, $chunk)); + $this->getServer()->getAsyncPool()->submitTask(new LightPopulationTask($this, $chunk)); } if($this->isChunkInUse($x, $z)){ @@ -2951,7 +2951,7 @@ class Level implements ChunkManager, Metadatable{ } } $task = new PopulationTask($this, $chunk); - $this->server->getScheduler()->scheduleAsyncTask($task); + $this->server->getAsyncPool()->submitTask($task); } } diff --git a/src/pocketmine/scheduler/AsyncPool.php b/src/pocketmine/scheduler/AsyncPool.php index 96f4e8cc6..4b136d4e6 100644 --- a/src/pocketmine/scheduler/AsyncPool.php +++ b/src/pocketmine/scheduler/AsyncPool.php @@ -24,7 +24,6 @@ declare(strict_types=1); namespace pocketmine\scheduler; use pocketmine\Server; -use pocketmine\timings\Timings; class AsyncPool{ @@ -37,6 +36,8 @@ class AsyncPool{ private $tasks = []; /** @var int[] */ private $taskWorkers = []; + /** @var int */ + private $nextTaskId = 1; /** @var AsyncWorker[] */ private $workers = []; @@ -77,13 +78,15 @@ class AsyncPool{ } public function submitTaskToWorker(AsyncTask $task, int $worker){ - if(isset($this->tasks[$task->getTaskId()]) or $task->isGarbage()){ - return; - } - if($worker < 0 or $worker >= $this->size){ throw new \InvalidArgumentException("Invalid worker $worker"); } + if($task->getTaskId() !== null){ + throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once"); + } + + $task->progressUpdates = new \Threaded; + $task->setTaskId($this->nextTaskId++); $this->tasks[$task->getTaskId()] = $task; @@ -93,8 +96,8 @@ class AsyncPool{ } public function submitTask(AsyncTask $task) : int{ - if(isset($this->tasks[$task->getTaskId()]) or $task->isGarbage()){ - return -1; + if($task->getTaskId() !== null){ + throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once"); } $selectedWorker = mt_rand(0, $this->size - 1); @@ -160,8 +163,6 @@ class AsyncPool{ } public function collectTasks(){ - Timings::$schedulerAsyncTimer->startTiming(); - foreach($this->tasks as $task){ if(!$task->isGarbage()){ $task->checkProgressUpdates($this->server); @@ -189,8 +190,6 @@ class AsyncPool{ } $this->collectWorkers(); - - Timings::$schedulerAsyncTimer->stopTiming(); } public function shutdown() : void{ diff --git a/src/pocketmine/scheduler/ServerScheduler.php b/src/pocketmine/scheduler/ServerScheduler.php index 210b9b736..a71111217 100644 --- a/src/pocketmine/scheduler/ServerScheduler.php +++ b/src/pocketmine/scheduler/ServerScheduler.php @@ -33,7 +33,6 @@ use pocketmine\Server; use pocketmine\utils\ReversePriorityQueue; class ServerScheduler{ - public static $WORKERS = 2; /** * @var ReversePriorityQueue */ @@ -44,9 +43,6 @@ class ServerScheduler{ */ protected $tasks = []; - /** @var AsyncPool */ - protected $asyncPool; - /** @var int */ private $ids = 1; @@ -55,7 +51,6 @@ class ServerScheduler{ public function __construct(){ $this->queue = new ReversePriorityQueue(); - $this->asyncPool = new AsyncPool(Server::getInstance(), self::$WORKERS); } /** @@ -67,49 +62,6 @@ class ServerScheduler{ return $this->addTask($task, -1, -1); } - /** - * Submits an asynchronous task to the Worker Pool - * - * @param AsyncTask $task - * - * @return int - */ - public function scheduleAsyncTask(AsyncTask $task) : int{ - if($task->getTaskId() !== null){ - throw new \UnexpectedValueException("Attempt to schedule the same AsyncTask instance twice"); - } - $id = $this->nextId(); - $task->setTaskId($id); - $task->progressUpdates = new \Threaded; - return $this->asyncPool->submitTask($task); - } - - /** - * Submits an asynchronous task to a specific Worker in the Pool - * - * @param AsyncTask $task - * @param int $worker - * - * @return void - */ - public function scheduleAsyncTaskToWorker(AsyncTask $task, int $worker){ - if($task->getTaskId() !== null){ - throw new \UnexpectedValueException("Attempt to schedule the same AsyncTask instance twice"); - } - $id = $this->nextId(); - $task->setTaskId($id); - $task->progressUpdates = new \Threaded; - $this->asyncPool->submitTaskToWorker($task, $worker); - } - - public function getAsyncTaskPoolSize() : int{ - return $this->asyncPool->getSize(); - } - - public function increaseAsyncTaskPoolSize(int $newSize){ - $this->asyncPool->increaseSize($newSize); - } - /** * @param Task $task * @param int $delay @@ -169,7 +121,6 @@ class ServerScheduler{ $task->cancel(); } $this->tasks = []; - $this->asyncPool->removeTasks(); while(!$this->queue->isEmpty()){ $this->queue->extract(); } @@ -228,7 +179,6 @@ class ServerScheduler{ public function shutdown() : void{ $this->cancelAllTasks(); - $this->asyncPool->shutdown(); } /** @@ -260,8 +210,6 @@ class ServerScheduler{ unset($this->tasks[$task->getTaskId()]); } } - - $this->asyncPool->collectTasks(); } private function isReady(int $currentTicks) : bool{ diff --git a/src/pocketmine/updater/AutoUpdater.php b/src/pocketmine/updater/AutoUpdater.php index a2a20e786..c581cc113 100644 --- a/src/pocketmine/updater/AutoUpdater.php +++ b/src/pocketmine/updater/AutoUpdater.php @@ -149,7 +149,7 @@ class AutoUpdater{ * Schedules an AsyncTask to check for an update. */ public function doCheck(){ - $this->server->getScheduler()->scheduleAsyncTask(new UpdateCheckTask($this->endpoint, $this->getChannel())); + $this->server->getAsyncPool()->submitTask(new UpdateCheckTask($this->endpoint, $this->getChannel())); } /** diff --git a/src/pocketmine/utils/Config.php b/src/pocketmine/utils/Config.php index 5a5b45617..c041b1d3b 100644 --- a/src/pocketmine/utils/Config.php +++ b/src/pocketmine/utils/Config.php @@ -217,7 +217,7 @@ class Config{ } if($async){ - Server::getInstance()->getScheduler()->scheduleAsyncTask(new FileWriteTask($this->file, $content)); + Server::getInstance()->getAsyncPool()->submitTask(new FileWriteTask($this->file, $content)); }else{ file_put_contents($this->file, $content); } diff --git a/tests/plugins/PocketMine-TesterPlugin b/tests/plugins/PocketMine-TesterPlugin index e60eba032..c54e6732b 160000 --- a/tests/plugins/PocketMine-TesterPlugin +++ b/tests/plugins/PocketMine-TesterPlugin @@ -1 +1 @@ -Subproject commit e60eba0324e4540abae317630ed758d3814e0137 +Subproject commit c54e6732b489b4124073eb6714c4fc594faf9c80