diff --git a/src/PocketMine/PocketMine.php b/src/PocketMine/PocketMine.php index c3520cc18..778a614f2 100644 --- a/src/PocketMine/PocketMine.php +++ b/src/PocketMine/PocketMine.php @@ -380,16 +380,14 @@ namespace PocketMine { define("PocketMine\\GIT_COMMIT", str_repeat("00", 20)); } - ini_set("opcache.mmap_base", bin2hex(Utils\Utils::getRandomBytes(8, false))); //Fix OPCache address errors - - require_once(\PocketMine\PATH . "src/pthreads.php"); + @ini_set("opcache.mmap_base", bin2hex(Utils\Utils::getRandomBytes(8, false))); //Fix OPCache address errors if(!file_exists(\PocketMine\DATA . "server.properties") and !isset($opts["no-wizard"])){ new Wizard\Installer(); } if(!defined("PARENT_API_EXISTENT")){ - $server = new ServerAPI(); + $server = new Server(); $server->start(); kill(getmypid()); //Fix for ConsoleAPI being blocked diff --git a/src/PocketMine/event/scheduler/ServerAsyncTask.php b/src/PocketMine/event/scheduler/ServerAsyncTask.php new file mode 100644 index 000000000..53bec5378 --- /dev/null +++ b/src/PocketMine/event/scheduler/ServerAsyncTask.php @@ -0,0 +1,102 @@ + + */ + private $workers; + + /** + * @var \Threaded + */ + private $runners; + + + public function __construct(\Threaded $runners, Plugin $plugin, \Threaded $task, $id, $delay){ + parent::__construct($plugin, $task, $id, $delay); + $this->runners = $runners; + $this->workers = new \Threaded(); + } + + public function isSync(){ + return false; + } + + public function run(){ + $thread = \Thread::getCurrentThread(); + $this->workers->synchronized(function($workers, \Thread $thread, ServerAsyncTask $asyncTask){ + if($asyncTask->getPeriod() === -2){ + return; + } + $workers[] = new ServerWorker($asyncTask, $thread); + }, $this->workers, $thread, $this); + + parent::run(); + + $this->workers->synchronized(function(\Threaded $workers, \Threaded $runners, \Thread $thread){ + $removed = false; + foreach($workers as $index => $worker){ + if($worker->getThread() === $thread){ + unset($workers[$index]); + $removed = true; + break; + } + } + + if(!$removed){ + trigger_error("Unable to remove worker ".$thread->getThreadId()." on task ".$this->getTaskId()." for ".$this->getOwner()->getDescription()->getName(), E_USER_WARNING); + } + + if($this->getPeriod() < 0 and $this->workers->count() === 0){ + unset($runners[$this->getTaskId()]); + } + }, $this->workers, $this->runners, $thread); + } + + /** + * @return \Threaded + */ + public function getWorkers(){ + return $this->workers; + } + + /** + * @return bool + */ + public function cancel0(){ + $this->workers->synchronized(function(ServerAsyncTask $asyncTask, \Threaded $runners, \Threaded $workers){ + $asyncTask->setPeriod(-2); + if($workers->count() === 0){ + unset($runners[$asyncTask->getTaskId()]); + } + }, $this, $this->runners, $this->workers); + return true; + } + + +} \ No newline at end of file diff --git a/src/PocketMine/event/scheduler/ServerCallable.php b/src/PocketMine/event/scheduler/ServerCallable.php new file mode 100644 index 000000000..8a7e10772 --- /dev/null +++ b/src/PocketMine/event/scheduler/ServerCallable.php @@ -0,0 +1,35 @@ +callable = $callable; + } + + public function cancel(){ + if($this->getPeriod() !== -1){ + return false; + } + $this->setPeriod(-2); + return true; + } + + public function isCancelled(){ + return $this->getPeriod() === -2; + } + + public function isDone(){ + return $this->getPeriod() !== -1 and $this->getPeriod() !== -3; + } + + /** + * @param int $timeout Microseconds to wait + * + * @return mixed|null + */ + public function get($timeout){ + $period = $this->getPeriod(); + $timestamp = $timeout > 0 ? (int) (microtime(true) / 1000000) : 0; + + while(true){ + if($period === -1 or $period === -3){ + $this->wait($timeout); + $period = $this->getPeriod(); + if($period === -1 or $period === -3){ + if($timeout === 0){ + continue; + } + $timeout += $timestamp - ($timestamp = (int) (microtime(true) / 1000000)); + + if($timeout > 0){ + continue; + } + return null; + } + } + if($period === -2){ + return null; //Cancelled + } + if($period === -4){ + return $this->value; + } + return null; //Invalid state + } + } + + public function run(){ + if($this->synchronized(function(ServerFuture $future){ + if($future->getPeriod() === -2){ + return false; + } + $future->setPeriod(-3); + return true; + }, $this) === false){ + return; + } + + $this->callable->run(); + $this->value = $this->callable->getResult(); + + $this->synchronized(function(ServerFuture $future){ + $future->setPeriod(-4); + $future->notify(); + }, $this); + } + + public function cancel0(){ + if($this->getPeriod() !== -1){ + return false; + } + $this->setPeriod(-2); + $this->notify(); + return true; + } +} \ No newline at end of file diff --git a/src/PocketMine/event/scheduler/ServerRunnable.php b/src/PocketMine/event/scheduler/ServerRunnable.php new file mode 100644 index 000000000..e75eae0ae --- /dev/null +++ b/src/PocketMine/event/scheduler/ServerRunnable.php @@ -0,0 +1,152 @@ +cancelTask($this->getTaskId()); + } + + /** + * Runs the task on the next tick + * + * @param Plugin $plugin + * + * @return ServerTask|null + */ + public function runTask(Plugin $plugin){ + if($this->checkState()){ + return $this->setupId(ServerScheduler::getInstance()->runTask($plugin, $this)); + } + return null; + } + + /** + * Schedules the task to run asynchronously + * + * @param Plugin $plugin + * + * @return ServerTask|null + */ + public function runTaskAsynchronously(Plugin $plugin){ + if($this->checkState()){ + return $this->setupId(ServerScheduler::getInstance()->runTaskAsynchronously($plugin, $this)); + } + return null; + } + + /** + * Runs the task after a number of server ticks + * + * @param Plugin $plugin + * @param int $delay + * + * @return ServerTask|null + */ + public function runTaskLater(Plugin $plugin, $delay){ + if($this->checkState()){ + return $this->setupId(ServerScheduler::getInstance()->runTaskLater($plugin, $this, $delay)); + } + return null; + } + + /** + * @param Plugin $plugin + * @param int $delay + * + * @return ServerTask|null + */ + public function runTaskLaterAsynchronously(Plugin $plugin, $delay){ + if($this->checkState()){ + return $this->setupId(ServerScheduler::getInstance()->runTaskLaterAsynchronously($plugin, $this, $delay)); + } + return null; + } + + /** + * @param Plugin $plugin + * @param int $delay + * @param int $period + * + * @return ServerTask|null + */ + public function runTaskTimer(Plugin $plugin, $delay, $period){ + if($this->checkState()){ + return $this->setupId(ServerScheduler::getInstance()->runTaskTimer($plugin, $this, $delay, $period)); + } + return null; + } + + /** + * @param Plugin $plugin + * @param int $delay + * @param int $period + * + * @return ServerTask|null + */ + public function runTaskTimerAsynchronously(Plugin $plugin, $delay, $period){ + if($this->checkState()){ + return $this->setupId(ServerScheduler::getInstance()->runTaskTimerAsynchronously($plugin, $this, $delay, $period)); + } + return null; + } + + /** + * @return int + */ + public function getTaskId(){ + if($this->taskId === -1){ + return -1; + } + return $this->taskId; + } + + + public function checkState(){ + if($this->taskId !== -1){ + return false; + } + return true; + } + + public function setupId(ServerTask $task){ + if($task !== null){ + $this->taskId = $task->getTaskId(); + } + return $task; + } + + +} \ No newline at end of file diff --git a/src/PocketMine/event/scheduler/ServerScheduler.php b/src/PocketMine/event/scheduler/ServerScheduler.php new file mode 100644 index 000000000..1534b403c --- /dev/null +++ b/src/PocketMine/event/scheduler/ServerScheduler.php @@ -0,0 +1,510 @@ + + */ + private $pending; + + /** + * @var ServerTask[] + */ + private $temp = array(); + + /** + * @var \Threaded + */ + private $runners; + + /** + * @var int + */ + private $currentTick = -1; + + /** + * @var \Pool[] + */ + private $executor; + + /** + * @return ServerScheduler + */ + public static function getInstance(){ + return self::$instance; + } + + /** + * @param int $workers + */ + public function __construct($workers = 2){ + self::$instance = $this; + $this->head = new ServerTask(); + $this->tail = new ServerTask(); + $this->pending = new \SplPriorityQueue(); + $this->temp = array(); + $this->runners = new \Threaded(); + $this->executor = new \Pool($workers); + + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * + * @return int taskId + */ + public function scheduleSyncDelayedTask(Plugin $plugin, \Threaded $task, $delay = 0){ + return $this->scheduleSyncRepeatingTask($plugin, $task, $delay, -1); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * + * @return int taskId + */ + public function scheduleAsyncDelayedTask(Plugin $plugin, \Threaded $task, $delay = 0){ + return $this->scheduleAsyncRepeatingTask($plugin, $task, $delay, -1); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * + * @return ServerTask|null + */ + public function runTaskAsynchronously(Plugin $plugin, \Threaded $task){ + return $this->runTaskLaterAsynchronously($plugin, $task, 0); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * + * @return ServerTask|null + */ + public function runTask(Plugin $plugin, \Threaded $task){ + return $this->runTaskLater($plugin, $task, 0); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * + * @return ServerTask|null + */ + public function runTaskLater(Plugin $plugin, \Threaded $task, $delay = 0){ + return $this->runTaskTimer($plugin, $task, $delay, -1); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * + * @return ServerTask|null + */ + public function runTaskLaterAsynchronously(Plugin $plugin, \Threaded $task, $delay){ + return $this->runTaskTimerAsynchronously($plugin, $task, $delay, -1); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * @param int $period + * + * @return int taskId + */ + public function scheduleSyncRepeatingTask(Plugin $plugin, \Threaded $task, $delay, $period){ + return $this->runTaskTimer($plugin, $task, $delay, $period)->getTaskId(); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * @param int $period + * + * @return int taskId + */ + public function scheduleAsyncRepeatingTask(Plugin $plugin, \Threaded $task, $delay, $period){ + return $this->runTaskTimerAsynchronously($plugin, $task, $delay, $period)->getTaskId(); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * @param int $period + * + * @return ServerTask|null + */ + public function runTaskTimer(Plugin $plugin, \Threaded $task, $delay, $period){ + if(!$this->validate($plugin, $task)){ + return null; + } + + if($delay < 0){ + $delay = 0; + } + + if($period === 0){ + $period = 1; + }elseif($period < -1){ + $period = -1; + } + + return $this->handle(new ServerTask($plugin, $task, $this->nextId(), $period), $delay); + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * @param int $delay + * @param int $period + * + * @return ServerTask|null + */ + public function runTaskTimerAsynchronously(Plugin $plugin, \Threaded $task, $delay, $period){ + if(!$this->validate($plugin, $task)){ + return null; + } + + if($delay < 0){ + $delay = 0; + } + + if($period === 0){ + $period = 1; + }elseif($period < -1){ + $period = -1; + } + + return $this->handle(new ServerAsyncTask($this->runners, $plugin, $task, $this->nextId(), $period), $delay); + } + + /** + * @param Plugin $plugin + * @param ServerCallable $task + * + * @return ServerFuture|null + */ + public function callSyncMethod(Plugin $plugin, ServerCallable $task){ + if(!$this->validate($plugin, $task)){ + return null; + } + $future = new ServerFuture($task, $plugin, $this->nextId()); + $this->handle($future, 0); + return $future; + } + + /** + * @param int $taskId + */ + public function cancelTask($taskId){ + if($taskId < 0){ + return; + } + + if(isset($this->runners[$taskId])){ + $this->runners[$taskId]->cancel0(); + } + + $task = new ServerTask(null, new ServerTaskCanceller($taskId)); + $this->handle($task, 0); + for($taskPending = $this->head->getNext(); $taskPending !== null; $taskPending = $taskPending->getNext()){ + if($taskPending === $task){ + return; + } + if($taskPending->getTaskId() === $taskId){ + $taskPending->cancel0(); + } + } + } + + /** + * @param Plugin $plugin + */ + public function cancelTasks(Plugin $plugin){ + if($plugin === null){ + return; + } + + $task = new ServerTask(null, new ServerPluginTaskCanceller($plugin)); + $this->handle($task, 0); + for($taskPending = $this->head->getNext(); $taskPending !== null; $taskPending = $taskPending->getNext()){ + if($taskPending === $task){ + return; + } + if($taskPending->getTaskId() !== -1 and $taskPending->getOwner() === $plugin){ + $taskPending->cancel0(); + } + } + + foreach($this->runners as $runner){ + if($runner->getOwner() === $plugin){ + $runner->cancel0(); + } + } + } + + /** + * + */ + public function cancelAllTasks(){ + $task = new ServerTask(null, new ServerAllTaskCanceller()); + $this->handle($task, 0); + for($taskPending = $this->head->getNext(); $taskPending !== null; $taskPending = $taskPending->getNext()){ + if($taskPending === $task){ + break; + } + $taskPending->cancel0(); + } + + foreach($this->runners as $runner){ + $runner->cancel0(); + } + } + + /** + * @param int $taskId + * + * @return bool + */ + public function isCurrentlyRunning($taskId){ + if(!isset($this->runners[$taskId]) or $this->runners[$taskId]->isSync()){ + return false; + } + + $asyncTask = $this->runners[$taskId]; + return $asyncTask->syncronized(function($asyncTask){ + return count($asyncTask->getWorkers()) === 0; + }, $asyncTask); + } + + /** + * @param int $taskId + * + * @return bool + */ + public function isQueued($taskId){ + if($taskId <= 0){ + return false; + } + + for($task = $this->head->getNext(); $task !== null; $task = $task->getNext()){ + if($task->getTaskId() === $taskId){ + return $task->getPeriod() >= -1; + } + } + + if(isset($this->runners[$taskId])){ + return $this->runners[$taskId]->getPeriod() >= -1; + } + return false; + } + + /** + * @return ServerWorker[] + */ + public function getActiveWorkers(){ + $workers = array();//new \Threaded(); + + foreach($this->runners as $taskObj){ + if($taskObj->isSync()){ + continue; + } + + $taskObj->syncronized(function($workers, $taskObj){ + foreach($taskObj->getWorkers() as $worker){ + $workers[] = $worker; + } + }, $workers, $taskObj); + } + + //$workers->run(); //Protect against memory leaks + return $workers; + } + + /** + * @return ServerTask[] + */ + public function getPendingTasks(){ + $truePending = array(); + for($task = $this->head->getNext(); $task !== null; $task = $task->getNext()){ + if($task->getTaskId() !== -1){ + $truePending[] = $task; + } + } + + $pending = array(); + foreach($this->runners as $task){ + if($task->getPeriod() >= -1){ + $pending[] = $task; + } + } + + foreach($truePending as $task){ + if($task->getPeriod() >= -1 and !in_array($pending, $task, true)){ + $pending[] = $task; + } + } + + return $pending; + } + + /** + * @param int $currentTick + */ + public function mainThreadHeartbeat($currentTick){ + $this->currentTick = $currentTick; + $this->parsePending(); + while($this->isReady($currentTick)){ + $task = $this->pending->extract(); + if($task->getPeriod() < -1){ + if($task->isSync()){ + unset($this->runners[$task->getTaskId()]); + } + $this->parsePending(); + continue; + } + + if($task->isSync()){ + $task->run(); + }else{ + $this->executor->submit($task); + } + + $period = $task->getPeriod(); + if($period > 0){ + $task->setNextRun($currentTick + $period); + $this->temp[] = $task; + }elseif($task->isSync()){ + unset($this->runners[$task->getTaskId()]); + } + } + + foreach($this->temp as $task){ + $this->pending->insert($task, $task->getNextRun()); + } + $this->temp = array(); + + } + + /** + * @param ServerTask $task + */ + private function addTask(ServerTask $task){ + $this->tail->setNext($task); + } + + /** + * @param ServerTask $task + * @param int $delay + * + * @return ServerTask + */ + private function handle(ServerTask $task, $delay){ + $task->setNextRun($this->currentTick + $delay); + $this->addTask($task); + return $task; + } + + /** + * @param Plugin $plugin + * @param \Threaded $task + * + * @return bool + */ + private function validate(Plugin $plugin, \Threaded $task){ + if($plugin === null or $task === null){ + return false; + }elseif(!$plugin->isEnabled()){ + return false; + } + return true; + } + + /** + * @return int + */ + private function nextId(){ + return $this->ids++; + } + + private function parsePending(){ + $head = $this->head; + $task = $head->getNext(); + $lastTask = $head; + for(; $task !== null; $task = $lastTask->getNext()){ + if($task->getTaskId() === -1){ + $task->run(); + }elseif($task->getPeriod() >= -1){ + $this->pending[] = $task; + $this->runners[$task->getTaskId()] = $task; + } + $lastTask = $task; + } + + for($task = $head; $task !== $lastTask; $task = $head){ + $head = $task->getNext(); + $task->setNext(null); + } + $this->head = $lastTask; + } + + /** + * @param int $currentTick + * + * @return bool + */ + private function isReady($currentTick){ + return $this->pending->count() > 0 and $this->pending->top()->getNextRun() <= $currentTick; + } +} diff --git a/src/PocketMine/event/scheduler/ServerTask.php b/src/PocketMine/event/scheduler/ServerTask.php new file mode 100644 index 000000000..ab9b98c8a --- /dev/null +++ b/src/PocketMine/event/scheduler/ServerTask.php @@ -0,0 +1,132 @@ +0 means number of ticks to wait between each execution + * + * @var int + */ + private $period; + private $nextRun; + + /** + * @var \Threaded; + */ + private $task; + /** + * @var Plugin + */ + private $plugin; + + /** + * @var int + */ + private $id; + + public function __construct(Plugin $plugin, \Threaded $task, $id = -1, $period = -1){ + $this->plugin = $plugin; + $this->task = $task; + $this->id = $id; + $this->period = $period; + } + + /** + * @return int + */ + public function getTaskId(){ + return $this->id; + } + + /** + * @return Plugin + */ + public function getOwner(){ + return $this->plugin; + } + + public function isSync(){ + return true; + } + + public function run(){ + $this->task->run(); + } + + public function getPeriod(){ + return $this->period; + } + + /** + * @param int $period + */ + public function setPeriod($period){ + $this->period = $period; + } + + public function getNextRun(){ + return $this->nextRun; + } + + public function setNextRun($nextRun){ + $this->nextRun = $nextRun; + } + + /** + * @return ServerTask + */ + public function getNext(){ + return $this->next; + } + + public function setNext(ServerTask $next){ + $this->next = $next; + } + + public function getTaskClass(){ + return get_class($this->next); + } + + public function cancel(){ + ServerScheduler::getInstance()->cancelTask($this->id); + } + + public function cancel0(){ + $this->setPeriod(-2); + return true; + } + + +} \ No newline at end of file diff --git a/src/PocketMine/event/scheduler/ServerWorker.php b/src/PocketMine/event/scheduler/ServerWorker.php new file mode 100644 index 000000000..573e79b52 --- /dev/null +++ b/src/PocketMine/event/scheduler/ServerWorker.php @@ -0,0 +1,64 @@ +asyncTask = $asyncTask; + $this->thread = $thread; + } + + public function __destruct(){ + unset($this->asyncTask, $this->thread); + } + + /** + * @return int + */ + public function getTaskId(){ + return $this->asyncTask->getTaskId(); + } + + /** + * @return Plugin; + */ + public function getOwner(){ + return $this->asyncTask->getOwner(); + } + + /** + * @return \Thread + */ + public function getThread(){ + //TODO + } +} \ No newline at end of file