From e6485c47341238a2121bfa0f3e70557f4e8fdb3d Mon Sep 17 00:00:00 2001 From: SOFe Date: Sat, 12 Nov 2016 17:31:59 +0800 Subject: [PATCH 1/7] Added AsyncTask progress update API --- src/pocketmine/scheduler/AsyncPool.php | 7 ++- src/pocketmine/scheduler/AsyncTask.php | 57 +++++++++++++++++++- src/pocketmine/scheduler/ServerScheduler.php | 36 ++++++++++--- 3 files changed, 91 insertions(+), 9 deletions(-) diff --git a/src/pocketmine/scheduler/AsyncPool.php b/src/pocketmine/scheduler/AsyncPool.php index 6c81d8c15..b8069501c 100644 --- a/src/pocketmine/scheduler/AsyncPool.php +++ b/src/pocketmine/scheduler/AsyncPool.php @@ -142,8 +142,13 @@ class AsyncPool{ Timings::$schedulerAsyncTimer->startTiming(); foreach($this->tasks as $task){ + if($task->progressUpdates !== null){ + if($task->progressUpdates->count() !== 0){ + $progress = $task->progressUpdates->shift(); + $task->onProgressUpdate($this->server, $progress); + } + } if($task->isGarbage() and !$task->isRunning() and !$task->isCrashed()){ - if(!$task->hasCancelledRun()){ $task->onCompletion($this->server); $this->server->getScheduler()->removeLocalComplex($task); diff --git a/src/pocketmine/scheduler/AsyncTask.php b/src/pocketmine/scheduler/AsyncTask.php index 52d1defc2..295b881b1 100644 --- a/src/pocketmine/scheduler/AsyncTask.php +++ b/src/pocketmine/scheduler/AsyncTask.php @@ -34,6 +34,9 @@ abstract class AsyncTask extends Collectable{ /** @var AsyncWorker $worker */ public $worker = null; + /** @var \Threaded */ + public $progressUpdates = null; + private $result = null; private $serialized = false; private $cancelRun = false; @@ -66,6 +69,7 @@ abstract class AsyncTask extends Collectable{ } public function run(){ + $this->progressUpdates = new \Threaded; // Do not move this to __construct for backwards compatibility. $this->result = null; if($this->cancelRun !== true){ @@ -169,7 +173,36 @@ abstract class AsyncTask extends Collectable{ } /** - * Call this method from {@link #onCompletion} to fetch the data stored in the constructor, if any. + * Call this method from {@link AsyncTask#onRun} (AsyncTask execution therad) to schedule a call to + * {@link AsyncTask#onProgressUpdate} from the main thread with the given progress parameter. + * + * @param \Threaded|mixed $progress A Threaded object, or a value that can be safely serialize()'ed. + */ + public function publishProgress($progress){ + $this->progressUpdates[] = $progress; + } + + /** + * Called from the main thread after {@link AsyncTask#publishProgress} is called. + * All {@link AsyncTask#publishProgress} calls should result in {@link AsyncTask#onProgressUpdate} calls before + * {@link AsyncTask#onCompletion} is called. + * + * @param Server $server + * @param \Threaded|mixed $progress The parameter passed to {@link AsyncTask#publishProgress}. If it is not a + * Threaded object, it would be serialize()'ed and later unserialize()'ed, as if it + * has been cloned. + */ + public function onProgressUpdate(Server $server, $progress){ + + } + + /** + * Call this method from {@link AsyncTask#onCompletion} to fetch the data stored in the constructor, if any, and + * clears it from the storage. + * + * Do not call this method from {@link AsyncTask#onProgressUpdate}, because this method deletes the data and cannot + * be used in the next {@link AsyncTask#onProgressUpdate} call or from {@link AsyncTask#onCompletion}. Use + * {@link AsyncTask#peekLocal} instead. * * @param Server $server default null * @@ -186,6 +219,28 @@ abstract class AsyncTask extends Collectable{ return $server->getScheduler()->fetchLocalComplex($this); } + /** + * Call this method from {@link AsyncTask#onProgressUpdate} to fetch the data stored in the constructor. + * + * Use {@link AsyncTask#peekLocal} instead from {@link AsyncTask#onCompletion}, because this method does not delete + * the data, and not clearing the data will result in a warning for memory leak after {@link AsyncTask#onCompletion} + * finished executing. + * + * @param Server|null $server default null + * + * @return mixed + * + * @throws \RuntimeException if no data were stored by this AsyncTask instance + */ + protected function peekLocal(Server $server = null){ + if($server === null){ + $server = Server::getInstance(); + assert($server !== null, "Call this method only from the main thread!"); + } + + return $server->getScheduler()->peekLocalComplex($this); + } + public function cleanObject(){ foreach($this as $p => $v){ if(!($v instanceof \Threaded)){ diff --git a/src/pocketmine/scheduler/ServerScheduler.php b/src/pocketmine/scheduler/ServerScheduler.php index 12beb4e51..b25453648 100644 --- a/src/pocketmine/scheduler/ServerScheduler.php +++ b/src/pocketmine/scheduler/ServerScheduler.php @@ -50,7 +50,7 @@ class ServerScheduler{ /** @var int */ protected $currentTick = 0; - /** @var \SplObjectStorage */ + /** @var \SplObjectStorage */ protected $objectStore; public function __construct(){ @@ -100,7 +100,7 @@ class ServerScheduler{ * * @internal Only call from AsyncTask.php * - * @param AsyncTask $for + * @param AsyncTask $for * @param object|array $cmplx * * @throws \RuntimeException if this method is called twice for the same instance of AsyncTask @@ -113,17 +113,39 @@ class ServerScheduler{ } /** - * Fetches data that must not be passed to other threads or be serialized, previously stored with {@link #storeLocalComplex} + * Fetches data that must not be passed to other threads or be serialized, previously stored with + * {@link ServerScheduler#storeLocalComplex}, without deletion of the data. * * @internal Only call from AsyncTask.php * * @param AsyncTask $for * - * @throws \RuntimException if no data associated with this AsyncTask can be found - */ + * @return object|array + * + * @throws \RuntimeException if no data associated with this AsyncTask can be found + */ + public function peekLocalComplex(AsyncTask $for){ + if(!isset($this->objectStore[$for])){ + throw new \RuntimeException("No local complex stored for this AsyncTask"); + } + return $this->objectStore[$for]; + } + + /** + * Fetches data that must not be passed to other threads or be serialized, previously stored with + * {@link ServerScheduler#storeLocalComplex}, and delete the data from the storage. + * + * @internal Only call from AsyncTask.php + * + * @param AsyncTask $for + * + * @return object|array + * + * @throws \RuntimeException if no data associated with this AsyncTask can be found + */ public function fetchLocalComplex(AsyncTask $for){ if(!isset($this->objectStore[$for])){ - throw new \RuntimeException("Attempt to fetch undefined complex"); + throw new \RuntimeException("No local complex stored for this AsyncTask"); } $cmplx = $this->objectStore[$for]; unset($this->objectStore[$for]); @@ -138,7 +160,7 @@ class ServerScheduler{ * @param AsyncTask $for * * @return bool returns false if any data are removed from this call, true otherwise - */ + */ public function removeLocalComplex(AsyncTask $for) : bool{ if(isset($this->objectStore[$for])){ Server::getInstance()->getLogger()->notice("AsyncTask " . get_class($for) . " stored local complex data but did not remove them after completion"); From 7861822a0fff941b510a9e75414b97a02bff1aaa Mon Sep 17 00:00:00 2001 From: SOFe Date: Sat, 12 Nov 2016 17:47:07 +0800 Subject: [PATCH 2/7] Fixed garbage AsyncTasks cannot be accessed --- src/pocketmine/scheduler/AsyncPool.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pocketmine/scheduler/AsyncPool.php b/src/pocketmine/scheduler/AsyncPool.php index b8069501c..a4bdb55d1 100644 --- a/src/pocketmine/scheduler/AsyncPool.php +++ b/src/pocketmine/scheduler/AsyncPool.php @@ -142,7 +142,7 @@ class AsyncPool{ Timings::$schedulerAsyncTimer->startTiming(); foreach($this->tasks as $task){ - if($task->progressUpdates !== null){ + if(!$task->isGarbage() and $task->progressUpdates !== null){ if($task->progressUpdates->count() !== 0){ $progress = $task->progressUpdates->shift(); $task->onProgressUpdate($this->server, $progress); From 8404ce88bd6ea2beba7057dbb583e5052b185290 Mon Sep 17 00:00:00 2001 From: SOFe Date: Sat, 12 Nov 2016 18:30:55 +0800 Subject: [PATCH 3/7] Fixed pthreads crashes with progressUpdates --- src/pocketmine/scheduler/AsyncPool.php | 7 ++----- src/pocketmine/scheduler/AsyncTask.php | 15 +++++++++++++-- src/pocketmine/scheduler/ServerScheduler.php | 1 + 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/pocketmine/scheduler/AsyncPool.php b/src/pocketmine/scheduler/AsyncPool.php index a4bdb55d1..22433690c 100644 --- a/src/pocketmine/scheduler/AsyncPool.php +++ b/src/pocketmine/scheduler/AsyncPool.php @@ -142,11 +142,8 @@ class AsyncPool{ Timings::$schedulerAsyncTimer->startTiming(); foreach($this->tasks as $task){ - if(!$task->isGarbage() and $task->progressUpdates !== null){ - if($task->progressUpdates->count() !== 0){ - $progress = $task->progressUpdates->shift(); - $task->onProgressUpdate($this->server, $progress); - } + if(!$task->isGarbage()){ + $task->checkProgressUpdates($this->server); } if($task->isGarbage() and !$task->isRunning() and !$task->isCrashed()){ if(!$task->hasCancelledRun()){ diff --git a/src/pocketmine/scheduler/AsyncTask.php b/src/pocketmine/scheduler/AsyncTask.php index 295b881b1..eeabd63d4 100644 --- a/src/pocketmine/scheduler/AsyncTask.php +++ b/src/pocketmine/scheduler/AsyncTask.php @@ -35,7 +35,7 @@ abstract class AsyncTask extends Collectable{ public $worker = null; /** @var \Threaded */ - public $progressUpdates = null; + public $progressUpdates; private $result = null; private $serialized = false; @@ -69,7 +69,6 @@ abstract class AsyncTask extends Collectable{ } public function run(){ - $this->progressUpdates = new \Threaded; // Do not move this to __construct for backwards compatibility. $this->result = null; if($this->cancelRun !== true){ @@ -182,6 +181,18 @@ abstract class AsyncTask extends Collectable{ $this->progressUpdates[] = $progress; } + /** + * @internal Only call from AsyncPool.php on the main thread + * + * @param Server $server + */ + public function checkProgressUpdates(Server $server){ + if($this->progressUpdates->count() !== 0){ + $progress = $this->progressUpdates->shift(); + $this->onProgressUpdate($server, $progress); + } + } + /** * Called from the main thread after {@link AsyncTask#publishProgress} is called. * All {@link AsyncTask#publishProgress} calls should result in {@link AsyncTask#onProgressUpdate} calls before diff --git a/src/pocketmine/scheduler/ServerScheduler.php b/src/pocketmine/scheduler/ServerScheduler.php index b25453648..76e8a5566 100644 --- a/src/pocketmine/scheduler/ServerScheduler.php +++ b/src/pocketmine/scheduler/ServerScheduler.php @@ -78,6 +78,7 @@ class ServerScheduler{ public function scheduleAsyncTask(AsyncTask $task){ $id = $this->nextId(); $task->setTaskId($id); + $task->progressUpdates = new \Threaded; $this->asyncPool->submitTask($task); } From d5881dbe83afb5b0d0d70d8faa6499f765410602 Mon Sep 17 00:00:00 2001 From: SOFe Date: Sat, 12 Nov 2016 18:33:25 +0800 Subject: [PATCH 4/7] Disallow Threaded objects to be passed as progress parameter --- src/pocketmine/scheduler/AsyncTask.php | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/pocketmine/scheduler/AsyncTask.php b/src/pocketmine/scheduler/AsyncTask.php index eeabd63d4..4d2a48912 100644 --- a/src/pocketmine/scheduler/AsyncTask.php +++ b/src/pocketmine/scheduler/AsyncTask.php @@ -175,7 +175,7 @@ abstract class AsyncTask extends Collectable{ * Call this method from {@link AsyncTask#onRun} (AsyncTask execution therad) to schedule a call to * {@link AsyncTask#onProgressUpdate} from the main thread with the given progress parameter. * - * @param \Threaded|mixed $progress A Threaded object, or a value that can be safely serialize()'ed. + * @param mixed $progress A value that can be safely serialize()'ed. */ public function publishProgress($progress){ $this->progressUpdates[] = $progress; @@ -198,10 +198,9 @@ abstract class AsyncTask extends Collectable{ * All {@link AsyncTask#publishProgress} calls should result in {@link AsyncTask#onProgressUpdate} calls before * {@link AsyncTask#onCompletion} is called. * - * @param Server $server - * @param \Threaded|mixed $progress The parameter passed to {@link AsyncTask#publishProgress}. If it is not a - * Threaded object, it would be serialize()'ed and later unserialize()'ed, as if it - * has been cloned. + * @param Server $server + * @param mixed $progress The parameter passed to {@link AsyncTask#publishProgress}. It is serialize()'ed + * and then unserialize()'ed, as if it has been cloned. */ public function onProgressUpdate(Server $server, $progress){ From 75fa2f1132bfbdd8f1945445360706704d05e77c Mon Sep 17 00:00:00 2001 From: SOFe Date: Sat, 12 Nov 2016 18:57:52 +0800 Subject: [PATCH 5/7] Fixed array progress crashing Forgot to serialize them. They would be converted to Volatile, which is Threaded. Threaded objects still crash with progressUpdates. --- src/pocketmine/scheduler/AsyncTask.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pocketmine/scheduler/AsyncTask.php b/src/pocketmine/scheduler/AsyncTask.php index 4d2a48912..86d132387 100644 --- a/src/pocketmine/scheduler/AsyncTask.php +++ b/src/pocketmine/scheduler/AsyncTask.php @@ -178,7 +178,7 @@ abstract class AsyncTask extends Collectable{ * @param mixed $progress A value that can be safely serialize()'ed. */ public function publishProgress($progress){ - $this->progressUpdates[] = $progress; + $this->progressUpdates[] = serialize($progress); } /** @@ -187,9 +187,9 @@ abstract class AsyncTask extends Collectable{ * @param Server $server */ public function checkProgressUpdates(Server $server){ - if($this->progressUpdates->count() !== 0){ + while($this->progressUpdates->count() !== 0){ $progress = $this->progressUpdates->shift(); - $this->onProgressUpdate($server, $progress); + $this->onProgressUpdate($server, unserialize($progress)); } } From ab4d6b1e36f4b26bbefa7bcf983e2578a3f048a1 Mon Sep 17 00:00:00 2001 From: SOFe Date: Sun, 13 Nov 2016 18:14:41 +0800 Subject: [PATCH 6/7] Clarified that AsyncTask should not run for a long time --- src/pocketmine/scheduler/AsyncTask.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/pocketmine/scheduler/AsyncTask.php b/src/pocketmine/scheduler/AsyncTask.php index 86d132387..98d3f8cdc 100644 --- a/src/pocketmine/scheduler/AsyncTask.php +++ b/src/pocketmine/scheduler/AsyncTask.php @@ -27,6 +27,10 @@ use pocketmine\Server; /** * Class used to run async tasks in other threads. * + * An AsyncTask does not have its own thread. It is queued into an AsyncPool and executed if there is an async worker + * with no AsyncTask running. Therefore, an AsyncTask SHOULD NOT execute for more than a few seconds. For tasks that + * run for a long time or infinitely, start another {@link \pocketmine\Thread} instead. + * * WARNING: Do not call PocketMine-MP API methods, or save objects (and arrays cotaining objects) from/on other Threads!! */ abstract class AsyncTask extends Collectable{ From 37ae760417c6774a63690e278368b200c6342be1 Mon Sep 17 00:00:00 2001 From: SOFe Date: Sun, 13 Nov 2016 18:32:11 +0800 Subject: [PATCH 7/7] Fixed crashes after scheduleAsyncTaskToWorker() calls --- src/pocketmine/scheduler/ServerScheduler.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pocketmine/scheduler/ServerScheduler.php b/src/pocketmine/scheduler/ServerScheduler.php index 76e8a5566..16edf934f 100644 --- a/src/pocketmine/scheduler/ServerScheduler.php +++ b/src/pocketmine/scheduler/ServerScheduler.php @@ -93,6 +93,7 @@ class ServerScheduler{ public function scheduleAsyncTaskToWorker(AsyncTask $task, $worker){ $id = $this->nextId(); $task->setTaskId($id); + $task->progressUpdates = new \Threaded; $this->asyncPool->submitTaskToWorker($task, $worker); }