mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-04-22 00:33:59 +00:00
AsyncTask: Rewrite how thread-local storage works, now non-dependent on Server or ServerScheduler
this implementation was god-awful bad and it was entirely avoidable to make it this complicated. This utilizes the fact that pthreads treats static properties as thread-local. AsyncTask local storage now utilizes a \SplObjectStorage stored in an AsyncTask private static field.
This commit is contained in:
parent
299e4c8a85
commit
81957d133d
@ -114,7 +114,7 @@ class TimingsCommand extends VanillaCommand{
|
||||
]]
|
||||
], $sender) extends BulkCurlTask{
|
||||
public function onCompletion(Server $server){
|
||||
$sender = $this->fetchLocal($server);
|
||||
$sender = $this->fetchLocal();
|
||||
if($sender instanceof Player and !$sender->isOnline()){ // TODO replace with a more generic API method for checking availability of CommandSender
|
||||
return;
|
||||
}
|
||||
|
@ -144,7 +144,7 @@ class VerifyLoginTask extends AsyncTask{
|
||||
|
||||
public function onCompletion(Server $server){
|
||||
/** @var Player $player */
|
||||
$player = $this->fetchLocal($server);
|
||||
$player = $this->fetchLocal();
|
||||
if($player->isClosed()){
|
||||
$server->getLogger()->error("Player " . $player->getName() . " was disconnected before their login could be verified");
|
||||
}else{
|
||||
|
@ -161,14 +161,14 @@ class AsyncPool{
|
||||
if(!$task->hasCancelledRun()){
|
||||
try{
|
||||
$task->onCompletion($this->server);
|
||||
if(!$this->server->getScheduler()->removeLocalComplex($task)){
|
||||
if($task->removeDanglingStoredObjects()){
|
||||
$this->server->getLogger()->notice("AsyncTask " . get_class($task) . " stored local complex data but did not remove them after completion");
|
||||
}
|
||||
}catch(\Throwable $e){
|
||||
$this->server->getLogger()->critical("Could not execute completion of asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": " . $e->getMessage());
|
||||
$this->server->getLogger()->logException($e);
|
||||
|
||||
$this->server->getScheduler()->removeLocalComplex($task); //silent
|
||||
$task->removeDanglingStoredObjects(); //silent
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,11 @@ use pocketmine\Server;
|
||||
* WARNING: Do not call PocketMine-MP API methods from other Threads!!
|
||||
*/
|
||||
abstract class AsyncTask extends Collectable{
|
||||
/**
|
||||
* @var \SplObjectStorage|null
|
||||
* Used to store objects on the main thread which should not be serialized.
|
||||
*/
|
||||
private static $localObjectStorage;
|
||||
|
||||
/** @var AsyncWorker $worker */
|
||||
public $worker = null;
|
||||
@ -201,9 +206,9 @@ abstract class AsyncTask extends Collectable{
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves mixed data in ServerScheduler's object storage on the main thread. You may use this to retain references to
|
||||
* objects or arrays which you need to access in {@link AsyncTask#onCompletion} which cannot be stored as a
|
||||
* property of your task.
|
||||
* Saves mixed data in thread-local storage on the parent thread. You may use this to retain references to objects
|
||||
* or arrays which you need to access in {@link AsyncTask#onCompletion} which cannot be stored as a property of
|
||||
* your task (due to them becoming serialized).
|
||||
*
|
||||
* Scalar types can be stored directly in class properties instead of using this storage.
|
||||
*
|
||||
@ -224,16 +229,22 @@ abstract class AsyncTask extends Collectable{
|
||||
* @throws \BadMethodCallException if called from any thread except the main thread
|
||||
*/
|
||||
protected function storeLocal($complexData){
|
||||
$server = Server::getInstance();
|
||||
if($server === null){
|
||||
throw new \BadMethodCallException("Objects can only be stored from the main thread!");
|
||||
if($this->worker !== null and $this->worker === \Thread::getCurrentThread()){
|
||||
throw new \BadMethodCallException("Objects can only be stored from the parent thread");
|
||||
}
|
||||
|
||||
$server->getScheduler()->storeLocalComplex($this, $complexData);
|
||||
if(self::$localObjectStorage === null){
|
||||
self::$localObjectStorage = new \SplObjectStorage(); //lazy init
|
||||
}
|
||||
|
||||
if(isset(self::$localObjectStorage[$this])){
|
||||
throw new \InvalidStateException("Already storing complex data for this async task");
|
||||
}
|
||||
self::$localObjectStorage[$this] = $complexData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns mixed data stored in the ServerScheduler's object store and clears it. Call this method from
|
||||
* Returns and removes mixed data in thread-local storage on the parent thread. Call this method from
|
||||
* {@link AsyncTask#onCompletion} to fetch the data stored in the object store, if any.
|
||||
*
|
||||
* If no data was stored in the local store, or if the data was already retrieved by a previous call to fetchLocal,
|
||||
@ -245,50 +256,59 @@ abstract class AsyncTask extends Collectable{
|
||||
*
|
||||
* WARNING: THIS METHOD SHOULD ONLY BE CALLED FROM THE MAIN THREAD!
|
||||
*
|
||||
* @param Server|null $server
|
||||
*
|
||||
* @return mixed
|
||||
*
|
||||
* @throws \RuntimeException if no data were stored by this AsyncTask instance.
|
||||
* @throws \BadMethodCallException if called from any thread except the main thread
|
||||
*/
|
||||
protected function fetchLocal(Server $server = null){
|
||||
if($server === null){
|
||||
$server = Server::getInstance();
|
||||
if($server === null){
|
||||
throw new \BadMethodCallException("Stored objects can only be retrieved from the main thread");
|
||||
protected function fetchLocal(){
|
||||
try{
|
||||
return $this->peekLocal();
|
||||
}finally{
|
||||
if(self::$localObjectStorage !== null){
|
||||
unset(self::$localObjectStorage[$this]);
|
||||
}
|
||||
}
|
||||
|
||||
return $server->getScheduler()->fetchLocalComplex($this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns mixed data stored in the ServerScheduler's object store **without clearing** it. Call this method from
|
||||
* Returns mixed data in thread-local storage on the parent thread **without clearing** it. Call this method from
|
||||
* {@link AsyncTask#onProgressUpdate} to fetch the data stored if you need to be able to access the data later on,
|
||||
* such as in another progress update.
|
||||
*
|
||||
* Use {@link AsyncTask#peekLocal} instead from {@link AsyncTask#onCompletion}, because this method does not delete
|
||||
* Use {@link AsyncTask#fetchLocal} instead from {@link AsyncTask#onCompletion}, because this method does not delete
|
||||
* the data, and not clearing the data will result in a warning for memory leak after {@link AsyncTask#onCompletion}
|
||||
* finished executing.
|
||||
*
|
||||
* WARNING: THIS METHOD SHOULD ONLY BE CALLED FROM THE MAIN THREAD!
|
||||
*
|
||||
* @param Server|null $server
|
||||
*
|
||||
* @return mixed
|
||||
*
|
||||
* @throws \RuntimeException if no data were stored by this AsyncTask instance
|
||||
* @throws \BadMethodCallException if called from any thread except the main thread
|
||||
*/
|
||||
protected function peekLocal(Server $server = null){
|
||||
if($server === null){
|
||||
$server = Server::getInstance();
|
||||
if($server === null){
|
||||
throw new \BadMethodCallException("Stored objects can only be peeked from the main thread");
|
||||
}
|
||||
protected function peekLocal(){
|
||||
if($this->worker !== null and $this->worker === \Thread::getCurrentThread()){
|
||||
throw new \BadMethodCallException("Objects can only be retrieved from the parent thread");
|
||||
}
|
||||
|
||||
return $server->getScheduler()->peekLocalComplex($this);
|
||||
if(self::$localObjectStorage === null or !isset(self::$localObjectStorage[$this])){
|
||||
throw new \InvalidStateException("No complex data stored for this async task");
|
||||
}
|
||||
|
||||
return self::$localObjectStorage[$this];
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal Called by the AsyncPool to destroy any leftover stored objects that this task failed to retrieve.
|
||||
* @return bool
|
||||
*/
|
||||
public function removeDanglingStoredObjects() : bool{
|
||||
if(self::$localObjectStorage !== null and isset(self::$localObjectStorage[$this])){
|
||||
unset(self::$localObjectStorage[$this]);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -53,13 +53,9 @@ class ServerScheduler{
|
||||
/** @var int */
|
||||
protected $currentTick = 0;
|
||||
|
||||
/** @var \SplObjectStorage<AsyncTask, object|array> */
|
||||
protected $objectStore;
|
||||
|
||||
public function __construct(){
|
||||
$this->queue = new ReversePriorityQueue();
|
||||
$this->asyncPool = new AsyncPool(Server::getInstance(), self::$WORKERS);
|
||||
$this->objectStore = new \SplObjectStorage();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -106,82 +102,6 @@ class ServerScheduler{
|
||||
$this->asyncPool->submitTaskToWorker($task, $worker);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores any data that must not be passed to other threads or be serialized
|
||||
*
|
||||
* @internal Only call from AsyncTask.php
|
||||
*
|
||||
* @param AsyncTask $for
|
||||
* @param object|array $cmplx
|
||||
*
|
||||
* @throws \RuntimeException if this method is called twice for the same instance of AsyncTask
|
||||
*/
|
||||
public function storeLocalComplex(AsyncTask $for, $cmplx){
|
||||
if(isset($this->objectStore[$for])){
|
||||
throw new \RuntimeException("Already storing a complex for this AsyncTask");
|
||||
}
|
||||
$this->objectStore[$for] = $cmplx;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches data that must not be passed to other threads or be serialized, previously stored with
|
||||
* {@link ServerScheduler#storeLocalComplex}, without deletion of the data.
|
||||
*
|
||||
* @internal Only call from AsyncTask.php
|
||||
*
|
||||
* @param AsyncTask $for
|
||||
*
|
||||
* @return object|array
|
||||
*
|
||||
* @throws \RuntimeException if no data associated with this AsyncTask can be found
|
||||
*/
|
||||
public function peekLocalComplex(AsyncTask $for){
|
||||
if(!isset($this->objectStore[$for])){
|
||||
throw new \RuntimeException("No local complex stored for this AsyncTask");
|
||||
}
|
||||
return $this->objectStore[$for];
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches data that must not be passed to other threads or be serialized, previously stored with
|
||||
* {@link ServerScheduler#storeLocalComplex}, and delete the data from the storage.
|
||||
*
|
||||
* @internal Only call from AsyncTask.php
|
||||
*
|
||||
* @param AsyncTask $for
|
||||
*
|
||||
* @return object|array
|
||||
*
|
||||
* @throws \RuntimeException if no data associated with this AsyncTask can be found
|
||||
*/
|
||||
public function fetchLocalComplex(AsyncTask $for){
|
||||
if(!isset($this->objectStore[$for])){
|
||||
throw new \RuntimeException("No local complex stored for this AsyncTask");
|
||||
}
|
||||
$cmplx = $this->objectStore[$for];
|
||||
unset($this->objectStore[$for]);
|
||||
return $cmplx;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes sure no data stored from {@link #storeLocalComplex} is left for a specific AsyncTask
|
||||
*
|
||||
* @internal Only call from AsyncTask.php
|
||||
*
|
||||
* @param AsyncTask $for
|
||||
*
|
||||
* @return bool returns false if any data are removed from this call, true otherwise
|
||||
*/
|
||||
public function removeLocalComplex(AsyncTask $for) : bool{
|
||||
if(isset($this->objectStore[$for])){
|
||||
unset($this->objectStore[$for]);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public function getAsyncTaskPoolSize() : int{
|
||||
return $this->asyncPool->getSize();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user