mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-04-20 16:00:20 +00:00
Merge pull request #2213: Scheduler API refactor, plugins now have their own schedulers
- Removed `Server->getScheduler()`. All plugins now have their own scheduler which is accessible using `Plugin->getScheduler()`. Aside from being syntactically more concise and pleasant, this also allows much more effective management of tasks when plugins are disabled. - Removed `PluginTask` class. Before this PR it was necessary for plugin tasks to descend from `PluginTask` to ensure that the server could clean them up correctly on plugin disable. This is no longer necessary, so the `PluginTask` class has been removed. Plugins may now utilize the `Task` class as a base if they like. - Added `Server->getAsyncPool()`. Since the global scheduler does not exist any more, it does not manage the server's `AsyncPool` any more. Additionally, `ServerScheduler` was previously bloated by a lot of `AsyncTask` related methods, which are now not necessary because direct access to `AsyncPool` is granted instead. - `ServerScheduler`: - `ServerScheduler` has been renamed to `TaskScheduler` since it is now a general-purpose task scheduler which is non-dependent on the user. This allows much greater flexibility and also makes it possible to unit-test. - All `AsyncTask`/`AsyncPool` related methods have been removed - the task scheduler does not manage the async pool anymore. - Calls to `Server->getScheduler()->scheduleAsyncTask()` should be replaced with `Server->getAsyncPool()->submitTask()`. - Calls to `Server->getScheduler()->scheduleAsyncTaskToWorker()` should be replaced with and `Server->getAsyncPool()->submitTaskToWorker()`. ## Backwards compatibility This poses significant backwards compatibility breaks for any plugins utilizing Tasks or AsyncTasks. These breaks are described above, along with basic upgrade steps. The upgrade process is quite straightforward. ## Follow-up A large part of the goal with this pull request is to modularize these parts of the code so that they can be reused and also unit-tested. I would like to remove the existing test set from TesterPlugin at some stage when the AsyncPool can operate without a Server. Because of the above, I am considering making further backwards incompatible changes directly to `AsyncTask` to remove the `Server` parameters from `onCompletion()` and `onProgressUpdate()`. These shouldn't be too difficult to upgrade from and can be prepared for in advance.
This commit is contained in:
commit
9644766df3
@ -243,9 +243,9 @@ class MemoryManager{
|
||||
Timings::$garbageCollectorTimer->startTiming();
|
||||
|
||||
if($this->garbageCollectionAsync){
|
||||
$size = $this->server->getScheduler()->getAsyncTaskPoolSize();
|
||||
for($i = 0; $i < $size; ++$i){
|
||||
$this->server->getScheduler()->scheduleAsyncTaskToWorker(new GarbageCollectionTask(), $i);
|
||||
$pool = $this->server->getAsyncPool();
|
||||
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
|
||||
$pool->submitTaskToWorker(new GarbageCollectionTask(), $i);
|
||||
}
|
||||
}
|
||||
|
||||
@ -268,9 +268,9 @@ class MemoryManager{
|
||||
self::dumpMemory($this->server, $outputFolder, $maxNesting, $maxStringSize);
|
||||
|
||||
if($this->dumpWorkers){
|
||||
$scheduler = $this->server->getScheduler();
|
||||
for($i = 0, $size = $scheduler->getAsyncTaskPoolSize(); $i < $size; ++$i){
|
||||
$scheduler->scheduleAsyncTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i);
|
||||
$pool = $this->server->getAsyncPool();
|
||||
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
|
||||
$pool->submitTaskToWorker(new DumpWorkerMemoryTask($outputFolder, $maxNesting, $maxStringSize), $i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1916,7 +1916,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
|
||||
}
|
||||
|
||||
if(!$packet->skipVerification){
|
||||
$this->server->getScheduler()->scheduleAsyncTask(new VerifyLoginTask($this, $packet));
|
||||
$this->server->getAsyncPool()->submitTask(new VerifyLoginTask($this, $packet));
|
||||
}else{
|
||||
$this->onVerifyCompleted($packet, null, true);
|
||||
}
|
||||
|
@ -87,9 +87,9 @@ use pocketmine\plugin\PluginLoadOrder;
|
||||
use pocketmine\plugin\PluginManager;
|
||||
use pocketmine\plugin\ScriptPluginLoader;
|
||||
use pocketmine\resourcepacks\ResourcePackManager;
|
||||
use pocketmine\scheduler\AsyncPool;
|
||||
use pocketmine\scheduler\FileWriteTask;
|
||||
use pocketmine\scheduler\SendUsageTask;
|
||||
use pocketmine\scheduler\ServerScheduler;
|
||||
use pocketmine\snooze\SleeperHandler;
|
||||
use pocketmine\snooze\SleeperNotifier;
|
||||
use pocketmine\tile\Tile;
|
||||
@ -148,8 +148,8 @@ class Server{
|
||||
/** @var AutoUpdater */
|
||||
private $updater = null;
|
||||
|
||||
/** @var ServerScheduler */
|
||||
private $scheduler = null;
|
||||
/** @var AsyncPool */
|
||||
private $asyncPool;
|
||||
|
||||
/**
|
||||
* Counts the ticks since the server start
|
||||
@ -650,11 +650,8 @@ class Server{
|
||||
return $this->resourceManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ServerScheduler
|
||||
*/
|
||||
public function getScheduler(){
|
||||
return $this->scheduler;
|
||||
public function getAsyncPool() : AsyncPool{
|
||||
return $this->asyncPool;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -824,7 +821,7 @@ class Server{
|
||||
$nbt = new BigEndianNBTStream();
|
||||
try{
|
||||
if($async){
|
||||
$this->getScheduler()->scheduleAsyncTask(new FileWriteTask($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData())));
|
||||
$this->asyncPool->submitTask(new FileWriteTask($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData())));
|
||||
}else{
|
||||
file_put_contents($this->getDataPath() . "players/" . strtolower($name) . ".dat", $nbt->writeCompressed($ev->getSaveData()));
|
||||
}
|
||||
@ -1519,7 +1516,7 @@ class Server{
|
||||
$this->logger->info($this->getLanguage()->translateString("pocketmine.server.start", [TextFormat::AQUA . $this->getVersion() . TextFormat::RESET]));
|
||||
|
||||
if(($poolSize = $this->getProperty("settings.async-workers", "auto")) === "auto"){
|
||||
$poolSize = ServerScheduler::$WORKERS;
|
||||
$poolSize = 2;
|
||||
$processors = Utils::getCoreCount() - 2;
|
||||
|
||||
if($processors > 0){
|
||||
@ -1529,7 +1526,7 @@ class Server{
|
||||
$poolSize = (int) $poolSize;
|
||||
}
|
||||
|
||||
ServerScheduler::$WORKERS = $poolSize;
|
||||
$this->asyncPool = new AsyncPool($this, $poolSize);
|
||||
|
||||
if($this->getProperty("network.batch-threshold", 256) >= 0){
|
||||
Network::$BATCH_THRESHOLD = (int) $this->getProperty("network.batch-threshold", 256);
|
||||
@ -1551,7 +1548,6 @@ class Server{
|
||||
|
||||
$this->doTitleTick = ((bool) $this->getProperty("console.title-tick", true)) && Terminal::hasFormattingCodes();
|
||||
|
||||
$this->scheduler = new ServerScheduler();
|
||||
|
||||
if($this->getConfigBool("enable-rcon", false)){
|
||||
try{
|
||||
@ -1904,7 +1900,7 @@ class Server{
|
||||
|
||||
if(!$forceSync and !$immediate and $this->networkCompressionAsync){
|
||||
$task = new CompressBatchedTask($pk, $targets);
|
||||
$this->getScheduler()->scheduleAsyncTask($task);
|
||||
$this->asyncPool->submitTask($task);
|
||||
}else{
|
||||
$this->broadcastPacketsCallback($pk, $targets, $immediate);
|
||||
}
|
||||
@ -2068,9 +2064,9 @@ class Server{
|
||||
$this->getLogger()->debug("Removing event handlers");
|
||||
HandlerList::unregisterAll();
|
||||
|
||||
if($this->scheduler instanceof ServerScheduler){
|
||||
$this->getLogger()->debug("Shutting down task scheduler");
|
||||
$this->scheduler->shutdown();
|
||||
if($this->asyncPool instanceof AsyncPool){
|
||||
$this->getLogger()->debug("Shutting down async task worker pool");
|
||||
$this->asyncPool->shutdown();
|
||||
}
|
||||
|
||||
if($this->properties !== null and $this->properties->hasChanged()){
|
||||
@ -2428,7 +2424,7 @@ class Server{
|
||||
|
||||
public function sendUsage($type = SendUsageTask::TYPE_STATUS){
|
||||
if((bool) $this->getProperty("anonymous-statistics.enabled", true)){
|
||||
$this->scheduler->scheduleAsyncTask(new SendUsageTask($this, $type, $this->uniquePlayers));
|
||||
$this->asyncPool->submitTask(new SendUsageTask($this, $type, $this->uniquePlayers));
|
||||
}
|
||||
$this->uniquePlayers = [];
|
||||
}
|
||||
@ -2523,9 +2519,13 @@ class Server{
|
||||
Timings::$connectionTimer->stopTiming();
|
||||
|
||||
Timings::$schedulerTimer->startTiming();
|
||||
$this->scheduler->mainThreadHeartbeat($this->tickCounter);
|
||||
$this->pluginManager->tickSchedulers($this->tickCounter);
|
||||
Timings::$schedulerTimer->stopTiming();
|
||||
|
||||
Timings::$schedulerAsyncTimer->startTiming();
|
||||
$this->asyncPool->collectTasks();
|
||||
Timings::$schedulerAsyncTimer->stopTiming();
|
||||
|
||||
$this->checkTickUpdates($this->tickCounter, $tickTime);
|
||||
|
||||
foreach($this->players as $player){
|
||||
|
@ -104,7 +104,7 @@ class TimingsCommand extends VanillaCommand{
|
||||
];
|
||||
fclose($fileTimings);
|
||||
|
||||
$sender->getServer()->getScheduler()->scheduleAsyncTask(new class([
|
||||
$sender->getServer()->getAsyncPool()->submitTask(new class([
|
||||
["page" => "http://paste.ubuntu.com", "extraOpts" => [
|
||||
CURLOPT_HTTPHEADER => ["User-Agent: " . $sender->getServer()->getName() . " " . $sender->getServer()->getPocketMineVersion()],
|
||||
CURLOPT_POST => 1,
|
||||
|
@ -381,16 +381,17 @@ class Level implements ChunkManager, Metadatable{
|
||||
}
|
||||
|
||||
public function registerGenerator(){
|
||||
$size = $this->server->getScheduler()->getAsyncTaskPoolSize();
|
||||
for($i = 0; $i < $size; ++$i){
|
||||
$this->server->getScheduler()->scheduleAsyncTaskToWorker(new GeneratorRegisterTask($this, $this->generator, $this->provider->getGeneratorOptions()), $i);
|
||||
$pool = $this->server->getAsyncPool();
|
||||
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
|
||||
$pool->submitTaskToWorker(new GeneratorRegisterTask($this, $this->generator, $this->provider->getGeneratorOptions()), $i);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public function unregisterGenerator(){
|
||||
$size = $this->server->getScheduler()->getAsyncTaskPoolSize();
|
||||
for($i = 0; $i < $size; ++$i){
|
||||
$this->server->getScheduler()->scheduleAsyncTaskToWorker(new GeneratorUnregisterTask($this), $i);
|
||||
$pool = $this->server->getAsyncPool();
|
||||
for($i = 0, $size = $pool->getSize(); $i < $size; ++$i){
|
||||
$pool->submitTaskToWorker(new GeneratorUnregisterTask($this), $i);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2479,7 +2480,7 @@ class Level implements ChunkManager, Metadatable{
|
||||
throw new ChunkException("Invalid Chunk sent");
|
||||
}
|
||||
|
||||
$this->server->getScheduler()->scheduleAsyncTask(new ChunkRequestTask($this, $chunk));
|
||||
$this->server->getAsyncPool()->submitTask(new ChunkRequestTask($this, $chunk));
|
||||
|
||||
$this->timings->syncChunkSendPrepareTimer->stopTiming();
|
||||
}
|
||||
@ -2661,7 +2662,7 @@ class Level implements ChunkManager, Metadatable{
|
||||
$this->server->getPluginManager()->callEvent(new ChunkLoadEvent($this, $chunk, !$chunk->isGenerated()));
|
||||
|
||||
if(!$chunk->isLightPopulated() and $chunk->isPopulated() and $this->getServer()->getProperty("chunk-ticking.light-updates", false)){
|
||||
$this->getServer()->getScheduler()->scheduleAsyncTask(new LightPopulationTask($this, $chunk));
|
||||
$this->getServer()->getAsyncPool()->submitTask(new LightPopulationTask($this, $chunk));
|
||||
}
|
||||
|
||||
if($this->isChunkInUse($x, $z)){
|
||||
@ -2944,7 +2945,7 @@ class Level implements ChunkManager, Metadatable{
|
||||
}
|
||||
}
|
||||
$task = new PopulationTask($this, $chunk);
|
||||
$this->server->getScheduler()->scheduleAsyncTask($task);
|
||||
$this->server->getAsyncPool()->submitTask($task);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ declare(strict_types=1);
|
||||
namespace pocketmine\plugin;
|
||||
|
||||
use pocketmine\command\CommandExecutor;
|
||||
use pocketmine\scheduler\TaskScheduler;
|
||||
use pocketmine\Server;
|
||||
use pocketmine\utils\Config;
|
||||
|
||||
@ -136,4 +137,9 @@ interface Plugin extends CommandExecutor{
|
||||
*/
|
||||
public function getPluginLoader();
|
||||
|
||||
/**
|
||||
* @return TaskScheduler
|
||||
*/
|
||||
public function getScheduler() : TaskScheduler;
|
||||
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ namespace pocketmine\plugin;
|
||||
use pocketmine\command\Command;
|
||||
use pocketmine\command\CommandSender;
|
||||
use pocketmine\command\PluginIdentifiableCommand;
|
||||
use pocketmine\scheduler\TaskScheduler;
|
||||
use pocketmine\Server;
|
||||
use pocketmine\utils\Config;
|
||||
|
||||
@ -58,6 +59,9 @@ abstract class PluginBase implements Plugin{
|
||||
/** @var PluginLogger */
|
||||
private $logger;
|
||||
|
||||
/** @var TaskScheduler */
|
||||
private $scheduler;
|
||||
|
||||
/**
|
||||
* Called when the plugin is loaded, before calling onEnable()
|
||||
*/
|
||||
@ -119,6 +123,7 @@ abstract class PluginBase implements Plugin{
|
||||
$this->file = rtrim($file, "\\/") . "/";
|
||||
$this->configFile = $this->dataFolder . "config.yml";
|
||||
$this->logger = new PluginLogger($this);
|
||||
$this->scheduler = new TaskScheduler($this->logger);
|
||||
}
|
||||
}
|
||||
|
||||
@ -305,4 +310,10 @@ abstract class PluginBase implements Plugin{
|
||||
return $this->loader;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return TaskScheduler
|
||||
*/
|
||||
public function getScheduler() : TaskScheduler{
|
||||
return $this->scheduler;
|
||||
}
|
||||
}
|
||||
|
@ -575,6 +575,7 @@ class PluginManager{
|
||||
foreach($plugin->getDescription()->getPermissions() as $perm){
|
||||
$this->addPermission($perm);
|
||||
}
|
||||
$plugin->getScheduler()->setEnabled(true);
|
||||
$plugin->getPluginLoader()->enablePlugin($plugin);
|
||||
}catch(\Throwable $e){
|
||||
$this->server->getLogger()->logException($e);
|
||||
@ -657,7 +658,7 @@ class PluginManager{
|
||||
$this->server->getLogger()->logException($e);
|
||||
}
|
||||
|
||||
$this->server->getScheduler()->cancelTasks($plugin);
|
||||
$plugin->getScheduler()->shutdown();
|
||||
HandlerList::unregisterAll($plugin);
|
||||
foreach($plugin->getDescription()->getPermissions() as $perm){
|
||||
$this->removePermission($perm);
|
||||
@ -665,6 +666,14 @@ class PluginManager{
|
||||
}
|
||||
}
|
||||
|
||||
public function tickSchedulers(int $currentTick) : void{
|
||||
foreach($this->plugins as $p){
|
||||
if($p->isEnabled()){
|
||||
$p->getScheduler()->mainThreadHeartbeat($currentTick);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function clearPlugins(){
|
||||
$this->disablePlugins();
|
||||
$this->plugins = [];
|
||||
|
@ -24,7 +24,6 @@ declare(strict_types=1);
|
||||
namespace pocketmine\scheduler;
|
||||
|
||||
use pocketmine\Server;
|
||||
use pocketmine\timings\Timings;
|
||||
|
||||
class AsyncPool{
|
||||
|
||||
@ -37,6 +36,8 @@ class AsyncPool{
|
||||
private $tasks = [];
|
||||
/** @var int[] */
|
||||
private $taskWorkers = [];
|
||||
/** @var int */
|
||||
private $nextTaskId = 1;
|
||||
|
||||
/** @var AsyncWorker[] */
|
||||
private $workers = [];
|
||||
@ -77,13 +78,15 @@ class AsyncPool{
|
||||
}
|
||||
|
||||
public function submitTaskToWorker(AsyncTask $task, int $worker){
|
||||
if(isset($this->tasks[$task->getTaskId()]) or $task->isGarbage()){
|
||||
return;
|
||||
}
|
||||
|
||||
if($worker < 0 or $worker >= $this->size){
|
||||
throw new \InvalidArgumentException("Invalid worker $worker");
|
||||
}
|
||||
if($task->getTaskId() !== null){
|
||||
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
|
||||
}
|
||||
|
||||
$task->progressUpdates = new \Threaded;
|
||||
$task->setTaskId($this->nextTaskId++);
|
||||
|
||||
$this->tasks[$task->getTaskId()] = $task;
|
||||
|
||||
@ -93,8 +96,8 @@ class AsyncPool{
|
||||
}
|
||||
|
||||
public function submitTask(AsyncTask $task) : int{
|
||||
if(isset($this->tasks[$task->getTaskId()]) or $task->isGarbage()){
|
||||
return -1;
|
||||
if($task->getTaskId() !== null){
|
||||
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
|
||||
}
|
||||
|
||||
$selectedWorker = mt_rand(0, $this->size - 1);
|
||||
@ -160,8 +163,6 @@ class AsyncPool{
|
||||
}
|
||||
|
||||
public function collectTasks(){
|
||||
Timings::$schedulerAsyncTimer->startTiming();
|
||||
|
||||
foreach($this->tasks as $task){
|
||||
if(!$task->isGarbage()){
|
||||
$task->checkProgressUpdates($this->server);
|
||||
@ -189,8 +190,6 @@ class AsyncPool{
|
||||
}
|
||||
|
||||
$this->collectWorkers();
|
||||
|
||||
Timings::$schedulerAsyncTimer->stopTiming();
|
||||
}
|
||||
|
||||
public function shutdown() : void{
|
||||
|
@ -1,50 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
*
|
||||
* ____ _ _ __ __ _ __ __ ____
|
||||
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
|
||||
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
|
||||
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
|
||||
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* @author PocketMine Team
|
||||
* @link http://www.pocketmine.net/
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace pocketmine\scheduler;
|
||||
|
||||
use pocketmine\plugin\Plugin;
|
||||
|
||||
/**
|
||||
* Base class for plugin tasks. Allows the Server to delete them easily when needed
|
||||
*/
|
||||
abstract class PluginTask extends Task{
|
||||
|
||||
/** @var Plugin */
|
||||
protected $owner;
|
||||
|
||||
/**
|
||||
* @param Plugin $owner
|
||||
*/
|
||||
public function __construct(Plugin $owner){
|
||||
$this->owner = $owner;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Plugin
|
||||
*/
|
||||
final public function getOwner() : Plugin{
|
||||
return $this->owner;
|
||||
}
|
||||
|
||||
}
|
@ -23,9 +23,6 @@ declare(strict_types=1);
|
||||
|
||||
namespace pocketmine\scheduler;
|
||||
|
||||
/**
|
||||
* WARNING! Tasks created by plugins MUST extend PluginTask
|
||||
*/
|
||||
abstract class Task{
|
||||
|
||||
/** @var TaskHandler */
|
||||
|
@ -25,7 +25,6 @@ namespace pocketmine\scheduler;
|
||||
|
||||
use pocketmine\timings\Timings;
|
||||
use pocketmine\timings\TimingsHandler;
|
||||
use pocketmine\utils\MainLogger;
|
||||
|
||||
class TaskHandler{
|
||||
|
||||
@ -48,24 +47,28 @@ class TaskHandler{
|
||||
protected $cancelled = false;
|
||||
|
||||
/** @var TimingsHandler */
|
||||
public $timings;
|
||||
private $timings;
|
||||
|
||||
public $timingName = null;
|
||||
/** @var string */
|
||||
private $taskName;
|
||||
/** @var string */
|
||||
private $ownerName;
|
||||
|
||||
/**
|
||||
* @param string $timingName
|
||||
* @param Task $task
|
||||
* @param int $taskId
|
||||
* @param int $delay
|
||||
* @param int $period
|
||||
* @param Task $task
|
||||
* @param int $taskId
|
||||
* @param int $delay
|
||||
* @param int $period
|
||||
* @param string|null $ownerName
|
||||
*/
|
||||
public function __construct(string $timingName, Task $task, int $taskId, int $delay = -1, int $period = -1){
|
||||
public function __construct(Task $task, int $taskId, int $delay = -1, int $period = -1, ?string $ownerName = null){
|
||||
$this->task = $task;
|
||||
$this->taskId = $taskId;
|
||||
$this->delay = $delay;
|
||||
$this->period = $period;
|
||||
$this->timingName = $timingName ?? "Unknown";
|
||||
$this->timings = Timings::getPluginTaskTimings($this, $period);
|
||||
$this->taskName = get_class($task);
|
||||
$this->ownerName = $ownerName ?? "Unknown";
|
||||
$this->timings = Timings::getScheduledTaskTimings($this, $period);
|
||||
$this->task->setHandler($this);
|
||||
}
|
||||
|
||||
@ -141,8 +144,6 @@ class TaskHandler{
|
||||
if(!$this->isCancelled()){
|
||||
$this->task->onCancel();
|
||||
}
|
||||
}catch(\Throwable $e){
|
||||
MainLogger::getLogger()->logException($e);
|
||||
}finally{
|
||||
$this->remove();
|
||||
}
|
||||
@ -157,17 +158,22 @@ class TaskHandler{
|
||||
* @param int $currentTick
|
||||
*/
|
||||
public function run(int $currentTick){
|
||||
$this->task->onRun($currentTick);
|
||||
$this->timings->startTiming();
|
||||
try{
|
||||
$this->task->onRun($currentTick);
|
||||
}finally{
|
||||
$this->timings->stopTiming();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getTaskName() : string{
|
||||
if($this->timingName !== null){
|
||||
return $this->timingName;
|
||||
}
|
||||
return $this->taskName;
|
||||
}
|
||||
|
||||
return get_class($this->task);
|
||||
public function getOwnerName() : string{
|
||||
return $this->ownerName;
|
||||
}
|
||||
}
|
||||
|
@ -27,13 +27,17 @@ declare(strict_types=1);
|
||||
|
||||
namespace pocketmine\scheduler;
|
||||
|
||||
use pocketmine\plugin\Plugin;
|
||||
use pocketmine\plugin\PluginException;
|
||||
use pocketmine\Server;
|
||||
use pocketmine\utils\ReversePriorityQueue;
|
||||
|
||||
class ServerScheduler{
|
||||
public static $WORKERS = 2;
|
||||
class TaskScheduler{
|
||||
/** @var \Logger */
|
||||
private $logger;
|
||||
/** @var string|null */
|
||||
private $owner;
|
||||
|
||||
/** @var bool */
|
||||
private $enabled = true;
|
||||
|
||||
/**
|
||||
* @var ReversePriorityQueue<Task>
|
||||
*/
|
||||
@ -44,18 +48,17 @@ class ServerScheduler{
|
||||
*/
|
||||
protected $tasks = [];
|
||||
|
||||
/** @var AsyncPool */
|
||||
protected $asyncPool;
|
||||
|
||||
/** @var int */
|
||||
private $ids = 1;
|
||||
|
||||
/** @var int */
|
||||
protected $currentTick = 0;
|
||||
|
||||
public function __construct(){
|
||||
|
||||
public function __construct(\Logger $logger, ?string $owner = null){
|
||||
$this->logger = $logger;
|
||||
$this->owner = $owner;
|
||||
$this->queue = new ReversePriorityQueue();
|
||||
$this->asyncPool = new AsyncPool(Server::getInstance(), self::$WORKERS);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -67,49 +70,6 @@ class ServerScheduler{
|
||||
return $this->addTask($task, -1, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits an asynchronous task to the Worker Pool
|
||||
*
|
||||
* @param AsyncTask $task
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
public function scheduleAsyncTask(AsyncTask $task) : int{
|
||||
if($task->getTaskId() !== null){
|
||||
throw new \UnexpectedValueException("Attempt to schedule the same AsyncTask instance twice");
|
||||
}
|
||||
$id = $this->nextId();
|
||||
$task->setTaskId($id);
|
||||
$task->progressUpdates = new \Threaded;
|
||||
return $this->asyncPool->submitTask($task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits an asynchronous task to a specific Worker in the Pool
|
||||
*
|
||||
* @param AsyncTask $task
|
||||
* @param int $worker
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function scheduleAsyncTaskToWorker(AsyncTask $task, int $worker){
|
||||
if($task->getTaskId() !== null){
|
||||
throw new \UnexpectedValueException("Attempt to schedule the same AsyncTask instance twice");
|
||||
}
|
||||
$id = $this->nextId();
|
||||
$task->setTaskId($id);
|
||||
$task->progressUpdates = new \Threaded;
|
||||
$this->asyncPool->submitTaskToWorker($task, $worker);
|
||||
}
|
||||
|
||||
public function getAsyncTaskPoolSize() : int{
|
||||
return $this->asyncPool->getSize();
|
||||
}
|
||||
|
||||
public function increaseAsyncTaskPoolSize(int $newSize){
|
||||
$this->asyncPool->increaseSize($newSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Task $task
|
||||
* @param int $delay
|
||||
@ -145,31 +105,22 @@ class ServerScheduler{
|
||||
* @param int $taskId
|
||||
*/
|
||||
public function cancelTask(int $taskId){
|
||||
if($taskId !== null and isset($this->tasks[$taskId])){
|
||||
$this->tasks[$taskId]->cancel();
|
||||
if(isset($this->tasks[$taskId])){
|
||||
try{
|
||||
$this->tasks[$taskId]->cancel();
|
||||
}catch(\Throwable $e){
|
||||
$this->logger->critical("Task " . $this->tasks[$taskId]->getTaskName() . " threw an exception when trying to cancel: " . $e->getMessage());
|
||||
$this->logger->logException($e);
|
||||
}
|
||||
unset($this->tasks[$taskId]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Plugin $plugin
|
||||
*/
|
||||
public function cancelTasks(Plugin $plugin){
|
||||
foreach($this->tasks as $taskId => $task){
|
||||
$ptask = $task->getTask();
|
||||
if($ptask instanceof PluginTask and $ptask->getOwner() === $plugin){
|
||||
$task->cancel();
|
||||
unset($this->tasks[$taskId]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function cancelAllTasks(){
|
||||
foreach($this->tasks as $task){
|
||||
$task->cancel();
|
||||
foreach($this->tasks as $id => $task){
|
||||
$this->cancelTask($id);
|
||||
}
|
||||
$this->tasks = [];
|
||||
$this->asyncPool->removeTasks();
|
||||
while(!$this->queue->isEmpty()){
|
||||
$this->queue->extract();
|
||||
}
|
||||
@ -192,11 +143,11 @@ class ServerScheduler{
|
||||
*
|
||||
* @return null|TaskHandler
|
||||
*
|
||||
* @throws PluginException
|
||||
* @throws \InvalidStateException
|
||||
*/
|
||||
private function addTask(Task $task, int $delay, int $period){
|
||||
if($task instanceof PluginTask and !$task->getOwner()->isEnabled()){
|
||||
throw new PluginException("Plugin '" . $task->getOwner()->getName() . "' attempted to register a task while disabled");
|
||||
if(!$this->enabled){
|
||||
throw new \InvalidStateException("Tried to schedule task to disabled scheduler");
|
||||
}
|
||||
|
||||
if($delay <= 0){
|
||||
@ -209,7 +160,7 @@ class ServerScheduler{
|
||||
$period = 1;
|
||||
}
|
||||
|
||||
return $this->handle(new TaskHandler(get_class($task), $task, $this->nextId(), $delay, $period));
|
||||
return $this->handle(new TaskHandler($task, $this->nextId(), $delay, $period, $this->owner));
|
||||
}
|
||||
|
||||
private function handle(TaskHandler $handler) : TaskHandler{
|
||||
@ -227,8 +178,12 @@ class ServerScheduler{
|
||||
}
|
||||
|
||||
public function shutdown() : void{
|
||||
$this->enabled = false;
|
||||
$this->cancelAllTasks();
|
||||
$this->asyncPool->shutdown();
|
||||
}
|
||||
|
||||
public function setEnabled(bool $enabled) : void{
|
||||
$this->enabled = $enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -242,30 +197,32 @@ class ServerScheduler{
|
||||
if($task->isCancelled()){
|
||||
unset($this->tasks[$task->getTaskId()]);
|
||||
continue;
|
||||
}else{
|
||||
$task->timings->startTiming();
|
||||
try{
|
||||
$task->run($this->currentTick);
|
||||
}catch(\Throwable $e){
|
||||
Server::getInstance()->getLogger()->critical("Could not execute task " . $task->getTaskName() . ": " . $e->getMessage());
|
||||
Server::getInstance()->getLogger()->logException($e);
|
||||
}
|
||||
$task->timings->stopTiming();
|
||||
}
|
||||
$crashed = false;
|
||||
try{
|
||||
$task->run($this->currentTick);
|
||||
}catch(\Throwable $e){
|
||||
$crashed = true;
|
||||
$this->logger->critical("Could not execute task " . $task->getTaskName() . ": " . $e->getMessage());
|
||||
$this->logger->logException($e);
|
||||
}
|
||||
if($task->isRepeating()){
|
||||
$task->setNextRun($this->currentTick + $task->getPeriod());
|
||||
$this->queue->insert($task, $this->currentTick + $task->getPeriod());
|
||||
}else{
|
||||
$task->remove();
|
||||
unset($this->tasks[$task->getTaskId()]);
|
||||
if($crashed){
|
||||
$this->logger->debug("Dropping repeating task " . $task->getTaskName() . " due to exceptions thrown while running");
|
||||
}else{
|
||||
$task->setNextRun($this->currentTick + $task->getPeriod());
|
||||
$this->queue->insert($task, $this->currentTick + $task->getPeriod());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->asyncPool->collectTasks();
|
||||
$task->remove();
|
||||
unset($this->tasks[$task->getTaskId()]);
|
||||
}
|
||||
}
|
||||
|
||||
private function isReady(int $currentTicks) : bool{
|
||||
return count($this->tasks) > 0 and $this->queue->current()->getNextRun() <= $currentTicks;
|
||||
private function isReady(int $currentTick) : bool{
|
||||
return count($this->tasks) > 0 and $this->queue->current()->getNextRun() <= $currentTick;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -274,5 +231,4 @@ class ServerScheduler{
|
||||
private function nextId() : int{
|
||||
return $this->ids++;
|
||||
}
|
||||
|
||||
}
|
@ -27,7 +27,6 @@ use pocketmine\entity\Entity;
|
||||
use pocketmine\network\mcpe\protocol\DataPacket;
|
||||
use pocketmine\Player;
|
||||
use pocketmine\plugin\PluginManager;
|
||||
use pocketmine\scheduler\PluginTask;
|
||||
use pocketmine\scheduler\TaskHandler;
|
||||
use pocketmine\tile\Tile;
|
||||
|
||||
@ -149,19 +148,8 @@ abstract class Timings{
|
||||
*
|
||||
* @return TimingsHandler
|
||||
*/
|
||||
public static function getPluginTaskTimings(TaskHandler $task, int $period) : TimingsHandler{
|
||||
$ftask = $task->getTask();
|
||||
if($ftask instanceof PluginTask and $ftask->getOwner() !== null){
|
||||
$plugin = $ftask->getOwner()->getDescription()->getFullName();
|
||||
}elseif($task->timingName !== null){
|
||||
$plugin = "Scheduler";
|
||||
}else{
|
||||
$plugin = "Unknown";
|
||||
}
|
||||
|
||||
$taskname = $task->getTaskName();
|
||||
|
||||
$name = "Task: " . $plugin . " Runnable: " . $taskname;
|
||||
public static function getScheduledTaskTimings(TaskHandler $task, int $period) : TimingsHandler{
|
||||
$name = "Task: " . ($task->getOwnerName() ?? "Unknown") . " Runnable: " . $task->getTaskName();
|
||||
|
||||
if($period > 0){
|
||||
$name .= "(interval:" . $period . ")";
|
||||
|
@ -149,7 +149,7 @@ class AutoUpdater{
|
||||
* Schedules an AsyncTask to check for an update.
|
||||
*/
|
||||
public function doCheck(){
|
||||
$this->server->getScheduler()->scheduleAsyncTask(new UpdateCheckTask($this->endpoint, $this->getChannel()));
|
||||
$this->server->getAsyncPool()->submitTask(new UpdateCheckTask($this->endpoint, $this->getChannel()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -217,7 +217,7 @@ class Config{
|
||||
}
|
||||
|
||||
if($async){
|
||||
Server::getInstance()->getScheduler()->scheduleAsyncTask(new FileWriteTask($this->file, $content));
|
||||
Server::getInstance()->getAsyncPool()->submitTask(new FileWriteTask($this->file, $content));
|
||||
}else{
|
||||
file_put_contents($this->file, $content);
|
||||
}
|
||||
|
@ -1 +1 @@
|
||||
Subproject commit e60eba0324e4540abae317630ed758d3814e0137
|
||||
Subproject commit 8906ba9831c19cc36d66bb90c43ee1f9d674b0b2
|
Loading…
x
Reference in New Issue
Block a user