AsyncTask and AsyncPool no longer tolerate uncaught errors in tasks

Since task execution depends on tasks executing sequentially on a particular worker in some cases (e.g. PopulationTask must be preceded by GeneratorRegisterTask), it doesn't make sense to continue task execution if an error occurs.
Moreover, a task crashing may render the whole server unstable, as it leaves the server in an undefined state. This is the same kind of problem we fixed with scheduled tasks in PM3.

In versions past, pthreads was unreliable enough that random tasks would crash without an obvious reason, forcing us to accommodate this. I still don't know the origin or frequency of said issues, but I think it's time to rip the band-aid off and solve these problems for real.
This commit is contained in:
Dylan K. Taylor 2023-06-22 13:29:36 +01:00
parent ad3f854701
commit 0240d35c05
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
7 changed files with 46 additions and 45 deletions

View File

@ -22,7 +22,7 @@
"ext-openssl": "*", "ext-openssl": "*",
"ext-pcre": "*", "ext-pcre": "*",
"ext-phar": "*", "ext-phar": "*",
"ext-pmmpthread": "^6.0.1", "ext-pmmpthread": "^6.0.4",
"ext-reflection": "*", "ext-reflection": "*",
"ext-simplexml": "*", "ext-simplexml": "*",
"ext-sockets": "*", "ext-sockets": "*",

View File

@ -120,8 +120,8 @@ namespace pocketmine {
} }
if(($pmmpthread_version = phpversion("pmmpthread")) !== false){ if(($pmmpthread_version = phpversion("pmmpthread")) !== false){
if(version_compare($pmmpthread_version, "6.0.1") < 0 || version_compare($pmmpthread_version, "7.0.0") >= 0){ if(version_compare($pmmpthread_version, "6.0.4") < 0 || version_compare($pmmpthread_version, "7.0.0") >= 0){
$messages[] = "pmmpthread ^6.0.1 is required, while you have $pmmpthread_version."; $messages[] = "pmmpthread ^6.0.4 is required, while you have $pmmpthread_version.";
} }
} }

View File

@ -39,7 +39,6 @@ use pocketmine\world\format\io\FastChunkSerializer;
class ChunkRequestTask extends AsyncTask{ class ChunkRequestTask extends AsyncTask{
private const TLS_KEY_PROMISE = "promise"; private const TLS_KEY_PROMISE = "promise";
private const TLS_KEY_ERROR_HOOK = "errorHook";
protected string $chunk; protected string $chunk;
protected int $chunkX; protected int $chunkX;
@ -48,10 +47,7 @@ class ChunkRequestTask extends AsyncTask{
protected NonThreadSafeValue $compressor; protected NonThreadSafeValue $compressor;
private string $tiles; private string $tiles;
/** public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, Compressor $compressor){
* @phpstan-param (\Closure() : void)|null $onError
*/
public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, Compressor $compressor, ?\Closure $onError = null){
$this->compressor = new NonThreadSafeValue($compressor); $this->compressor = new NonThreadSafeValue($compressor);
$this->chunk = FastChunkSerializer::serializeTerrain($chunk); $this->chunk = FastChunkSerializer::serializeTerrain($chunk);
@ -60,7 +56,6 @@ class ChunkRequestTask extends AsyncTask{
$this->tiles = ChunkSerializer::serializeTiles($chunk); $this->tiles = ChunkSerializer::serializeTiles($chunk);
$this->storeLocal(self::TLS_KEY_PROMISE, $promise); $this->storeLocal(self::TLS_KEY_PROMISE, $promise);
$this->storeLocal(self::TLS_KEY_ERROR_HOOK, $onError);
} }
public function onRun() : void{ public function onRun() : void{
@ -75,17 +70,6 @@ class ChunkRequestTask extends AsyncTask{
$this->setResult($this->compressor->deserialize()->compress($stream->getBuffer())); $this->setResult($this->compressor->deserialize()->compress($stream->getBuffer()));
} }
public function onError() : void{
/**
* @var \Closure|null $hook
* @phpstan-var (\Closure() : void)|null $hook
*/
$hook = $this->fetchLocal(self::TLS_KEY_ERROR_HOOK);
if($hook !== null){
$hook();
}
}
public function onCompletion() : void{ public function onCompletion() : void{
/** @var CompressBatchPromise $promise */ /** @var CompressBatchPromise $promise */
$promise = $this->fetchLocal(self::TLS_KEY_PROMISE); $promise = $this->fetchLocal(self::TLS_KEY_PROMISE);

View File

@ -118,14 +118,7 @@ class ChunkCache implements ChunkListener{
$chunkZ, $chunkZ,
$chunk, $chunk,
$this->caches[$chunkHash], $this->caches[$chunkHash],
$this->compressor, $this->compressor
function() use ($chunkHash, $chunkX, $chunkZ) : void{
$this->world->getLogger()->error("Failed preparing chunk $chunkX $chunkZ, retrying");
if(isset($this->caches[$chunkHash])){
$this->restartPendingRequest($chunkX, $chunkZ);
}
}
) )
); );

View File

@ -28,11 +28,13 @@ use pocketmine\snooze\SleeperHandler;
use pocketmine\thread\log\ThreadSafeLogger; use pocketmine\thread\log\ThreadSafeLogger;
use pocketmine\thread\ThreadSafeClassLoader; use pocketmine\thread\ThreadSafeClassLoader;
use pocketmine\timings\Timings; use pocketmine\timings\Timings;
use pocketmine\utils\AssumptionFailedError;
use pocketmine\utils\Utils; use pocketmine\utils\Utils;
use function array_keys; use function array_keys;
use function array_map; use function array_map;
use function assert; use function assert;
use function count; use function count;
use function get_class;
use function spl_object_id; use function spl_object_id;
use function time; use function time;
use const PHP_INT_MAX; use const PHP_INT_MAX;
@ -130,6 +132,8 @@ class AsyncPool{
foreach($this->workerStartHooks as $hook){ foreach($this->workerStartHooks as $hook){
$hook($workerId); $hook($workerId);
} }
}else{
$this->checkCrashedWorker($workerId, null);
} }
return $this->workers[$workerId]; return $this->workers[$workerId];
@ -198,6 +202,28 @@ class AsyncPool{
return $worker; return $worker;
} }
private function checkCrashedWorker(int $workerId, ?AsyncTask $crashedTask) : void{
$entry = $this->workers[$workerId];
if($entry->worker->isTerminated()){
if($crashedTask === null){
foreach($entry->tasks as $task){
if($task->isTerminated()){
$crashedTask = $task;
break;
}elseif(!$task->isFinished()){
break;
}
}
}
if($crashedTask !== null){
$message = "Worker $workerId crashed while running task " . get_class($crashedTask) . "#" . spl_object_id($crashedTask);
}else{
$message = "Worker $workerId crashed for unknown reason";
}
throw new \RuntimeException($message);
}
}
/** /**
* Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate. * Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate.
* *
@ -230,11 +256,9 @@ class AsyncPool{
if($task->isFinished()){ //make sure the task actually executed before trying to collect if($task->isFinished()){ //make sure the task actually executed before trying to collect
$queue->dequeue(); $queue->dequeue();
if($task->isCrashed()){ if($task->isTerminated()){
$this->logger->critical("Could not execute asynchronous task " . (new \ReflectionClass($task))->getShortName() . ": Task crashed"); $this->checkCrashedWorker($worker, $task);
Timings::getAsyncTaskErrorTimings($task)->time(function() use ($task) : void{ throw new AssumptionFailedError("checkCrashedWorker() should have thrown an exception, making this unreachable");
$task->onError();
});
}elseif(!$task->hasCancelledRun()){ }elseif(!$task->hasCancelledRun()){
/* /*
* It's possible for a task to submit a progress update and then finish before the progress * It's possible for a task to submit a progress update and then finish before the progress

View File

@ -75,20 +75,13 @@ abstract class AsyncTask extends Runnable{
private bool $cancelRun = false; private bool $cancelRun = false;
private bool $submitted = false; private bool $submitted = false;
private bool $crashed = false;
private bool $finished = false; private bool $finished = false;
public function run() : void{ public function run() : void{
$this->result = null; $this->result = null;
if(!$this->cancelRun){ if(!$this->cancelRun){
try{ $this->onRun();
$this->onRun();
}catch(\Throwable $e){
$this->crashed = true;
\GlobalLogger::get()->logException($e);
}
} }
$this->finished = true; $this->finished = true;
@ -97,8 +90,11 @@ abstract class AsyncTask extends Runnable{
$worker->getNotifier()->wakeupSleeper(); $worker->getNotifier()->wakeupSleeper();
} }
/**
* @deprecated
*/
public function isCrashed() : bool{ public function isCrashed() : bool{
return $this->crashed || $this->isTerminated(); return $this->isTerminated();
} }
/** /**
@ -106,7 +102,7 @@ abstract class AsyncTask extends Runnable{
* because it is not true prior to task execution. * because it is not true prior to task execution.
*/ */
public function isFinished() : bool{ public function isFinished() : bool{
return $this->finished || $this->isCrashed(); return $this->finished || $this->isTerminated();
} }
public function hasResult() : bool{ public function hasResult() : bool{
@ -195,8 +191,7 @@ abstract class AsyncTask extends Runnable{
} }
/** /**
* Called from the main thread when the async task experiences an error during onRun(). Use this for things like * @deprecated No longer used
* promise rejection.
*/ */
public function onError() : void{ public function onError() : void{

View File

@ -31,6 +31,7 @@ use pocketmine\thread\Worker;
use pocketmine\utils\AssumptionFailedError; use pocketmine\utils\AssumptionFailedError;
use function gc_enable; use function gc_enable;
use function ini_set; use function ini_set;
use function set_exception_handler;
class AsyncWorker extends Worker{ class AsyncWorker extends Worker{
/** @var mixed[] */ /** @var mixed[] */
@ -67,6 +68,10 @@ class AsyncWorker extends Worker{
} }
$this->saveToThreadStore(self::TLS_KEY_NOTIFIER, $this->sleeperEntry->createNotifier()); $this->saveToThreadStore(self::TLS_KEY_NOTIFIER, $this->sleeperEntry->createNotifier());
set_exception_handler(function(\Throwable $e){
$this->logger->logException($e);
});
} }
public function getLogger() : ThreadSafeLogger{ public function getLogger() : ThreadSafeLogger{