mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-06-24 20:34:02 +00:00
AsyncPool: more documentation
This commit is contained in:
parent
88d83e0fca
commit
7d5b3079bc
@ -25,6 +25,10 @@ namespace pocketmine\scheduler;
|
|||||||
|
|
||||||
use pocketmine\Server;
|
use pocketmine\Server;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages general-purpose worker threads used for processing asynchronous tasks, and the tasks submitted to those
|
||||||
|
* workers.
|
||||||
|
*/
|
||||||
class AsyncPool{
|
class AsyncPool{
|
||||||
private const WORKER_START_OPTIONS = PTHREADS_INHERIT_INI | PTHREADS_INHERIT_CONSTANTS;
|
private const WORKER_START_OPTIONS = PTHREADS_INHERIT_INI | PTHREADS_INHERIT_CONSTANTS;
|
||||||
|
|
||||||
@ -63,10 +67,20 @@ class AsyncPool{
|
|||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the maximum size of the pool. Note that there may be less active workers than this number.
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
public function getSize() : int{
|
public function getSize() : int{
|
||||||
return $this->size;
|
return $this->size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increases the maximum size of the pool to the specified amount. This does not immediately start new workers.
|
||||||
|
*
|
||||||
|
* @param int $newSize
|
||||||
|
*/
|
||||||
public function increaseSize(int $newSize){
|
public function increaseSize(int $newSize){
|
||||||
if($newSize > $this->size){
|
if($newSize > $this->size){
|
||||||
$this->size = $newSize;
|
$this->size = $newSize;
|
||||||
@ -129,6 +143,12 @@ class AsyncPool{
|
|||||||
return $this->workers[$worker];
|
return $this->workers[$worker];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submits an AsyncTask to an arbitrary worker.
|
||||||
|
*
|
||||||
|
* @param AsyncTask $task
|
||||||
|
* @param int $worker
|
||||||
|
*/
|
||||||
public function submitTaskToWorker(AsyncTask $task, int $worker){
|
public function submitTaskToWorker(AsyncTask $task, int $worker){
|
||||||
if($worker < 0 or $worker >= $this->size){
|
if($worker < 0 or $worker >= $this->size){
|
||||||
throw new \InvalidArgumentException("Invalid worker $worker");
|
throw new \InvalidArgumentException("Invalid worker $worker");
|
||||||
@ -147,6 +167,14 @@ class AsyncPool{
|
|||||||
$this->taskWorkers[$task->getTaskId()] = $worker;
|
$this->taskWorkers[$task->getTaskId()] = $worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submits an AsyncTask to the worker with the least load. If all workers are busy and the pool is not full, a new
|
||||||
|
* worker may be started.
|
||||||
|
*
|
||||||
|
* @param AsyncTask $task
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
public function submitTask(AsyncTask $task) : int{
|
public function submitTask(AsyncTask $task) : int{
|
||||||
if($task->getTaskId() !== null){
|
if($task->getTaskId() !== null){
|
||||||
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
|
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
|
||||||
@ -179,6 +207,12 @@ class AsyncPool{
|
|||||||
return $worker;
|
return $worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a completed or crashed task from the pool.
|
||||||
|
*
|
||||||
|
* @param AsyncTask $task
|
||||||
|
* @param bool $force
|
||||||
|
*/
|
||||||
private function removeTask(AsyncTask $task, bool $force = false){
|
private function removeTask(AsyncTask $task, bool $force = false){
|
||||||
if(isset($this->taskWorkers[$task->getTaskId()])){
|
if(isset($this->taskWorkers[$task->getTaskId()])){
|
||||||
if(!$force and ($task->isRunning() or !$task->isGarbage())){
|
if(!$force and ($task->isRunning() or !$task->isGarbage())){
|
||||||
@ -191,6 +225,10 @@ class AsyncPool{
|
|||||||
unset($this->taskWorkers[$task->getTaskId()]);
|
unset($this->taskWorkers[$task->getTaskId()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes all tasks from the pool, cancelling where possible. This will block until all tasks have been
|
||||||
|
* successfully deleted.
|
||||||
|
*/
|
||||||
public function removeTasks(){
|
public function removeTasks(){
|
||||||
foreach($this->workers as $worker){
|
foreach($this->workers as $worker){
|
||||||
/** @var AsyncTask $task */
|
/** @var AsyncTask $task */
|
||||||
@ -222,12 +260,20 @@ class AsyncPool{
|
|||||||
$this->collectWorkers();
|
$this->collectWorkers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collects garbage from running workers.
|
||||||
|
*/
|
||||||
private function collectWorkers(){
|
private function collectWorkers(){
|
||||||
foreach($this->workers as $worker){
|
foreach($this->workers as $worker){
|
||||||
$worker->collect();
|
$worker->collect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate.
|
||||||
|
*
|
||||||
|
* @throws \ReflectionException
|
||||||
|
*/
|
||||||
public function collectTasks(){
|
public function collectTasks(){
|
||||||
foreach($this->tasks as $task){
|
foreach($this->tasks as $task){
|
||||||
if(!$task->isGarbage()){
|
if(!$task->isGarbage()){
|
||||||
@ -258,6 +304,9 @@ class AsyncPool{
|
|||||||
$this->collectWorkers();
|
$this->collectWorkers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancels all pending tasks and shuts down all the workers in the pool.
|
||||||
|
*/
|
||||||
public function shutdown() : void{
|
public function shutdown() : void{
|
||||||
$this->collectTasks();
|
$this->collectTasks();
|
||||||
$this->removeTasks();
|
$this->removeTasks();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user