AsyncPool: Lazy-start AsyncWorkers when they are needed only

This changes how the AsyncPool works so that it does not immediately always start all of the workers in the pool.
Instead, workers will be started only when an idle worker was not found.
This allows for significant memory footprint reductions while idle.

In effect the async-workers setting in pocketmine.yml now dictates a _maximum_ pool size, not a fixed pool size.
This commit is contained in:
Dylan K. Taylor 2018-06-11 10:23:46 +01:00
parent 4b221c0601
commit 88d83e0fca
5 changed files with 95 additions and 41 deletions

View File

@ -243,7 +243,7 @@ class MemoryManager{
if($this->garbageCollectionAsync){
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
foreach($pool->getRunningWorkers() as $i){
$pool->submitTaskToWorker(new GarbageCollectionTask(), $i);
}
}
@ -268,7 +268,7 @@ class MemoryManager{
if($this->dumpWorkers){
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
foreach($pool->getRunningWorkers() as $i){
$pool->submitTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i);
}
}

View File

@ -1044,8 +1044,6 @@ class Server{
$this->levels[$level->getId()] = $level;
$level->initLevel();
$this->getPluginManager()->callEvent(new LevelLoadEvent($level));
$level->setTickRate($this->baseTickRate);
@ -1090,8 +1088,6 @@ class Server{
$level = new Level($this, $name, new $providerClass($path));
$this->levels[$level->getId()] = $level;
$level->initLevel();
$level->setTickRate($this->baseTickRate);
}catch(\Throwable $e){
$this->logger->error($this->getLanguage()->translateString("pocketmine.level.generationError", [$name, $e->getMessage()]));

View File

@ -245,7 +245,8 @@ class Level implements ChunkManager, Metadatable{
/** @var bool */
private $closed = false;
/** @var \Closure */
private $asyncPoolStartHook;
public static function chunkHash(int $x, int $z) : int{
return (($x & 0xFFFFFFFF) << 32) | ($z & 0xFFFFFFFF);
@ -362,6 +363,10 @@ class Level implements ChunkManager, Metadatable{
$this->temporalPosition = new Position(0, 0, 0, $this);
$this->temporalVector = new Vector3(0, 0, 0);
$this->tickRate = 1;
$this->server->getAsyncPool()->addWorkerStartHook($this->asyncPoolStartHook = function(int $worker) : void{
$this->registerGeneratorToWorker($worker);
});
}
public function getTickRate() : int{
@ -376,21 +381,14 @@ class Level implements ChunkManager, Metadatable{
$this->tickRate = $tickRate;
}
public function initLevel(){
$this->registerGenerator();
}
public function registerGenerator(){
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
$pool->submitTaskToWorker(new GeneratorRegisterTask($this, $this->generator, $this->provider->getGeneratorOptions()), $i);
}
public function registerGeneratorToWorker(int $worker) : void{
$this->server->getAsyncPool()->submitTaskToWorker(new GeneratorRegisterTask($this, $this->generator, $this->provider->getGeneratorOptions()), $worker);
}
public function unregisterGenerator(){
$pool = $this->server->getAsyncPool();
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
$pool->removeWorkerStartHook($this->asyncPoolStartHook);
foreach($pool->getRunningWorkers() as $i){
$pool->submitTaskToWorker(new GeneratorUnregisterTask($this), $i);
}
}

View File

@ -139,7 +139,7 @@ class PopulationTask extends AsyncTask{
$level = $server->getLevel($this->levelId);
if($level !== null){
if(!$this->state){
$level->registerGenerator();
$level->registerGeneratorToWorker($this->worker->getAsyncWorkerId() - 1);
}
$chunk = Chunk::fastDeserialize($this->chunk);

View File

@ -52,19 +52,15 @@ class AsyncPool{
/** @var int[] */
private $workerUsage = [];
/** @var \Closure[] */
private $workerStartHooks = [];
public function __construct(Server $server, int $size, int $workerMemoryLimit, \ClassLoader $classLoader, \ThreadedLogger $logger){
$this->server = $server;
$this->size = $size;
$this->workerMemoryLimit = $workerMemoryLimit;
$this->classLoader = $classLoader;
$this->logger = $logger;
for($i = 0; $i < $this->size; ++$i){
$this->workerUsage[$i] = 0;
$this->workers[$i] = new AsyncWorker($this->logger, $i + 1, $this->workerMemoryLimit);
$this->workers[$i]->setClassLoader($this->classLoader);
$this->workers[$i]->start(self::WORKER_START_OPTIONS);
}
}
public function getSize() : int{
@ -73,16 +69,66 @@ class AsyncPool{
public function increaseSize(int $newSize){
if($newSize > $this->size){
for($i = $this->size; $i < $newSize; ++$i){
$this->workerUsage[$i] = 0;
$this->workers[$i] = new AsyncWorker($this->logger, $i + 1, $this->workerMemoryLimit);
$this->workers[$i]->setClassLoader($this->classLoader);
$this->workers[$i]->start(self::WORKER_START_OPTIONS);
}
$this->size = $newSize;
}
}
/**
* Registers a Closure callback to be fired whenever a new worker is started by the pool.
* The signature should be `function(int $worker) : void`
*
* This function will call the hook for every already-running worker.
*
* @param \Closure $hook
*/
public function addWorkerStartHook(\Closure $hook) : void{
$this->workerStartHooks[spl_object_hash($hook)] = $hook;
foreach($this->workers as $i => $worker){
$hook($i);
}
}
/**
* Removes a previously-registered callback listening for workers being started.
*
* @param \Closure $hook
*/
public function removeWorkerStartHook(\Closure $hook) : void{
unset($this->workerStartHooks[spl_object_hash($hook)]);
}
/**
* Returns an array of IDs of currently running workers.
*
* @return int[]
*/
public function getRunningWorkers() : array{
return array_keys($this->workers);
}
/**
* Fetches the worker with the specified ID, starting it if it does not exist, and firing any registered worker
* start hooks.
*
* @param int $worker
*
* @return AsyncWorker
*/
private function getWorker(int $worker) : AsyncWorker{
if(!isset($this->workers[$worker])){
$this->workerUsage[$worker] = 0;
$this->workers[$worker] = new AsyncWorker($this->logger, $worker + 1, $this->workerMemoryLimit);
$this->workers[$worker]->setClassLoader($this->classLoader);
$this->workers[$worker]->start(self::WORKER_START_OPTIONS);
foreach($this->workerStartHooks as $hook){
$hook($worker);
}
}
return $this->workers[$worker];
}
public function submitTaskToWorker(AsyncTask $task, int $worker){
if($worker < 0 or $worker >= $this->size){
throw new \InvalidArgumentException("Invalid worker $worker");
@ -96,7 +142,7 @@ class AsyncPool{
$this->tasks[$task->getTaskId()] = $task;
$this->workers[$worker]->stack($task);
$this->getWorker($worker)->stack($task);
$this->workerUsage[$worker]++;
$this->taskWorkers[$task->getTaskId()] = $worker;
}
@ -106,17 +152,31 @@ class AsyncPool{
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$selectedWorker = mt_rand(0, $this->size - 1);
$selectedTasks = $this->workerUsage[$selectedWorker];
for($i = 0; $i < $this->size; ++$i){
if($this->workerUsage[$i] < $selectedTasks){
$selectedWorker = $i;
$selectedTasks = $this->workerUsage[$i];
$worker = null;
$minUsage = PHP_INT_MAX;
foreach($this->workerUsage as $i => $usage){
if($usage < $minUsage){
$worker = $i;
$minUsage = $usage;
if($usage === 0){
break;
}
}
}
if($worker === null or ($minUsage > 0 and count($this->workers) < $this->size)){
//select a worker to start on the fly
for($i = 0; $i < $this->size; ++$i){
if(!isset($this->workers[$i])){
$worker = $i;
break;
}
}
}
$this->submitTaskToWorker($task, $selectedWorker);
return $selectedWorker;
assert($worker !== null);
$this->submitTaskToWorker($task, $worker);
return $worker;
}
private function removeTask(AsyncTask $task, bool $force = false){