First look at splitting up AsyncPool and ServerScheduler

This commit contains quite a few breaking changes with respect to how AsyncTasks are handled. This is necessary to allow separation of the ServerScheduler and the AsyncPool, because in the future the ServerScheduler may be removed and instead there will be isolated per-plugin sync-task schedulers - but we cannot have every plugin with its own worker pool for memory usage reasons if nothing else.

The following things have changed:
- ServerScheduler: scheduleAsyncTask(), scheduleAsyncTaskToWorker(), getAsyncTaskPoolSize(), increaseAsyncTaskPoolSize() and similar methods have all been removed. Additionally the static \$WORKERS field has been removed.
- Server: added API method getAsyncPool(). This grants you direct access to the server's AsyncPool. Calls to getScheduler()->scheduleAsyncTask() and scheduleAsyncTaskToWorker() should be replaced with getAsyncPool()->submitTask() and submitTaskToWorker() respectively.
This commit is contained in:
Dylan K. Taylor 2018-05-30 12:20:10 +01:00
parent 7fce48d38c
commit d03f36ebee
10 changed files with 54 additions and 91 deletions

View File

@ -243,9 +243,9 @@ class MemoryManager{
Timings::$garbageCollectorTimer->startTiming();
if($this->garbageCollectionAsync){
$size = $this->server->getScheduler()->getAsyncTaskPoolSize();
for($i = 0; $i < $size; ++$i){
$this->server->getScheduler()->scheduleAsyncTaskToWorker(new GarbageCollectionTask(), $i);
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
$pool->submitTaskToWorker(new GarbageCollectionTask(), $i);
}
}
@ -268,9 +268,9 @@ class MemoryManager{
self::dumpMemory($this->server, $outputFolder, $maxNesting, $maxStringSize);
if($this->dumpWorkers){
$scheduler = $this->server->getScheduler();
for($i = 0, $size = $scheduler->getAsyncTaskPoolSize(); $i < $size; ++$i){
$scheduler->scheduleAsyncTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i);
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
$pool->submitTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i);
}
}
}

View File

@ -1916,7 +1916,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
}
if(!$packet->skipVerification){
$this->server->getScheduler()->scheduleAsyncTask(new VerifyLoginTask($this, $packet));
$this->server->getAsyncPool()->submitTask(new VerifyLoginTask($this, $packet));
}else{
$this->onVerifyCompleted($packet, null, true);
}

View File

@ -87,6 +87,7 @@ use pocketmine\plugin\PluginLoadOrder;
use pocketmine\plugin\PluginManager;
use pocketmine\plugin\ScriptPluginLoader;
use pocketmine\resourcepacks\ResourcePackManager;
use pocketmine\scheduler\AsyncPool;
use pocketmine\scheduler\FileWriteTask;
use pocketmine\scheduler\SendUsageTask;
use pocketmine\scheduler\ServerScheduler;
@ -150,6 +151,8 @@ class Server{
/** @var ServerScheduler */
private $scheduler = null;
/** @var AsyncPool */
private $asyncPool;
/**
* Counts the ticks since the server start
@ -657,6 +660,10 @@ class Server{
return $this->scheduler;
}
public function getAsyncPool() : AsyncPool{
return $this->asyncPool;
}
/**
* @return int
*/
@ -824,7 +831,7 @@ class Server{
$nbt = new BigEndianNBTStream();
try{
if($async){
$this->getScheduler()->scheduleAsyncTask(new FileWriteTask($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData())));
$this->asyncPool->submitTask(new FileWriteTask($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData())));
}else{
file_put_contents($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData()));
}
@ -1518,8 +1525,10 @@ class Server{
$this->logger->info($this->getLanguage()->translateString("pocketmine.server.start", [TextFormat::AQUA . $this->getVersion() . TextFormat::RESET]));
$this->scheduler = new ServerScheduler();
if(($poolSize = $this->getProperty("settings.async-workers", "auto")) === "auto"){
$poolSize = ServerScheduler::$WORKERS;
$poolSize = 2;
$processors = Utils::getCoreCount() - 2;
if($processors > 0){
@ -1529,7 +1538,7 @@ class Server{
$poolSize = (int) $poolSize;
}
ServerScheduler::$WORKERS = $poolSize;
$this->asyncPool = new AsyncPool($this, $poolSize);
if($this->getProperty("network.batch-threshold", 256) >= 0){
Network::$BATCH_THRESHOLD = (int) $this->getProperty("network.batch-threshold", 256);
@ -1551,7 +1560,6 @@ class Server{
$this->doTitleTick = ((bool) $this->getProperty("console.title-tick", true)) && Terminal::hasFormattingCodes();
$this->scheduler = new ServerScheduler();
if($this->getConfigBool("enable-rcon", false)){
try{
@ -1903,7 +1911,7 @@ class Server{
if(!$forceSync and !$immediate and $this->networkCompressionAsync){
$task = new CompressBatchedTask($pk, $targets);
$this->getScheduler()->scheduleAsyncTask($task);
$this->asyncPool->submitTask($task);
}else{
$this->broadcastPacketsCallback($pk, $targets, $immediate);
}
@ -2068,8 +2076,12 @@ class Server{
HandlerList::unregisterAll();
if($this->scheduler instanceof ServerScheduler){
$this->getLogger()->debug("Shutting down task scheduler");
$this->scheduler->shutdown();
$this->getLogger()->debug("Stopping all tasks");
$this->scheduler->cancelAllTasks();
}
if($this->asyncPool instanceof AsyncPool){
$this->getLogger()->debug("Shutting down async task worker pool");
$this->asyncPool->shutdown();
}
if($this->properties !== null and $this->properties->hasChanged()){
@ -2427,7 +2439,7 @@ class Server{
public function sendUsage($type = SendUsageTask::TYPE_STATUS){
if((bool) $this->getProperty("anonymous-statistics.enabled", true)){
$this->scheduler->scheduleAsyncTask(new SendUsageTask($this, $type, $this->uniquePlayers));
$this->asyncPool->submitTask(new SendUsageTask($this, $type, $this->uniquePlayers));
}
$this->uniquePlayers = [];
}
@ -2525,6 +2537,10 @@ class Server{
$this->scheduler->mainThreadHeartbeat($this->tickCounter);
Timings::$schedulerTimer->stopTiming();
Timings::$schedulerAsyncTimer->startTiming();
$this->asyncPool->collectTasks();
Timings::$schedulerAsyncTimer->stopTiming();
$this->checkTickUpdates($this->tickCounter, $tickTime);
foreach($this->players as $player){

View File

@ -104,7 +104,7 @@ class TimingsCommand extends VanillaCommand{
];
fclose($fileTimings);
$sender->getServer()->getScheduler()->scheduleAsyncTask(new class([
$sender->getServer()->getAsyncPool()->submitTask(new class([
["page" => "http://paste.ubuntu.com", "extraOpts" => [
CURLOPT_HTTPHEADER => ["User-Agent: " . $sender->getServer()->getName() . " " . $sender->getServer()->getPocketMineVersion()],
CURLOPT_POST => 1,

View File

@ -387,16 +387,16 @@ class Level implements ChunkManager, Metadatable{
}
public function registerGenerator(){
$size = $this->server->getScheduler()->getAsyncTaskPoolSize();
for($i = 0; $i < $size; ++$i){
$this->server->getScheduler()->scheduleAsyncTaskToWorker(new GeneratorRegisterTask($this, $this->generatorInstance), $i);
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
$pool->submitTaskToWorker(new GeneratorRegisterTask($this, $this->generatorInstance), $i);
}
}
public function unregisterGenerator(){
$size = $this->server->getScheduler()->getAsyncTaskPoolSize();
for($i = 0; $i < $size; ++$i){
$this->server->getScheduler()->scheduleAsyncTaskToWorker(new GeneratorUnregisterTask($this), $i);
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
$pool->submitTaskToWorker(new GeneratorUnregisterTask($this), $i);
}
}
@ -2486,7 +2486,7 @@ class Level implements ChunkManager, Metadatable{
throw new ChunkException("Invalid Chunk sent");
}
$this->server->getScheduler()->scheduleAsyncTask(new ChunkRequestTask($this, $chunk));
$this->server->getAsyncPool()->submitTask(new ChunkRequestTask($this, $chunk));
$this->timings->syncChunkSendPrepareTimer->stopTiming();
}
@ -2668,7 +2668,7 @@ class Level implements ChunkManager, Metadatable{
$this->server->getPluginManager()->callEvent(new ChunkLoadEvent($this, $chunk, !$chunk->isGenerated()));
if(!$chunk->isLightPopulated() and $chunk->isPopulated() and $this->getServer()->getProperty("chunk-ticking.light-updates", false)){
$this->getServer()->getScheduler()->scheduleAsyncTask(new LightPopulationTask($this, $chunk));
$this->getServer()->getAsyncPool()->submitTask(new LightPopulationTask($this, $chunk));
}
if($this->isChunkInUse($x, $z)){
@ -2951,7 +2951,7 @@ class Level implements ChunkManager, Metadatable{
}
}
$task = new PopulationTask($this, $chunk);
$this->server->getScheduler()->scheduleAsyncTask($task);
$this->server->getAsyncPool()->submitTask($task);
}
}

View File

@ -24,7 +24,6 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\Server;
use pocketmine\timings\Timings;
class AsyncPool{
@ -37,6 +36,8 @@ class AsyncPool{
private $tasks = [];
/** @var int[] */
private $taskWorkers = [];
/** @var int */
private $nextTaskId = 1;
/** @var AsyncWorker[] */
private $workers = [];
@ -77,13 +78,15 @@ class AsyncPool{
}
public function submitTaskToWorker(AsyncTask $task, int $worker){
if(isset($this->tasks[$task->getTaskId()]) or $task->isGarbage()){
return;
}
if($worker < 0 or $worker >= $this->size){
throw new \InvalidArgumentException("Invalid worker $worker");
}
if($task->getTaskId() !== null){
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$task->progressUpdates = new \Threaded;
$task->setTaskId($this->nextTaskId++);
$this->tasks[$task->getTaskId()] = $task;
@ -93,8 +96,8 @@ class AsyncPool{
}
public function submitTask(AsyncTask $task) : int{
if(isset($this->tasks[$task->getTaskId()]) or $task->isGarbage()){
return -1;
if($task->getTaskId() !== null){
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$selectedWorker = mt_rand(0, $this->size - 1);
@ -160,8 +163,6 @@ class AsyncPool{
}
public function collectTasks(){
Timings::$schedulerAsyncTimer->startTiming();
foreach($this->tasks as $task){
if(!$task->isGarbage()){
$task->checkProgressUpdates($this->server);
@ -189,8 +190,6 @@ class AsyncPool{
}
$this->collectWorkers();
Timings::$schedulerAsyncTimer->stopTiming();
}
public function shutdown() : void{

View File

@ -33,7 +33,6 @@ use pocketmine\Server;
use pocketmine\utils\ReversePriorityQueue;
class ServerScheduler{
public static $WORKERS = 2;
/**
* @var ReversePriorityQueue<Task>
*/
@ -44,9 +43,6 @@ class ServerScheduler{
*/
protected $tasks = [];
/** @var AsyncPool */
protected $asyncPool;
/** @var int */
private $ids = 1;
@ -55,7 +51,6 @@ class ServerScheduler{
public function __construct(){
$this->queue = new ReversePriorityQueue();
$this->asyncPool = new AsyncPool(Server::getInstance(), self::$WORKERS);
}
/**
@ -67,49 +62,6 @@ class ServerScheduler{
return $this->addTask($task, -1, -1);
}
/**
* Submits an asynchronous task to the Worker Pool
*
* @param AsyncTask $task
*
* @return int
*/
public function scheduleAsyncTask(AsyncTask $task) : int{
if($task->getTaskId() !== null){
throw new \UnexpectedValueException("Attempt to schedule the same AsyncTask instance twice");
}
$id = $this->nextId();
$task->setTaskId($id);
$task->progressUpdates = new \Threaded;
return $this->asyncPool->submitTask($task);
}
/**
* Submits an asynchronous task to a specific Worker in the Pool
*
* @param AsyncTask $task
* @param int $worker
*
* @return void
*/
public function scheduleAsyncTaskToWorker(AsyncTask $task, int $worker){
if($task->getTaskId() !== null){
throw new \UnexpectedValueException("Attempt to schedule the same AsyncTask instance twice");
}
$id = $this->nextId();
$task->setTaskId($id);
$task->progressUpdates = new \Threaded;
$this->asyncPool->submitTaskToWorker($task, $worker);
}
public function getAsyncTaskPoolSize() : int{
return $this->asyncPool->getSize();
}
public function increaseAsyncTaskPoolSize(int $newSize){
$this->asyncPool->increaseSize($newSize);
}
/**
* @param Task $task
* @param int $delay
@ -169,7 +121,6 @@ class ServerScheduler{
$task->cancel();
}
$this->tasks = [];
$this->asyncPool->removeTasks();
while(!$this->queue->isEmpty()){
$this->queue->extract();
}
@ -228,7 +179,6 @@ class ServerScheduler{
public function shutdown() : void{
$this->cancelAllTasks();
$this->asyncPool->shutdown();
}
/**
@ -260,8 +210,6 @@ class ServerScheduler{
unset($this->tasks[$task->getTaskId()]);
}
}
$this->asyncPool->collectTasks();
}
private function isReady(int $currentTicks) : bool{

View File

@ -149,7 +149,7 @@ class AutoUpdater{
* Schedules an AsyncTask to check for an update.
*/
public function doCheck(){
$this->server->getScheduler()->scheduleAsyncTask(new UpdateCheckTask($this->endpoint, $this->getChannel()));
$this->server->getAsyncPool()->submitTask(new UpdateCheckTask($this->endpoint, $this->getChannel()));
}
/**

View File

@ -217,7 +217,7 @@ class Config{
}
if($async){
Server::getInstance()->getScheduler()->scheduleAsyncTask(new FileWriteTask($this->file, $content));
Server::getInstance()->getAsyncPool()->submitTask(new FileWriteTask($this->file, $content));
}else{
file_put_contents($this->file, $content);
}

@ -1 +1 @@
Subproject commit e60eba0324e4540abae317630ed758d3814e0137
Subproject commit c54e6732b489b4124073eb6714c4fc594faf9c80