Use Snooze to improve AsyncTask collection times

regardless of how long an async task takes to run, it will take a multiple of 50ms to get the result processed. This delay causes issues in some cases for stuff like generation, which causes locking of adjacent chunks, and async packet compression, which experiences elevated latency because of this problem.
This is not an ideal solution for packet compression since it will cause the sleeper handler to get hammered, but since it's already getting hammered by every packet from RakLib, I don't think that's a big problem.
This commit is contained in:
Dylan K. Taylor 2020-12-02 19:34:34 +00:00
parent 1775fb669b
commit 29f6ed3f68
6 changed files with 73 additions and 36 deletions

View File

@ -873,7 +873,7 @@ class Server{
$poolSize = max(1, (int) $poolSize);
}
$this->asyncPool = new AsyncPool($poolSize, max(-1, (int) $this->configGroup->getProperty("memory.async-worker-hard-limit", 256)), $this->autoloader, $this->logger);
$this->asyncPool = new AsyncPool($poolSize, max(-1, (int) $this->configGroup->getProperty("memory.async-worker-hard-limit", 256)), $this->autoloader, $this->logger, $this->tickSleeper);
$netCompressionThreshold = -1;
if($this->configGroup->getProperty("network.batch-threshold", 256) >= 0){

View File

@ -23,6 +23,8 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\snooze\SleeperHandler;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\utils\Utils;
use function array_keys;
use function array_map;
@ -73,11 +75,15 @@ class AsyncPool{
*/
private $workerStartHooks = [];
public function __construct(int $size, int $workerMemoryLimit, \ClassLoader $classLoader, \ThreadedLogger $logger){
/** @var SleeperHandler */
private $eventLoop;
public function __construct(int $size, int $workerMemoryLimit, \ClassLoader $classLoader, \ThreadedLogger $logger, SleeperHandler $eventLoop){
$this->size = $size;
$this->workerMemoryLimit = $workerMemoryLimit;
$this->classLoader = $classLoader;
$this->logger = $logger;
$this->eventLoop = $eventLoop;
}
/**
@ -136,8 +142,11 @@ class AsyncPool{
*/
private function getWorker(int $worker) : AsyncWorker{
if(!isset($this->workers[$worker])){
$this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit);
$notifier = new SleeperNotifier();
$this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit, $notifier);
$this->eventLoop->addNotifier($notifier, function() use ($worker) : void{
$this->collectTasksFromWorker($worker);
});
$this->workers[$worker]->setClassLoader($this->classLoader);
$this->workers[$worker]->start(self::WORKER_START_OPTIONS);
@ -226,39 +235,49 @@ class AsyncPool{
public function collectTasks() : bool{
$more = false;
foreach($this->taskQueues as $worker => $queue){
$doGC = false;
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
$task = $queue->bottom();
$task->checkProgressUpdates();
if($task->isFinished()){ //make sure the task actually executed before trying to collect
$doGC = true;
$queue->dequeue();
$more = $this->collectTasksFromWorker($worker) || $more;
}
return $more;
}
if($task->isCrashed()){
$this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed");
$task->onError();
}elseif(!$task->hasCancelledRun()){
/*
* It's possible for a task to submit a progress update and then finish before the progress
* update is detected by the parent thread, so here we consume any missed updates.
*
* When this happens, it's possible for a progress update to arrive between the previous
* checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be
* lost. Thus, it's necessary to do one last check here to make sure all progress updates have
* been consumed before completing.
*/
$task->checkProgressUpdates();
$task->onCompletion();
}
}else{
$more = true;
break; //current task is still running, skip to next worker
public function collectTasksFromWorker(int $worker) : bool{
if(!isset($this->taskQueues[$worker])){
throw new \InvalidArgumentException("No such worker $worker");
}
$queue = $this->taskQueues[$worker];
$doGC = false;
$more = false;
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
$task = $queue->bottom();
$task->checkProgressUpdates();
if($task->isFinished()){ //make sure the task actually executed before trying to collect
$doGC = true;
$queue->dequeue();
if($task->isCrashed()){
$this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed");
$task->onError();
}elseif(!$task->hasCancelledRun()){
/*
* It's possible for a task to submit a progress update and then finish before the progress
* update is detected by the parent thread, so here we consume any missed updates.
*
* When this happens, it's possible for a progress update to arrive between the previous
* checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be
* lost. Thus, it's necessary to do one last check here to make sure all progress updates have
* been consumed before completing.
*/
$task->checkProgressUpdates();
$task->onCompletion();
}
}else{
$more = true;
break; //current task is still running, skip to next worker
}
if($doGC){
$this->workers[$worker]->collect();
}
}
if($doGC){
$this->workers[$worker]->collect();
}
return $more;
}
@ -279,6 +298,7 @@ class AsyncPool{
foreach($this->taskQueues as $i => $queue){
if((!isset($this->workerLastUsed[$i]) or $this->workerLastUsed[$i] + 300 < $time) and $queue->isEmpty()){
$this->workers[$i]->quit();
$this->eventLoop->removeNotifier($this->workers[$i]->getNotifier());
unset($this->workers[$i], $this->taskQueues[$i], $this->workerLastUsed[$i]);
$ret++;
}
@ -309,6 +329,7 @@ class AsyncPool{
foreach($this->workers as $worker){
$worker->quit();
$this->eventLoop->removeNotifier($worker->getNotifier());
}
$this->workers = [];
$this->taskQueues = [];

View File

@ -89,6 +89,7 @@ abstract class AsyncTask extends \Threaded{
}
$this->finished = true;
$this->worker->getNotifier()->wakeupSleeper();
}
public function isCrashed() : bool{

View File

@ -23,6 +23,7 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\Worker;
use function gc_enable;
use function ini_set;
@ -39,10 +40,18 @@ class AsyncWorker extends Worker{
/** @var int */
private $memoryLimit;
public function __construct(\ThreadedLogger $logger, int $id, int $memoryLimit){
/** @var SleeperNotifier */
private $notifier;
public function __construct(\ThreadedLogger $logger, int $id, int $memoryLimit, SleeperNotifier $notifier){
$this->logger = $logger;
$this->id = $id;
$this->memoryLimit = $memoryLimit;
$this->notifier = $notifier;
}
public function getNotifier() : SleeperNotifier{
return $this->notifier;
}
protected function onRun() : void{

View File

@ -460,6 +460,11 @@ parameters:
count: 1
path: ../../../src/scheduler/AsyncTask.php
-
message: "#^Cannot call method getNotifier\\(\\) on pocketmine\\\\scheduler\\\\AsyncWorker\\|null\\.$#"
count: 1
path: ../../../src/scheduler/AsyncTask.php
-
message: "#^Cannot call method count\\(\\) on ArrayObject\\<int, array\\<string, mixed\\>\\>\\|null\\.$#"
count: 1

View File

@ -24,6 +24,7 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use PHPUnit\Framework\TestCase;
use pocketmine\snooze\SleeperHandler;
use pocketmine\utils\MainLogger;
use pocketmine\utils\Terminal;
use function define;
@ -44,7 +45,7 @@ class AsyncPoolTest extends TestCase{
Terminal::init();
@define('pocketmine\\COMPOSER_AUTOLOADER_PATH', dirname(__DIR__, 3) . '/vendor/autoload.php');
$this->mainLogger = new MainLogger(tempnam(sys_get_temp_dir(), "pmlog"));
$this->pool = new AsyncPool(2, 1024, new \BaseClassLoader(), $this->mainLogger);
$this->pool = new AsyncPool(2, 1024, new \BaseClassLoader(), $this->mainLogger, new SleeperHandler());
}
public function tearDown() : void{