AsyncPool: Switch to a more efficient collection algorithm, revamp internals (#2493)

This greatly improves GC performance by being more intelligent about how it collects garbage tasks. It knows that if a given task in a worker's queue is not finished, none of the tasks queued behind it can be finished either, so there's no point in checking them.

This also presents the opportunity to clean up a lot of async pool internals, so I've taken it and torched a lot of garbage.
This commit is contained in:
Dylan K. Taylor 2018-10-25 15:36:38 +01:00 committed by GitHub
parent a7eaec13b9
commit 212d72657a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 93 deletions

View File

@ -39,17 +39,11 @@ class AsyncPool{
/** @var int */
private $workerMemoryLimit;
/** @var AsyncTask[] */
private $tasks = [];
/** @var int[] */
private $taskWorkers = [];
/** @var int */
private $nextTaskId = 1;
/** @var \SplQueue[]|AsyncTask[][] */
private $taskQueues = [];
/** @var AsyncWorker[] */
private $workers = [];
/** @var int[] */
private $workerUsage = [];
/** @var \Closure[] */
private $workerStartHooks = [];
@ -124,11 +118,13 @@ class AsyncPool{
*/
private function getWorker(int $worker) : AsyncWorker{
if(!isset($this->workers[$worker])){
$this->workerUsage[$worker] = 0;
$this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit);
$this->workers[$worker]->setClassLoader($this->classLoader);
$this->workers[$worker]->start(self::WORKER_START_OPTIONS);
$this->taskQueues[$worker] = new \SplQueue();
foreach($this->workerStartHooks as $hook){
$hook($worker);
}
@ -147,18 +143,15 @@ class AsyncPool{
if($worker < 0 or $worker >= $this->size){
throw new \InvalidArgumentException("Invalid worker $worker");
}
if($task->getTaskId() !== null){
if($task->isSubmitted()){
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$task->progressUpdates = new \Threaded;
$task->setTaskId($this->nextTaskId++);
$this->tasks[$task->getTaskId()] = $task;
$task->setSubmitted();
$this->getWorker($worker)->stack($task);
$this->workerUsage[$worker]++;
$this->taskWorkers[$task->getTaskId()] = $worker;
$this->taskQueues[$worker]->enqueue($task);
}
/**
@ -173,8 +166,8 @@ class AsyncPool{
public function selectWorker() : int{
$worker = null;
$minUsage = PHP_INT_MAX;
foreach($this->workerUsage as $i => $usage){
if($usage < $minUsage){
foreach($this->taskQueues as $i => $queue){
if(($usage = $queue->count()) < $minUsage){
$worker = $i;
$minUsage = $usage;
if($usage === 0){
@ -205,7 +198,7 @@ class AsyncPool{
* @return int
*/
public function submitTask(AsyncTask $task) : int{
if($task->getTaskId() !== null){
if($task->isSubmitted()){
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
@ -214,77 +207,58 @@ class AsyncPool{
return $worker;
}
/**
 * Removes a completed or crashed task from the pool.
 *
 * If the task has a worker recorded in $taskWorkers, that worker's usage counter is decremented. Unless $force is
 * set, a tracked task which is still running or is not yet garbage is left untouched. A task with no recorded
 * worker is simply unset from $tasks without touching any usage counter.
 *
 * @param AsyncTask $task the task to stop tracking
 * @param bool $force if true, remove the task even if it is still running or is not garbage
 */
private function removeTask(AsyncTask $task, bool $force = false) : void{
if(isset($this->taskWorkers[$task->getTaskId()])){
if(!$force and ($task->isRunning() or !$task->isGarbage())){
//still running or not yet garbage: keep tracking it unless forced
return;
}
//the worker recorded for this task now has one less task counted against it
$this->workerUsage[$this->taskWorkers[$task->getTaskId()]]--;
}
//drop both bookkeeping entries keyed by this task's ID
unset($this->tasks[$task->getTaskId()]);
unset($this->taskWorkers[$task->getTaskId()]);
}
/**
 * Collects garbage from running workers.
 *
 * Calls collect() once on every worker currently held in $workers, regardless of whether any of its tasks
 * have finished.
 */
private function collectWorkers() : void{
foreach($this->workers as $worker){
$worker->collect();
}
}
/**
* Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate.
*
* @throws \ReflectionException
*/
public function collectTasks() : void{
foreach($this->tasks as $task){
$task->checkProgressUpdates();
if($task->isGarbage() and !$task->isRunning() and !$task->isCrashed()){
if(!$task->hasCancelledRun()){
try{
/*
* It's possible for a task to submit a progress update and then finish before the progress
* update is detected by the parent thread, so here we consume any missed updates.
*
* When this happens, it's possible for a progress update to arrive between the previous
* checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be
* lost. Thus, it's necessary to do one last check here to make sure all progress updates have
* been consumed before completing.
*/
$task->checkProgressUpdates();
$task->onCompletion();
}catch(\Throwable $e){
$this->logger->critical("Could not execute completion of asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": " . $e->getMessage());
$this->logger->logException($e);
}
}
foreach($this->taskQueues as $worker => $queue){
$doGC = false;
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
$task = $queue->bottom();
$task->checkProgressUpdates();
if(!$task->isRunning() and $task->isGarbage()){ //make sure the task actually executed before trying to collect
$doGC = true;
$queue->dequeue();
$this->removeTask($task);
}elseif($task->isCrashed()){
$this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed");
$this->removeTask($task, true);
if($task->isCrashed()){
$this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed");
}elseif(!$task->hasCancelledRun()){
try{
/*
* It's possible for a task to submit a progress update and then finish before the progress
* update is detected by the parent thread, so here we consume any missed updates.
*
* When this happens, it's possible for a progress update to arrive between the previous
* checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be
* lost. Thus, it's necessary to do one last check here to make sure all progress updates have
* been consumed before completing.
*/
$task->checkProgressUpdates();
$task->onCompletion();
}catch(\Throwable $e){
$this->logger->critical("Could not execute completion of asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": " . $e->getMessage());
$this->logger->logException($e);
}
}
}else{
break; //current task is still running, skip to next worker
}
}
if($doGC){
$this->workers[$worker]->collect();
}
}
$this->collectWorkers();
}
public function shutdownUnusedWorkers() : int{
$ret = 0;
foreach($this->workerUsage as $i => $usage){
if($usage === 0){
foreach($this->taskQueues as $i => $queue){
if($queue->isEmpty()){
$this->workers[$i]->quit();
unset($this->workers[$i], $this->workerUsage[$i]);
unset($this->workers[$i], $this->taskQueues[$i]);
$ret++;
}
}
@ -301,24 +275,21 @@ class AsyncPool{
foreach($this->workers as $worker){
/** @var AsyncTask $task */
while(($task = $worker->unstack()) !== null){
//cancelRun() is not strictly necessary here, but it might be used to inform plugins of the task state
//(i.e. it never executed).
$task->cancelRun();
$this->removeTask($task, true);
//NOOP: the below loop will deal with marking tasks as garbage
}
}
foreach($this->tasks as $task){
$task->cancelRun();
$this->removeTask($task, true);
foreach($this->taskQueues as $queue){
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
$task = $queue->dequeue();
$task->cancelRun();
}
}
$this->taskWorkers = [];
$this->tasks = [];
foreach($this->workers as $worker){
$worker->quit();
}
$this->workers = [];
$this->workerUsage = [];
$this->taskQueues = [];
}
}

View File

@ -59,8 +59,8 @@ abstract class AsyncTask extends Collectable{
private $result = null;
private $serialized = false;
private $cancelRun = false;
/** @var int|null */
private $taskId = null;
/** @var bool */
private $submitted = false;
private $crashed = false;
@ -114,15 +114,15 @@ abstract class AsyncTask extends Collectable{
$this->serialized = $serialize;
}
public function setTaskId(int $taskId) : void{
$this->taskId = $taskId;
public function setSubmitted() : void{
$this->submitted = true;
}
/**
* @return int|null
* @return bool
*/
public function getTaskId() : ?int{
return $this->taskId;
public function isSubmitted() : bool{
return $this->submitted;
}
/**