Merge branch 'asynctask-progress'

This commit is contained in:
SOFe 2016-11-19 23:17:26 +08:00
commit f0f6d85809
No known key found for this signature in database
GPG Key ID: A0379676C4D9D5D9
3 changed files with 104 additions and 9 deletions

View File

@ -142,8 +142,10 @@ class AsyncPool{
Timings::$schedulerAsyncTimer->startTiming();
foreach($this->tasks as $task){
if(!$task->isGarbage()){
$task->checkProgressUpdates($this->server);
}
if($task->isGarbage() and !$task->isRunning() and !$task->isCrashed()){
if(!$task->hasCancelledRun()){
$task->onCompletion($this->server);
$this->server->getScheduler()->removeLocalComplex($task);

View File

@ -27,6 +27,10 @@ use pocketmine\Server;
/**
* Class used to run async tasks in other threads.
*
* An AsyncTask does not have its own thread. It is queued into an AsyncPool and executed if there is an async worker
* with no AsyncTask running. Therefore, an AsyncTask SHOULD NOT execute for more than a few seconds. For tasks that
* run for a long time or infinitely, start another {@link \pocketmine\Thread} instead.
*
* WARNING: Do not call PocketMine-MP API methods, or save objects (and arrays cotaining objects) from/on other Threads!!
*/
abstract class AsyncTask extends Collectable{
@ -34,6 +38,9 @@ abstract class AsyncTask extends Collectable{
/** @var AsyncWorker $worker */
public $worker = null;
/** @var \Threaded */
public $progressUpdates;
private $result = null;
private $serialized = false;
private $cancelRun = false;
@ -169,7 +176,47 @@ abstract class AsyncTask extends Collectable{
}
/**
* Call this method from {@link #onCompletion} to fetch the data stored in the constructor, if any.
* Call this method from {@link AsyncTask#onRun} (AsyncTask execution therad) to schedule a call to
* {@link AsyncTask#onProgressUpdate} from the main thread with the given progress parameter.
*
* @param mixed $progress A value that can be safely serialize()'ed.
*/
public function publishProgress($progress){
$this->progressUpdates[] = serialize($progress);
}
/**
* @internal Only call from AsyncPool.php on the main thread
*
* @param Server $server
*/
public function checkProgressUpdates(Server $server){
while($this->progressUpdates->count() !== 0){
$progress = $this->progressUpdates->shift();
$this->onProgressUpdate($server, unserialize($progress));
}
}
/**
* Called from the main thread after {@link AsyncTask#publishProgress} is called.
* All {@link AsyncTask#publishProgress} calls should result in {@link AsyncTask#onProgressUpdate} calls before
* {@link AsyncTask#onCompletion} is called.
*
* @param Server $server
* @param mixed $progress The parameter passed to {@link AsyncTask#publishProgress}. It is serialize()'ed
* and then unserialize()'ed, as if it has been cloned.
*/
public function onProgressUpdate(Server $server, $progress){
}
/**
* Call this method from {@link AsyncTask#onCompletion} to fetch the data stored in the constructor, if any, and
* clears it from the storage.
*
* Do not call this method from {@link AsyncTask#onProgressUpdate}, because this method deletes the data and cannot
* be used in the next {@link AsyncTask#onProgressUpdate} call or from {@link AsyncTask#onCompletion}. Use
* {@link AsyncTask#peekLocal} instead.
*
* @param Server $server default null
*
@ -186,6 +233,28 @@ abstract class AsyncTask extends Collectable{
return $server->getScheduler()->fetchLocalComplex($this);
}
/**
* Call this method from {@link AsyncTask#onProgressUpdate} to fetch the data stored in the constructor.
*
* Use {@link AsyncTask#peekLocal} instead from {@link AsyncTask#onCompletion}, because this method does not delete
* the data, and not clearing the data will result in a warning for memory leak after {@link AsyncTask#onCompletion}
* finished executing.
*
* @param Server|null $server default null
*
* @return mixed
*
* @throws \RuntimeException if no data were stored by this AsyncTask instance
*/
protected function peekLocal(Server $server = null){
if($server === null){
$server = Server::getInstance();
assert($server !== null, "Call this method only from the main thread!");
}
return $server->getScheduler()->peekLocalComplex($this);
}
public function cleanObject(){
foreach($this as $p => $v){
if(!($v instanceof \Threaded)){

View File

@ -50,7 +50,7 @@ class ServerScheduler{
/** @var int */
protected $currentTick = 0;
/** @var \SplObjectStorage<AsyncTask> */
/** @var \SplObjectStorage<AsyncTask, object|array> */
protected $objectStore;
public function __construct(){
@ -78,6 +78,7 @@ class ServerScheduler{
public function scheduleAsyncTask(AsyncTask $task){
$id = $this->nextId();
$task->setTaskId($id);
$task->progressUpdates = new \Threaded;
$this->asyncPool->submitTask($task);
}
@ -92,6 +93,7 @@ class ServerScheduler{
public function scheduleAsyncTaskToWorker(AsyncTask $task, $worker){
$id = $this->nextId();
$task->setTaskId($id);
$task->progressUpdates = new \Threaded;
$this->asyncPool->submitTaskToWorker($task, $worker);
}
@ -100,7 +102,7 @@ class ServerScheduler{
*
* @internal Only call from AsyncTask.php
*
* @param AsyncTask $for
* @param AsyncTask $for
* @param object|array $cmplx
*
* @throws \RuntimeException if this method is called twice for the same instance of AsyncTask
@ -113,17 +115,39 @@ class ServerScheduler{
}
/**
* Fetches data that must not be passed to other threads or be serialized, previously stored with {@link #storeLocalComplex}
* Fetches data that must not be passed to other threads or be serialized, previously stored with
* {@link ServerScheduler#storeLocalComplex}, without deletion of the data.
*
* @internal Only call from AsyncTask.php
*
* @param AsyncTask $for
*
* @throws \RuntimException if no data associated with this AsyncTask can be found
*/
* @return object|array
*
* @throws \RuntimeException if no data associated with this AsyncTask can be found
*/
public function peekLocalComplex(AsyncTask $for){
if(!isset($this->objectStore[$for])){
throw new \RuntimeException("No local complex stored for this AsyncTask");
}
return $this->objectStore[$for];
}
/**
* Fetches data that must not be passed to other threads or be serialized, previously stored with
* {@link ServerScheduler#storeLocalComplex}, and delete the data from the storage.
*
* @internal Only call from AsyncTask.php
*
* @param AsyncTask $for
*
* @return object|array
*
* @throws \RuntimeException if no data associated with this AsyncTask can be found
*/
public function fetchLocalComplex(AsyncTask $for){
if(!isset($this->objectStore[$for])){
throw new \RuntimeException("Attempt to fetch undefined complex");
throw new \RuntimeException("No local complex stored for this AsyncTask");
}
$cmplx = $this->objectStore[$for];
unset($this->objectStore[$for]);
@ -138,7 +162,7 @@ class ServerScheduler{
* @param AsyncTask $for
*
* @return bool returns false if any data are removed from this call, true otherwise
*/
*/
public function removeLocalComplex(AsyncTask $for) : bool{
if(isset($this->objectStore[$for])){
Server::getInstance()->getLogger()->notice("AsyncTask " . get_class($for) . " stored local complex data but did not remove them after completion");