Consolidate worker data under AsyncPoolWorkerEntry

instead of having a bunch of arrays... this improves the system integrity and makes it less obnoxious to look at
This commit is contained in:
Dylan K. Taylor 2023-05-23 01:31:25 +01:00
parent ed64eac76f
commit cbda24d77e
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
2 changed files with 43 additions and 42 deletions

View File

@ -44,22 +44,11 @@ use const PHP_INT_MAX;
class AsyncPool{
private const WORKER_START_OPTIONS = NativeThread::INHERIT_INI | NativeThread::INHERIT_COMMENTS;
/**
* @var \SplQueue[]|AsyncTask[][]
* @phpstan-var array<int, \SplQueue<AsyncTask>>
*/
private array $taskQueues = [];
/**
* @var AsyncPoolWorkerEntry[]
* @phpstan-var array<int, AsyncPoolWorkerEntry>
*/
private array $workers = [];
/**
* @var int[]
* @phpstan-var array<int, int>
*/
private array $workerLastUsed = [];
/**
* @var \Closure[]
@ -129,23 +118,21 @@ class AsyncPool{
* Fetches the worker with the specified ID, starting it if it does not exist, and firing any registered worker
* start hooks.
*/
private function getWorker(int $worker) : AsyncWorker{
if(!isset($this->workers[$worker])){
$sleeperEntry = $this->eventLoop->addNotifier(function() use ($worker) : void{
$this->collectTasksFromWorker($worker);
private function getWorker(int $workerId) : AsyncPoolWorkerEntry{
if(!isset($this->workers[$workerId])){
$sleeperEntry = $this->eventLoop->addNotifier(function() use ($workerId) : void{
$this->collectTasksFromWorker($workerId);
});
$this->workers[$worker] = new AsyncPoolWorkerEntry(new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId());
$this->workers[$worker]->worker->setClassLoaders([$this->classLoader]);
$this->workers[$worker]->worker->start(self::WORKER_START_OPTIONS);
$this->taskQueues[$worker] = new \SplQueue();
$this->workers[$workerId] = new AsyncPoolWorkerEntry(new AsyncWorker($this->logger, $workerId, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId());
$this->workers[$workerId]->worker->setClassLoaders([$this->classLoader]);
$this->workers[$workerId]->worker->start(self::WORKER_START_OPTIONS);
foreach($this->workerStartHooks as $hook){
$hook($worker);
$hook($workerId);
}
}
return $this->workers[$worker]->worker;
return $this->workers[$workerId];
}
/**
@ -162,9 +149,7 @@ class AsyncPool{
$task->progressUpdates = new ThreadSafeArray();
$task->setSubmitted();
$this->getWorker($worker)->stack($task);
$this->taskQueues[$worker]->enqueue($task);
$this->workerLastUsed[$worker] = time();
$this->getWorker($worker)->submit($task);
}
/**
@ -177,8 +162,8 @@ class AsyncPool{
public function selectWorker() : int{
$worker = null;
$minUsage = PHP_INT_MAX;
foreach($this->taskQueues as $i => $queue){
if(($usage = $queue->count()) < $minUsage){
foreach($this->workers as $i => $entry){
if(($usage = $entry->tasks->count()) < $minUsage){
$worker = $i;
$minUsage = $usage;
if($usage === 0){
@ -221,13 +206,13 @@ class AsyncPool{
* @return bool whether there are tasks left to be collected
*/
public function collectTasks() : bool{
foreach($this->taskQueues as $worker => $queue){
$this->collectTasksFromWorker($worker);
foreach($this->workers as $workerId => $entry){
$this->collectTasksFromWorker($workerId);
}
//we check this in a second loop, because task collection could have caused new tasks to be added to the queues
foreach($this->taskQueues as $queue){
if(!$queue->isEmpty()){
foreach($this->workers as $entry){
if(!$entry->tasks->isEmpty()){
return true;
}
}
@ -235,10 +220,10 @@ class AsyncPool{
}
public function collectTasksFromWorker(int $worker) : bool{
if(!isset($this->taskQueues[$worker])){
if(!isset($this->workers[$worker])){
throw new \InvalidArgumentException("No such worker $worker");
}
$queue = $this->taskQueues[$worker];
$queue = $this->workers[$worker]->tasks;
$more = false;
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
@ -279,17 +264,17 @@ class AsyncPool{
* @phpstan-return array<int, int>
*/
public function getTaskQueueSizes() : array{
return array_map(function(\SplQueue $queue) : int{ return $queue->count(); }, $this->taskQueues);
return array_map(function(AsyncPoolWorkerEntry $entry) : int{ return $entry->tasks->count(); }, $this->workers);
}
public function shutdownUnusedWorkers() : int{
$ret = 0;
$time = time();
foreach($this->taskQueues as $i => $queue){
if((!isset($this->workerLastUsed[$i]) || $this->workerLastUsed[$i] + 300 < $time) && $queue->isEmpty()){
$this->workers[$i]->worker->quit();
$this->eventLoop->removeNotifier($this->workers[$i]->sleeperNotifierId);
unset($this->workers[$i], $this->taskQueues[$i], $this->workerLastUsed[$i]);
foreach($this->workers as $i => $entry){
if($entry->lastUsed + 300 < $time && $entry->tasks->isEmpty()){
$entry->worker->quit();
$this->eventLoop->removeNotifier($entry->sleeperNotifierId);
unset($this->workers[$i]);
$ret++;
}
}
@ -310,7 +295,5 @@ class AsyncPool{
$this->eventLoop->removeNotifier($worker->sleeperNotifierId);
}
$this->workers = [];
$this->taskQueues = [];
$this->workerLastUsed = [];
}
}

View File

@ -23,10 +23,28 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use function time;
final class AsyncPoolWorkerEntry{
public int $lastUsed;
/**
* @var \SplQueue|AsyncTask[]
* @phpstan-var \SplQueue<AsyncTask>
*/
public \SplQueue $tasks;
public function __construct(
public readonly AsyncWorker $worker,
public readonly int $sleeperNotifierId
){}
){
$this->lastUsed = time();
$this->tasks = new \SplQueue();
}
public function submit(AsyncTask $task) : void{
$this->tasks->enqueue($task);
$this->lastUsed = time();
$this->worker->stack($task);
}
}