Removed pocketmine subdirectory, map PSR-4 style

This commit is contained in:
Dylan K. Taylor
2019-07-30 19:14:57 +01:00
parent 7a77d3dc30
commit 5499ac620c
1044 changed files with 3 additions and 3 deletions

307
src/scheduler/AsyncPool.php Normal file
View File

@@ -0,0 +1,307 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\utils\Utils;
use function array_keys;
use function assert;
use function count;
use function spl_object_id;
use function time;
use const PHP_INT_MAX;
use const PTHREADS_INHERIT_CONSTANTS;
use const PTHREADS_INHERIT_INI;
/**
* Manages general-purpose worker threads used for processing asynchronous tasks, and the tasks submitted to those
* workers.
*/
class AsyncPool{
private const WORKER_START_OPTIONS = PTHREADS_INHERIT_INI | PTHREADS_INHERIT_CONSTANTS;
/** @var \ClassLoader */
private $classLoader;
/** @var \ThreadedLogger */
private $logger;
/** @var int */
protected $size;
/** @var int */
private $workerMemoryLimit;
/** @var \SplQueue[]|AsyncTask[][] */
private $taskQueues = [];
/** @var AsyncWorker[] */
private $workers = [];
/** @var int[] */
private $workerLastUsed = [];
/** @var \Closure[] */
private $workerStartHooks = [];
public function __construct(int $size, int $workerMemoryLimit, \ClassLoader $classLoader, \ThreadedLogger $logger){
$this->size = $size;
$this->workerMemoryLimit = $workerMemoryLimit;
$this->classLoader = $classLoader;
$this->logger = $logger;
}
/**
* Returns the maximum size of the pool. Note that there may be less active workers than this number.
*
* @return int
*/
public function getSize() : int{
return $this->size;
}
/**
* Increases the maximum size of the pool to the specified amount. This does not immediately start new workers.
*
* @param int $newSize
*/
public function increaseSize(int $newSize) : void{
if($newSize > $this->size){
$this->size = $newSize;
}
}
/**
* Registers a Closure callback to be fired whenever a new worker is started by the pool.
* The signature should be `function(int $worker) : void`
*
* This function will call the hook for every already-running worker.
*
* @param \Closure $hook
*/
public function addWorkerStartHook(\Closure $hook) : void{
Utils::validateCallableSignature(function(int $worker) : void{}, $hook);
$this->workerStartHooks[spl_object_id($hook)] = $hook;
foreach($this->workers as $i => $worker){
$hook($i);
}
}
/**
* Removes a previously-registered callback listening for workers being started.
*
* @param \Closure $hook
*/
public function removeWorkerStartHook(\Closure $hook) : void{
unset($this->workerStartHooks[spl_object_id($hook)]);
}
/**
* Returns an array of IDs of currently running workers.
*
* @return int[]
*/
public function getRunningWorkers() : array{
return array_keys($this->workers);
}
/**
* Fetches the worker with the specified ID, starting it if it does not exist, and firing any registered worker
* start hooks.
*
* @param int $worker
*
* @return AsyncWorker
*/
private function getWorker(int $worker) : AsyncWorker{
if(!isset($this->workers[$worker])){
$this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit);
$this->workers[$worker]->setClassLoader($this->classLoader);
$this->workers[$worker]->start(self::WORKER_START_OPTIONS);
$this->taskQueues[$worker] = new \SplQueue();
foreach($this->workerStartHooks as $hook){
$hook($worker);
}
}
return $this->workers[$worker];
}
/**
* Submits an AsyncTask to an arbitrary worker.
*
* @param AsyncTask $task
* @param int $worker
*/
public function submitTaskToWorker(AsyncTask $task, int $worker) : void{
if($worker < 0 or $worker >= $this->size){
throw new \InvalidArgumentException("Invalid worker $worker");
}
if($task->isSubmitted()){
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$task->progressUpdates = new \Threaded;
$task->setSubmitted();
$this->getWorker($worker)->stack($task);
$this->taskQueues[$worker]->enqueue($task);
$this->workerLastUsed[$worker] = time();
}
/**
* Selects a worker ID to run a task.
*
* - if an idle worker is found, it will be selected
* - else, if the worker pool is not full, a new worker will be selected
* - else, the worker with the smallest backlog is chosen.
*
* @return int
*/
public function selectWorker() : int{
$worker = null;
$minUsage = PHP_INT_MAX;
foreach($this->taskQueues as $i => $queue){
if(($usage = $queue->count()) < $minUsage){
$worker = $i;
$minUsage = $usage;
if($usage === 0){
break;
}
}
}
if($worker === null or ($minUsage > 0 and count($this->workers) < $this->size)){
//select a worker to start on the fly
for($i = 0; $i < $this->size; ++$i){
if(!isset($this->workers[$i])){
$worker = $i;
break;
}
}
}
assert($worker !== null);
return $worker;
}
/**
* Submits an AsyncTask to the worker with the least load. If all workers are busy and the pool is not full, a new
* worker may be started.
*
* @param AsyncTask $task
*
* @return int
*/
public function submitTask(AsyncTask $task) : int{
if($task->isSubmitted()){
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$worker = $this->selectWorker();
$this->submitTaskToWorker($task, $worker);
return $worker;
}
/**
* Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate.
*
* @throws \ReflectionException
*/
public function collectTasks() : void{
foreach($this->taskQueues as $worker => $queue){
$doGC = false;
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
$task = $queue->bottom();
$task->checkProgressUpdates();
if($task->isFinished()){ //make sure the task actually executed before trying to collect
$doGC = true;
$queue->dequeue();
if($task->isCrashed()){
$this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed");
$task->onError();
}elseif(!$task->hasCancelledRun()){
/*
* It's possible for a task to submit a progress update and then finish before the progress
* update is detected by the parent thread, so here we consume any missed updates.
*
* When this happens, it's possible for a progress update to arrive between the previous
* checkProgressUpdates() call and the next isGarbage() call, causing progress updates to be
* lost. Thus, it's necessary to do one last check here to make sure all progress updates have
* been consumed before completing.
*/
$task->checkProgressUpdates();
$task->onCompletion();
}
}else{
break; //current task is still running, skip to next worker
}
}
if($doGC){
$this->workers[$worker]->collect();
}
}
}
public function shutdownUnusedWorkers() : int{
$ret = 0;
$time = time();
foreach($this->taskQueues as $i => $queue){
if((!isset($this->workerLastUsed[$i]) or $this->workerLastUsed[$i] + 300 < $time) and $queue->isEmpty()){
$this->workers[$i]->quit();
unset($this->workers[$i], $this->taskQueues[$i], $this->workerLastUsed[$i]);
$ret++;
}
}
return $ret;
}
/**
* Cancels all pending tasks and shuts down all the workers in the pool.
*/
public function shutdown() : void{
$this->collectTasks();
foreach($this->workers as $worker){
/** @var AsyncTask $task */
while(($task = $worker->unstack()) !== null){
//NOOP: the below loop will deal with marking tasks as garbage
}
}
foreach($this->taskQueues as $queue){
while(!$queue->isEmpty()){
/** @var AsyncTask $task */
$task = $queue->dequeue();
$task->cancelRun();
}
}
foreach($this->workers as $worker){
$worker->quit();
}
$this->workers = [];
$this->taskQueues = [];
$this->workerLastUsed = [];
}
}

261
src/scheduler/AsyncTask.php Normal file
View File

@@ -0,0 +1,261 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use function is_scalar;
use function serialize;
use function spl_object_id;
use function unserialize;
/**
* Class used to run async tasks in other threads.
*
* An AsyncTask does not have its own thread. It is queued into an AsyncPool and executed if there is an async worker
* with no AsyncTask running. Therefore, an AsyncTask SHOULD NOT execute for more than a few seconds. For tasks that
* run for a long time or infinitely, start another thread instead.
*
* WARNING: Any non-Threaded objects WILL BE SERIALIZED when assigned to members of AsyncTasks or other Threaded object.
* If later accessed from said Threaded object, you will be operating on a COPY OF THE OBJECT, NOT THE ORIGINAL OBJECT.
* If you want to store non-serializable objects to access when the task completes, store them using
* {@link AsyncTask::storeLocal}.
*
* WARNING: As of pthreads v3.1.6, arrays are converted to Volatile objects when assigned as members of Threaded objects.
* Keep this in mind when using arrays stored as members of your AsyncTask.
*
* WARNING: Do not call PocketMine-MP API methods from other Threads!!
*/
abstract class AsyncTask extends \Threaded{
/**
* @var \ArrayObject|mixed[] object hash => mixed data
*
* Used to store objects which are only needed on one thread and should not be serialized.
*/
private static $threadLocalStorage = null;
/** @var AsyncWorker $worker */
public $worker = null;
/** @var \Threaded */
public $progressUpdates;
private $result = null;
private $serialized = false;
private $cancelRun = false;
/** @var bool */
private $submitted = false;
private $crashed = false;
/** @var bool */
private $finished = false;
public function run() : void{
$this->result = null;
if(!$this->cancelRun){
try{
$this->onRun();
}catch(\Throwable $e){
$this->crashed = true;
$this->worker->handleException($e);
}
}
$this->finished = true;
}
public function isCrashed() : bool{
return $this->crashed or $this->isTerminated();
}
/**
* Returns whether this task has finished executing, whether successfully or not. This differs from isRunning()
* because it is not true prior to task execution.
*
* @return bool
*/
public function isFinished() : bool{
return $this->finished or $this->isCrashed();
}
/**
* @return mixed
*/
public function getResult(){
return $this->serialized ? unserialize($this->result) : $this->result;
}
public function cancelRun() : void{
$this->cancelRun = true;
}
public function hasCancelledRun() : bool{
return $this->cancelRun;
}
/**
* @return bool
*/
public function hasResult() : bool{
return $this->result !== null;
}
/**
* @param mixed $result
*/
public function setResult($result) : void{
$this->result = ($this->serialized = !is_scalar($result)) ? serialize($result) : $result;
}
public function setSubmitted() : void{
$this->submitted = true;
}
/**
* @return bool
*/
public function isSubmitted() : bool{
return $this->submitted;
}
/**
* Actions to execute when run
*/
abstract public function onRun() : void;
/**
* Actions to execute when completed (on main thread)
* Implement this if you want to handle the data in your AsyncTask after it has been processed
*/
public function onCompletion() : void{
}
/**
* Call this method from {@link AsyncTask::onRun} (AsyncTask execution thread) to schedule a call to
* {@link AsyncTask::onProgressUpdate} from the main thread with the given progress parameter.
*
* @param mixed $progress A value that can be safely serialize()'ed.
*/
public function publishProgress($progress) : void{
$this->progressUpdates[] = serialize($progress);
}
/**
* @internal Only call from AsyncPool.php on the main thread
*/
public function checkProgressUpdates() : void{
while($this->progressUpdates->count() !== 0){
$progress = $this->progressUpdates->shift();
$this->onProgressUpdate(unserialize($progress));
}
}
/**
* Called from the main thread after {@link AsyncTask::publishProgress} is called.
* All {@link AsyncTask::publishProgress} calls should result in {@link AsyncTask::onProgressUpdate} calls before
* {@link AsyncTask::onCompletion} is called.
*
* @param mixed $progress The parameter passed to {@link AsyncTask#publishProgress}. It is serialize()'ed
* and then unserialize()'ed, as if it has been cloned.
*/
public function onProgressUpdate($progress) : void{
}
/**
* Called from the main thread when the async task experiences an error during onRun(). Use this for things like
* promise rejection.
*/
public function onError() : void{
}
/**
* Saves mixed data in thread-local storage. Data stored using this storage is **only accessible from the thread it
* was stored on**. Data stored using this method will **not** be serialized.
* This can be used to store references to variables which you need later on on the same thread, but not others.
*
* For example, plugin references could be stored in the constructor of the async task (which is called on the main
* thread) using this, and then fetched in onCompletion() (which is also called on the main thread), without them
* becoming serialized.
*
* Scalar types can be stored directly in class properties instead of using this storage.
*
* Objects stored in this storage can be retrieved using fetchLocal() on the same thread that this method was called
* from.
*
* @param string $key
* @param mixed $complexData the data to store
*/
protected function storeLocal(string $key, $complexData) : void{
if(self::$threadLocalStorage === null){
/*
* It's necessary to use an object (not array) here because pthreads is stupid. Non-default array statics
* will be inherited when task classes are copied to the worker thread, which would cause unwanted
* inheritance of primitive thread-locals, which we really don't want for various reasons.
* It won't try to inherit objects though, so this is the easiest solution.
*/
self::$threadLocalStorage = new \ArrayObject();
}
self::$threadLocalStorage[spl_object_id($this)][$key] = $complexData;
}
/**
* Retrieves data stored in thread-local storage.
*
* If you used storeLocal(), you can use this on the same thread to fetch data stored. This should be used during
* onProgressUpdate() and onCompletion() to fetch thread-local data stored on the parent thread.
*
* @param string $key
*
* @return mixed
*
* @throws \InvalidArgumentException if no data were stored by this AsyncTask instance.
*/
protected function fetchLocal(string $key){
$id = spl_object_id($this);
if(self::$threadLocalStorage === null or !isset(self::$threadLocalStorage[$id][$key])){
throw new \InvalidArgumentException("No matching thread-local data found on this thread");
}
return self::$threadLocalStorage[$id][$key];
}
final public function __destruct(){
$this->reallyDestruct();
if(self::$threadLocalStorage !== null and isset(self::$threadLocalStorage[$h = spl_object_id($this)])){
unset(self::$threadLocalStorage[$h]);
if(self::$threadLocalStorage->count() === 0){
self::$threadLocalStorage = null;
}
}
}
/**
* Override this to do normal __destruct() cleanup from a child class.
*/
protected function reallyDestruct() : void{
}
}

View File

@@ -0,0 +1,120 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\thread\Worker;
use function gc_enable;
use function ini_set;
class AsyncWorker extends Worker{
/** @var mixed[] */
private static $store = [];
private $logger;
private $id;
/** @var int */
private $memoryLimit;
public function __construct(\ThreadedLogger $logger, int $id, int $memoryLimit){
$this->logger = $logger;
$this->id = $id;
$this->memoryLimit = $memoryLimit;
}
protected function onRun() : void{
\GlobalLogger::set($this->logger);
gc_enable();
if($this->memoryLimit > 0){
ini_set('memory_limit', $this->memoryLimit . 'M');
$this->logger->debug("Set memory limit to " . $this->memoryLimit . " MB");
}else{
ini_set('memory_limit', '-1');
$this->logger->debug("No memory limit set");
}
}
public function getLogger() : \ThreadedLogger{
return $this->logger;
}
public function handleException(\Throwable $e) : void{
$this->logger->logException($e);
}
public function getThreadName() : string{
return "AsyncWorker#" . $this->id;
}
public function getAsyncWorkerId() : int{
return $this->id;
}
/**
* Saves mixed data into the worker's thread-local object store. This can be used to store objects which you
* want to use on this worker thread from multiple AsyncTasks.
*
* @param string $identifier
* @param mixed $value
*/
public function saveToThreadStore(string $identifier, $value) : void{
if(\Thread::getCurrentThread() !== $this){
throw new \InvalidStateException("Thread-local data can only be stored in the thread context");
}
self::$store[$identifier] = $value;
}
/**
* Retrieves mixed data from the worker's thread-local object store.
*
* Note that the thread-local object store could be cleared and your data might not exist, so your code should
* account for the possibility that what you're trying to retrieve might not exist.
*
* Objects stored in this storage may ONLY be retrieved while the task is running.
*
* @param string $identifier
*
* @return mixed
*/
public function getFromThreadStore(string $identifier){
if(\Thread::getCurrentThread() !== $this){
throw new \InvalidStateException("Thread-local data can only be fetched in the thread context");
}
return self::$store[$identifier] ?? null;
}
/**
* Removes previously-stored mixed data from the worker's thread-local object store.
*
* @param string $identifier
*/
public function removeFromThreadStore(string $identifier) : void{
if(\Thread::getCurrentThread() !== $this){
throw new \InvalidStateException("Thread-local data can only be removed in the thread context");
}
unset(self::$store[$identifier]);
}
}

View File

@@ -0,0 +1,64 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\utils\Internet;
use pocketmine\utils\InternetException;
use function serialize;
use function unserialize;
/**
* Executes a consecutive list of cURL operations.
*
* The result of this AsyncTask is an array of arrays (returned from {@link Internet::simpleCurl}) or InternetException objects.
*/
class BulkCurlTask extends AsyncTask{
private $operations;
/**
* BulkCurlTask constructor.
*
* $operations accepts an array of arrays. Each member array must contain a string mapped to "page", and optionally,
* "timeout", "extraHeaders" and "extraOpts". Documentation of these options are same as those in
* {@link Utils::simpleCurl}.
*
* @param array $operations
*/
public function __construct(array $operations){
$this->operations = serialize($operations);
}
public function onRun() : void{
$operations = unserialize($this->operations);
$results = [];
foreach($operations as $op){
try{
$results[] = Internet::simpleCurl($op["page"], $op["timeout"] ?? 10, $op["extraHeaders"] ?? [], $op["extraOpts"] ?? []);
}catch(InternetException $e){
$results[] = $e;
}
}
$this->setResult($results);
}
}

View File

@@ -0,0 +1,73 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\utils\Utils;
/**
* This class permits scheduling a self-cancelling closure to run. This is useful for repeating tasks.
* The given closure must return a bool which indicates whether it should continue executing.
*
* Example usage:
*
* ```
* TaskScheduler->scheduleTask(new CancellableClosureTask(function(int $currentTick) : bool{
* echo "HI on $currentTick\n";
* $continue = false;
* return $continue; //stop repeating
* });
* ```
*
* @see ClosureTask
*/
class CancellableClosureTask extends Task{
public const CONTINUE = true;
public const CANCEL = false;
/** @var \Closure */
private $closure;
/**
* CancellableClosureTask constructor.
*
* The closure should follow the signature callback(int $currentTick) : bool. The return value will be used to
* decide whether to continue repeating.
*
* @param \Closure $closure
*/
public function __construct(\Closure $closure){
Utils::validateCallableSignature(function(int $currentTick) : bool{ return false; }, $closure);
$this->closure = $closure;
}
public function getName() : string{
return Utils::getNiceClosureName($this->closure);
}
public function onRun(int $currentTick) : void{
if(!($this->closure)($currentTick)){
$this->getHandler()->cancel();
}
}
}

View File

@@ -0,0 +1,59 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\utils\Utils;
/**
* Task implementation which allows closures to be called by a scheduler.
*
* Example usage:
*
* ```
* TaskScheduler->scheduleTask(new ClosureTask(function(int $currentTick) : void{
* echo "HI on $currentTick\n";
* });
* ```
*/
class ClosureTask extends Task{
/** @var \Closure */
private $closure;
/**
* @param \Closure $closure Must accept only ONE parameter, $currentTick
*/
public function __construct(\Closure $closure){
Utils::validateCallableSignature(function(int $currentTick){}, $closure);
$this->closure = $closure;
}
public function getName() : string{
return Utils::getNiceClosureName($this->closure);
}
public function onRun(int $currentTick) : void{
($this->closure)($currentTick);
}
}

View File

@@ -0,0 +1,60 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\MemoryManager;
use const DIRECTORY_SEPARATOR;
/**
* Task used to dump memory from AsyncWorkers
*/
class DumpWorkerMemoryTask extends AsyncTask{
/** @var string */
private $outputFolder;
/** @var int */
private $maxNesting;
/** @var int */
private $maxStringSize;
/**
* @param string $outputFolder
* @param int $maxNesting
* @param int $maxStringSize
*/
public function __construct(string $outputFolder, int $maxNesting, int $maxStringSize){
$this->outputFolder = $outputFolder;
$this->maxNesting = $maxNesting;
$this->maxStringSize = $maxStringSize;
}
public function onRun() : void{
MemoryManager::dumpMemory(
$this->worker,
$this->outputFolder . DIRECTORY_SEPARATOR . "AsyncWorker#" . $this->worker->getAsyncWorkerId(),
$this->maxNesting,
$this->maxStringSize,
new \PrefixedLogger($this->worker->getLogger(), "Memory Dump")
);
}
}

View File

@@ -0,0 +1,51 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use function file_put_contents;
class FileWriteTask extends AsyncTask{
/** @var string */
private $path;
/** @var mixed */
private $contents;
/** @var int */
private $flags;
/**
* @param string $path
* @param mixed $contents
* @param int $flags
*/
public function __construct(string $path, $contents, int $flags = 0){
$this->path = $path;
$this->contents = $contents;
$this->flags = $flags;
}
public function onRun() : void{
file_put_contents($this->path, $this->contents, $this->flags);
}
}

View File

@@ -0,0 +1,35 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use function gc_collect_cycles;
use function gc_enable;
class GarbageCollectionTask extends AsyncTask{
public function onRun() : void{
gc_enable();
gc_collect_cycles();
}
}

View File

@@ -0,0 +1,161 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\player\Player;
use pocketmine\Server;
use pocketmine\utils\Internet;
use pocketmine\utils\Process;
use pocketmine\utils\Utils;
use pocketmine\utils\UUID;
use pocketmine\utils\VersionString;
use function array_map;
use function array_values;
use function count;
use function json_encode;
use function md5;
use function microtime;
use function php_uname;
use function strlen;
use const PHP_VERSION;
class SendUsageTask extends AsyncTask{
public const TYPE_OPEN = 1;
public const TYPE_STATUS = 2;
public const TYPE_CLOSE = 3;
public $endpoint;
public $data;
/**
* @param Server $server
* @param int $type
* @param array $playerList
*/
public function __construct(Server $server, int $type, array $playerList = []){
$endpoint = "http://" . $server->getProperty("anonymous-statistics.host", "stats.pocketmine.net") . "/";
$data = [];
$data["uniqueServerId"] = $server->getServerUniqueId()->toString();
$data["uniqueMachineId"] = Utils::getMachineUniqueId()->toString();
$data["uniqueRequestId"] = UUID::fromData($server->getServerUniqueId()->toString(), microtime(false))->toString();
switch($type){
case self::TYPE_OPEN:
$data["event"] = "open";
$version = new VersionString(\pocketmine\BASE_VERSION, \pocketmine\IS_DEVELOPMENT_BUILD, \pocketmine\BUILD_NUMBER);
$data["server"] = [
"port" => $server->getPort(),
"software" => $server->getName(),
"fullVersion" => $version->getFullVersion(true),
"version" => $version->getFullVersion(false),
"build" => $version->getBuild(),
"api" => $server->getApiVersion(),
"minecraftVersion" => $server->getVersion(),
"protocol" => ProtocolInfo::CURRENT_PROTOCOL
];
$data["system"] = [
"operatingSystem" => Utils::getOS(),
"cores" => Utils::getCoreCount(),
"phpVersion" => PHP_VERSION,
"machine" => php_uname("a"),
"release" => php_uname("r"),
"platform" => php_uname("i")
];
$data["players"] = [
"count" => 0,
"limit" => $server->getMaxPlayers()
];
$plugins = [];
foreach($server->getPluginManager()->getPlugins() as $p){
$d = $p->getDescription();
$plugins[$d->getName()] = [
"name" => $d->getName(),
"version" => $d->getVersion(),
"enabled" => $p->isEnabled()
];
}
$data["plugins"] = $plugins;
break;
case self::TYPE_STATUS:
$data["event"] = "status";
$data["server"] = [
"ticksPerSecond" => $server->getTicksPerSecondAverage(),
"tickUsage" => $server->getTickUsageAverage(),
"ticks" => $server->getTick()
];
//This anonymizes the user ids so they cannot be reversed to the original
foreach($playerList as $k => $v){
$playerList[$k] = md5($v);
}
$players = array_map(function(Player $p){ return md5($p->getUniqueId()->toBinary()); }, $server->getOnlinePlayers());
$data["players"] = [
"count" => count($players),
"limit" => $server->getMaxPlayers(),
"currentList" => $players,
"historyList" => array_values($playerList)
];
$info = Process::getMemoryUsage(true);
$data["system"] = [
"mainMemory" => $info[0],
"totalMemory" => $info[1],
"availableMemory" => $info[2],
"threadCount" => Process::getThreadCount()
];
break;
case self::TYPE_CLOSE:
$data["event"] = "close";
$data["crashing"] = $server->isRunning();
break;
}
$this->endpoint = $endpoint . "api/post";
$this->data = json_encode($data/*, JSON_PRETTY_PRINT*/);
}
public function onRun() : void{
Internet::postURL($this->endpoint, $this->data, 5, [
"Content-Type: application/json",
"Content-Length: " . strlen($this->data)
]);
}
}

79
src/scheduler/Task.php Normal file
View File

@@ -0,0 +1,79 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\utils\Utils;
abstract class Task{
/** @var TaskHandler */
private $taskHandler = null;
/**
* @return TaskHandler|null
*/
final public function getHandler(){
return $this->taskHandler;
}
/**
* @return int
*/
final public function getTaskId() : int{
if($this->taskHandler !== null){
return $this->taskHandler->getTaskId();
}
return -1;
}
public function getName() : string{
return Utils::getNiceClassName($this);
}
/**
* @param TaskHandler|null $taskHandler
*/
final public function setHandler(?TaskHandler $taskHandler) : void{
if($this->taskHandler === null or $taskHandler === null){
$this->taskHandler = $taskHandler;
}
}
/**
* Actions to execute when run
*
* @param int $currentTick
*
* @return void
*/
abstract public function onRun(int $currentTick);
/**
* Actions to execute if the Task is cancelled
*/
public function onCancel(){
}
}

View File

@@ -0,0 +1,175 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\scheduler;
use pocketmine\timings\Timings;
use pocketmine\timings\TimingsHandler;
class TaskHandler{
/** @var Task */
protected $task;
/** @var int */
protected $taskId;
/** @var int */
protected $delay;
/** @var int */
protected $period;
/** @var int */
protected $nextRun;
/** @var bool */
protected $cancelled = false;
/** @var TimingsHandler */
private $timings;
/** @var string */
private $taskName;
/** @var string */
private $ownerName;
/**
* @param Task $task
* @param int $taskId
* @param int $delay
* @param int $period
* @param string|null $ownerName
*/
public function __construct(Task $task, int $taskId, int $delay = -1, int $period = -1, ?string $ownerName = null){
$this->task = $task;
$this->taskId = $taskId;
$this->delay = $delay;
$this->period = $period;
$this->taskName = $task->getName();
$this->ownerName = $ownerName ?? "Unknown";
$this->timings = Timings::getScheduledTaskTimings($this, $period);
$this->task->setHandler($this);
}
/**
* @return bool
*/
public function isCancelled() : bool{
return $this->cancelled;
}
/**
* @return int
*/
public function getNextRun() : int{
return $this->nextRun;
}
/**
* @param int $ticks
*/
public function setNextRun(int $ticks) : void{
$this->nextRun = $ticks;
}
/**
* @return int
*/
public function getTaskId() : int{
return $this->taskId;
}
/**
* @return Task
*/
public function getTask() : Task{
return $this->task;
}
/**
* @return int
*/
public function getDelay() : int{
return $this->delay;
}
/**
* @return bool
*/
public function isDelayed() : bool{
return $this->delay > 0;
}
/**
* @return bool
*/
public function isRepeating() : bool{
return $this->period > 0;
}
/**
* @return int
*/
public function getPeriod() : int{
return $this->period;
}
public function cancel() : void{
try{
if(!$this->isCancelled()){
$this->task->onCancel();
}
}finally{
$this->remove();
}
}
public function remove() : void{
$this->cancelled = true;
$this->task->setHandler(null);
}
/**
* @param int $currentTick
*/
public function run(int $currentTick) : void{
$this->timings->startTiming();
try{
$this->task->onRun($currentTick);
}finally{
$this->timings->stopTiming();
}
}
/**
* @return string
*/
public function getTaskName() : string{
return $this->taskName;
}
public function getOwnerName() : string{
return $this->ownerName;
}
}

View File

@@ -0,0 +1,219 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
/**
* Task scheduling related classes
*/
namespace pocketmine\scheduler;
use pocketmine\utils\ReversePriorityQueue;
class TaskScheduler{
/** @var string|null */
private $owner;
/** @var bool */
private $enabled = true;
/**
* @var ReversePriorityQueue<Task>
*/
protected $queue;
/**
* @var TaskHandler[]
*/
protected $tasks = [];
/** @var int */
private $ids = 1;
/** @var int */
protected $currentTick = 0;
/**
* @param null|string $owner
*/
public function __construct(?string $owner = null){
$this->owner = $owner;
$this->queue = new ReversePriorityQueue();
}
/**
* @param Task $task
*
* @return TaskHandler
*/
public function scheduleTask(Task $task) : TaskHandler{
return $this->addTask($task, -1, -1);
}
/**
* @param Task $task
* @param int $delay
*
* @return TaskHandler
*/
public function scheduleDelayedTask(Task $task, int $delay) : TaskHandler{
return $this->addTask($task, $delay, -1);
}
/**
* @param Task $task
* @param int $period
*
* @return TaskHandler
*/
public function scheduleRepeatingTask(Task $task, int $period) : TaskHandler{
return $this->addTask($task, -1, $period);
}
/**
* @param Task $task
* @param int $delay
* @param int $period
*
* @return TaskHandler
*/
public function scheduleDelayedRepeatingTask(Task $task, int $delay, int $period) : TaskHandler{
return $this->addTask($task, $delay, $period);
}
/**
* @param int $taskId
*/
public function cancelTask(int $taskId) : void{
if(isset($this->tasks[$taskId])){
try{
$this->tasks[$taskId]->cancel();
}finally{
unset($this->tasks[$taskId]);
}
}
}
public function cancelAllTasks() : void{
foreach($this->tasks as $id => $task){
$this->cancelTask($id);
}
$this->tasks = [];
while(!$this->queue->isEmpty()){
$this->queue->extract();
}
$this->ids = 1;
}
/**
* @param int $taskId
*
* @return bool
*/
public function isQueued(int $taskId) : bool{
return isset($this->tasks[$taskId]);
}
/**
* @param Task $task
* @param int $delay
* @param int $period
*
* @return TaskHandler
*
* @throws \InvalidStateException
*/
private function addTask(Task $task, int $delay, int $period) : TaskHandler{
if(!$this->enabled){
throw new \InvalidStateException("Tried to schedule task to disabled scheduler");
}
if($delay <= 0){
$delay = -1;
}
if($period <= -1){
$period = -1;
}elseif($period < 1){
$period = 1;
}
return $this->handle(new TaskHandler($task, $this->nextId(), $delay, $period, $this->owner));
}
private function handle(TaskHandler $handler) : TaskHandler{
if($handler->isDelayed()){
$nextRun = $this->currentTick + $handler->getDelay();
}else{
$nextRun = $this->currentTick;
}
$handler->setNextRun($nextRun);
$this->tasks[$handler->getTaskId()] = $handler;
$this->queue->insert($handler, $nextRun);
return $handler;
}
public function shutdown() : void{
$this->enabled = false;
$this->cancelAllTasks();
}
public function setEnabled(bool $enabled) : void{
$this->enabled = $enabled;
}
/**
* @param int $currentTick
*/
public function mainThreadHeartbeat(int $currentTick) : void{
$this->currentTick = $currentTick;
while($this->isReady($this->currentTick)){
/** @var TaskHandler $task */
$task = $this->queue->extract();
if($task->isCancelled()){
unset($this->tasks[$task->getTaskId()]);
continue;
}
$task->run($this->currentTick);
if($task->isRepeating()){
$task->setNextRun($this->currentTick + $task->getPeriod());
$this->queue->insert($task, $this->currentTick + $task->getPeriod());
}else{
$task->remove();
unset($this->tasks[$task->getTaskId()]);
}
}
}
private function isReady(int $currentTick) : bool{
return !$this->queue->isEmpty() and $this->queue->current()->getNextRun() <= $currentTick;
}
/**
* @return int
*/
private function nextId() : int{
return $this->ids++;
}
}