pmmpthread support

This commit is contained in:
Dylan K. Taylor
2023-05-20 01:29:26 +01:00
parent 8454076235
commit e0630fbb25
24 changed files with 159 additions and 126 deletions

View File

@@ -338,7 +338,7 @@ JIT_WARNING
$logger->info("Stopping other threads");
$killer = new ServerKiller(8);
$killer->start(PTHREADS_INHERIT_NONE);
$killer->start();
usleep(10000); //Fixes ServerKiller not being able to start on single-core machines
if(ThreadManager::getInstance()->stopAll() > 0){

View File

@@ -417,7 +417,7 @@ class Server{
return $this->autoloader;
}
public function getLogger() : \AttachableThreadedLogger{
public function getLogger() : \AttachableThreadSafeLogger{
return $this->logger;
}
@@ -760,7 +760,7 @@ class Server{
public function __construct(
private \DynamicClassLoader $autoloader,
private \AttachableThreadedLogger $logger,
private \AttachableThreadSafeLogger $logger,
string $dataPath,
string $pluginPath
){

View File

@@ -23,6 +23,8 @@ declare(strict_types=1);
namespace pocketmine\console;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\utils\Process;
use function cli_set_process_title;
use function count;
@@ -30,7 +32,6 @@ use function dirname;
use function feof;
use function fwrite;
use function stream_socket_client;
use const PTHREADS_INHERIT_NONE;
require dirname(__DIR__, 2) . '/vendor/autoload.php';
@@ -46,14 +47,14 @@ if($socket === false){
throw new \RuntimeException("Failed to connect to server process ($errCode): $errMessage");
}
/** @phpstan-var \ThreadedArray<int, string> $channel */
$channel = new \ThreadedArray();
$thread = new class($channel) extends \Thread{
/** @phpstan-var ThreadSafeArray<int, string> $channel */
$channel = new ThreadSafeArray();
$thread = new class($channel) extends NativeThread{
/**
* @phpstan-param \ThreadedArray<int, string> $channel
* @phpstan-param ThreadSafeArray<int, string> $channel
*/
public function __construct(
private \ThreadedArray $channel,
private ThreadSafeArray $channel,
){}
public function run() : void{
@@ -73,7 +74,7 @@ $thread = new class($channel) extends \Thread{
}
};
$thread->start(PTHREADS_INHERIT_NONE);
$thread->start(NativeThread::INHERIT_NONE);
while(!feof($socket)){
$line = $channel->synchronized(function() use ($channel) : ?string{
if(count($channel) === 0){

View File

@@ -23,13 +23,14 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\raklib;
use pmmp\thread\ThreadSafeArray;
use raklib\server\ipc\InterThreadChannelReader;
final class PthreadsChannelReader implements InterThreadChannelReader{
/**
* @phpstan-param \ThreadedArray<int, string> $buffer
* @phpstan-param ThreadSafeArray<int, string> $buffer
*/
public function __construct(private \ThreadedArray $buffer){}
public function __construct(private ThreadSafeArray $buffer){}
public function read() : ?string{
return $this->buffer->shift();

View File

@@ -23,13 +23,14 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\raklib;
use pmmp\thread\ThreadSafeArray;
use raklib\server\ipc\InterThreadChannelWriter;
final class PthreadsChannelWriter implements InterThreadChannelWriter{
/**
* @phpstan-param \ThreadedArray<int, string> $buffer
* @phpstan-param ThreadSafeArray<int, string> $buffer
*/
public function __construct(private \ThreadedArray $buffer){}
public function __construct(private ThreadSafeArray $buffer){}
public function write(string $str) : void{
$this->buffer[] = $str;

View File

@@ -23,6 +23,7 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\raklib;
use pmmp\thread\ThreadSafeArray;
use pocketmine\lang\KnownTranslationFactory;
use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\mcpe\compression\ZlibCompressor;
@@ -105,10 +106,10 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->sleeper = new SleeperNotifier();
/** @phpstan-var \ThreadedArray<int, string> $mainToThreadBuffer */
$mainToThreadBuffer = new \ThreadedArray();
/** @phpstan-var \ThreadedArray<int, string> $threadToMainBuffer */
$threadToMainBuffer = new \ThreadedArray();
/** @phpstan-var ThreadSafeArray<int, string> $mainToThreadBuffer */
$mainToThreadBuffer = new ThreadSafeArray();
/** @phpstan-var ThreadSafeArray<int, string> $threadToMainBuffer */
$threadToMainBuffer = new ThreadSafeArray();
$this->rakLib = new RakLibServer(
$this->server->getLogger(),

View File

@@ -23,6 +23,8 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\raklib;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\NonThreadSafeValue;
use pocketmine\thread\Thread;
@@ -38,7 +40,6 @@ use function error_get_last;
use function gc_enable;
use function ini_set;
use function register_shutdown_function;
use const PTHREADS_INHERIT_NONE;
class RakLibServer extends Thread{
protected bool $cleanShutdown = false;
@@ -50,13 +51,13 @@ class RakLibServer extends Thread{
protected NonThreadSafeValue $address;
/**
* @phpstan-param \ThreadedArray<int, string> $mainToThreadBuffer
* @phpstan-param \ThreadedArray<int, string> $threadToMainBuffer
* @phpstan-param ThreadSafeArray<int, string> $mainToThreadBuffer
* @phpstan-param ThreadSafeArray<int, string> $threadToMainBuffer
*/
public function __construct(
protected \ThreadedLogger $logger,
protected \ThreadedArray $mainToThreadBuffer,
protected \ThreadedArray $threadToMainBuffer,
protected \ThreadSafeLogger $logger,
protected ThreadSafeArray $mainToThreadBuffer,
protected ThreadSafeArray $threadToMainBuffer,
InternetAddress $address,
protected int $serverId,
protected int $maxMtuSize,
@@ -88,13 +89,13 @@ class RakLibServer extends Thread{
}
private function setCrashInfo(RakLibThreadCrashInfo $info) : void{
$this->synchronized(function(RakLibThreadCrashInfo $info) : void{
$this->synchronized(function() use ($info) : void{
$this->crashInfo = new NonThreadSafeValue($info);
$this->notify();
}, $info);
});
}
public function startAndWait(int $options = PTHREADS_INHERIT_NONE) : void{
public function startAndWait(int $options = NativeThread::INHERIT_NONE) : void{
$this->start($options);
$this->synchronized(function() : void{
while(!$this->ready && $this->crashInfo === null){

View File

@@ -23,15 +23,16 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\raklib;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperNotifier;
use raklib\server\ipc\InterThreadChannelWriter;
final class SnoozeAwarePthreadsChannelWriter implements InterThreadChannelWriter{
/**
* @phpstan-param \ThreadedArray<int, string> $buffer
* @phpstan-param ThreadSafeArray<int, string> $buffer
*/
public function __construct(
private \ThreadedArray $buffer,
private ThreadSafeArray $buffer,
private SleeperNotifier $notifier
){}

View File

@@ -23,6 +23,8 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperHandler;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\utils\Utils;
@@ -33,14 +35,13 @@ use function count;
use function spl_object_id;
use function time;
use const PHP_INT_MAX;
use const PTHREADS_INHERIT_INI;
/**
* Manages general-purpose worker threads used for processing asynchronous tasks, and the tasks submitted to those
* workers.
*/
class AsyncPool{
private const WORKER_START_OPTIONS = PTHREADS_INHERIT_INI;
private const WORKER_START_OPTIONS = NativeThread::INHERIT_INI | NativeThread::INHERIT_COMMENTS;
/**
* @var \SplQueue[]|AsyncTask[][]
@@ -69,7 +70,7 @@ class AsyncPool{
protected int $size,
private int $workerMemoryLimit,
private \ClassLoader $classLoader,
private \ThreadedLogger $logger,
private \ThreadSafeLogger $logger,
private SleeperHandler $eventLoop
){}
@@ -158,7 +159,7 @@ class AsyncPool{
throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
}
$task->progressUpdates = new \ThreadedArray();
$task->progressUpdates = new ThreadSafeArray();
$task->setSubmitted();
$this->getWorker($worker)->stack($task);

View File

@@ -23,7 +23,11 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pmmp\thread\Runnable;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\thread\NonThreadSafeValue;
use function assert;
use function igbinary_serialize;
use function igbinary_unserialize;
use function is_null;
@@ -54,7 +58,7 @@ use function spl_object_id;
* If you want to store non-thread-safe objects to access when the task completes, store them using
* {@link AsyncTask::storeLocal}.
*/
abstract class AsyncTask extends \ThreadedRunnable{
abstract class AsyncTask extends Runnable{
/**
* @var \ArrayObject|mixed[]|null object hash => mixed data
* @phpstan-var \ArrayObject<int, array<string, mixed>>|null
@@ -63,11 +67,8 @@ abstract class AsyncTask extends \ThreadedRunnable{
*/
private static ?\ArrayObject $threadLocalStorage = null;
/** @var AsyncWorker|null $worker */
public $worker = null;
/** @phpstan-var \ThreadedArray<int, string> */
public \ThreadedArray $progressUpdates;
/** @phpstan-var ThreadSafeArray<int, string> */
public ThreadSafeArray $progressUpdates;
/** @phpstan-var NonThreadSafeValue<mixed>|string|int|bool|float|null */
private NonThreadSafeValue|string|int|bool|null|float $result = null;
@@ -85,12 +86,15 @@ abstract class AsyncTask extends \ThreadedRunnable{
$this->onRun();
}catch(\Throwable $e){
$this->crashed = true;
$this->worker->handleException($e);
\GlobalLogger::get()->logException($e);
}
}
$this->finished = true;
$this->worker->getNotifier()->wakeupSleeper();
$worker = NativeThread::getCurrentThread();
assert($worker instanceof AsyncWorker);
$worker->getNotifier()->wakeupSleeper();
}
public function isCrashed() : bool{

View File

@@ -23,6 +23,7 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pmmp\thread\Thread as NativeThread;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\Worker;
use function gc_enable;
@@ -33,7 +34,7 @@ class AsyncWorker extends Worker{
private static array $store = [];
public function __construct(
private \ThreadedLogger $logger,
private \ThreadSafeLogger $logger,
private int $id,
private int $memoryLimit,
private SleeperNotifier $notifier
@@ -57,7 +58,7 @@ class AsyncWorker extends Worker{
}
}
public function getLogger() : \ThreadedLogger{
public function getLogger() : \ThreadSafeLogger{
return $this->logger;
}
@@ -78,7 +79,7 @@ class AsyncWorker extends Worker{
* want to use on this worker thread from multiple AsyncTasks.
*/
public function saveToThreadStore(string $identifier, mixed $value) : void{
if(\Thread::getCurrentThread() !== $this){
if(NativeThread::getCurrentThread() !== $this){
throw new \LogicException("Thread-local data can only be stored in the thread context");
}
self::$store[$identifier] = $value;
@@ -93,7 +94,7 @@ class AsyncWorker extends Worker{
* Objects stored in this storage may ONLY be retrieved while the task is running.
*/
public function getFromThreadStore(string $identifier) : mixed{
if(\Thread::getCurrentThread() !== $this){
if(NativeThread::getCurrentThread() !== $this){
throw new \LogicException("Thread-local data can only be fetched in the thread context");
}
return self::$store[$identifier] ?? null;
@@ -103,7 +104,7 @@ class AsyncWorker extends Worker{
* Removes previously-stored mixed data from the worker's thread-local object store.
*/
public function removeFromThreadStore(string $identifier) : void{
if(\Thread::getCurrentThread() !== $this){
if(NativeThread::getCurrentThread() !== $this){
throw new \LogicException("Thread-local data can only be removed in the thread context");
}
unset(self::$store[$identifier]);

View File

@@ -23,8 +23,10 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pmmp\thread\Thread as NativeThread;
use pocketmine\MemoryManager;
use Symfony\Component\Filesystem\Path;
use function assert;
/**
* Task used to dump memory from AsyncWorkers
@@ -37,12 +39,14 @@ class DumpWorkerMemoryTask extends AsyncTask{
){}
public function onRun() : void{
$worker = NativeThread::getCurrentThread();
assert($worker instanceof AsyncWorker);
MemoryManager::dumpMemory(
$this->worker,
Path::join($this->outputFolder, "AsyncWorker#" . $this->worker->getAsyncWorkerId()),
$worker,
Path::join($this->outputFolder, "AsyncWorker#" . $worker->getAsyncWorkerId()),
$this->maxNesting,
$this->maxStringSize,
new \PrefixedLogger($this->worker->getLogger(), "Memory Dump")
new \PrefixedLogger($worker->getLogger(), "Memory Dump")
);
}
}

View File

@@ -23,16 +23,17 @@ declare(strict_types=1);
namespace pocketmine\thread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\errorhandler\ErrorToExceptionHandler;
use pocketmine\Server;
use function error_reporting;
trait CommonThreadPartsTrait{
/**
* @var \ThreadedArray|\ClassLoader[]|null
* @phpstan-var \ThreadedArray<int, \ClassLoader>|null
* @var ThreadSafeArray|\ClassLoader[]|null
* @phpstan-var ThreadSafeArray<int, \ClassLoader>|null
*/
private ?\ThreadedArray $classLoaders = null;
private ?ThreadSafeArray $classLoaders = null;
protected ?string $composerAutoloaderPath = null;
protected bool $isKilled = false;
@@ -55,14 +56,15 @@ trait CommonThreadPartsTrait{
}
if($this->classLoaders === null){
$this->classLoaders = new \ThreadedArray();
$loaders = $this->classLoaders = new ThreadSafeArray();
}else{
$loaders = $this->classLoaders;
foreach($this->classLoaders as $k => $autoloader){
unset($this->classLoaders[$k]);
}
}
foreach($autoloaders as $autoloader){
$this->classLoaders[] = $autoloader;
$loaders[] = $autoloader;
}
}

View File

@@ -23,6 +23,7 @@ declare(strict_types=1);
namespace pocketmine\thread;
use pmmp\thread\ThreadSafe;
use function get_debug_type;
use function igbinary_serialize;
use function igbinary_unserialize;
@@ -34,7 +35,7 @@ use function igbinary_unserialize;
*
* @phpstan-template TValue
*/
final class NonThreadSafeValue extends \ThreadedBase{
final class NonThreadSafeValue extends ThreadSafe{
private string $variable;
/**

View File

@@ -23,8 +23,8 @@ declare(strict_types=1);
namespace pocketmine\thread;
use pmmp\thread\Thread as NativeThread;
use pocketmine\scheduler\AsyncTask;
use const PTHREADS_INHERIT_NONE;
/**
* Specialized Thread class aimed at PocketMine-MP-related usages. It handles setting up autoloading and error handling.
@@ -35,10 +35,10 @@ use const PTHREADS_INHERIT_NONE;
* CPU.
* @see AsyncTask
*/
abstract class Thread extends \Thread{
abstract class Thread extends NativeThread{
use CommonThreadPartsTrait;
public function start(int $options = PTHREADS_INHERIT_NONE) : bool{
public function start(int $options = NativeThread::INHERIT_NONE) : bool{
//this is intentionally not traitified
ThreadManager::getInstance()->add($this);

View File

@@ -23,9 +23,11 @@ declare(strict_types=1);
namespace pocketmine\thread;
use pmmp\thread\ThreadSafe;
use pmmp\thread\ThreadSafeArray;
use function spl_object_id;
class ThreadManager extends \ThreadedBase{
class ThreadManager extends ThreadSafe{
private static ?self $instance = null;
@@ -40,11 +42,11 @@ class ThreadManager extends \ThreadedBase{
return self::$instance;
}
/** @phpstan-var \ThreadedArray<int, Thread|Worker> */
private \ThreadedArray $threads;
/** @phpstan-var ThreadSafeArray<int, Thread|Worker> */
private ThreadSafeArray $threads;
private function __construct(){
$this->threads = new \ThreadedArray();
$this->threads = new ThreadSafeArray();
}
public function add(Worker|Thread $thread) : void{

View File

@@ -23,8 +23,9 @@ declare(strict_types=1);
namespace pocketmine\thread;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\Worker as NativeWorker;
use pocketmine\scheduler\AsyncTask;
use const PTHREADS_INHERIT_NONE;
/**
* Specialized Worker class for PocketMine-MP-related use cases. It handles setting up autoloading and error handling.
@@ -36,10 +37,10 @@ use const PTHREADS_INHERIT_NONE;
* If you want to run tasks on other CPU cores, check out AsyncTask first.
* @see AsyncTask
*/
abstract class Worker extends \Worker{
abstract class Worker extends NativeWorker{
use CommonThreadPartsTrait;
public function start(int $options = PTHREADS_INHERIT_NONE) : bool{
public function start(int $options = NativeThread::INHERIT_NONE) : bool{
//this is intentionally not traitified
ThreadManager::getInstance()->add($this);

View File

@@ -24,14 +24,14 @@ declare(strict_types=1);
namespace pocketmine\utils;
use LogLevel;
use pmmp\thread\Thread as NativeThread;
use pocketmine\thread\Thread;
use pocketmine\thread\Worker;
use function implode;
use function sprintf;
use const PHP_EOL;
use const PTHREADS_INHERIT_NONE;
class MainLogger extends \AttachableThreadedLogger implements \BufferedLogger{
class MainLogger extends \AttachableThreadSafeLogger implements \BufferedLogger{
protected bool $logDebug;
private string $format = TextFormat::AQUA . "[%s] " . TextFormat::RESET . "%s[%s/%s]: %s" . TextFormat::RESET;
@@ -52,7 +52,7 @@ class MainLogger extends \AttachableThreadedLogger implements \BufferedLogger{
$this->timezone = $timezone->getName();
$this->logWriterThread = new MainLoggerThread($logFile);
$this->logWriterThread->start(PTHREADS_INHERIT_NONE);
$this->logWriterThread->start(NativeThread::INHERIT_NONE);
}
/**
@@ -165,7 +165,7 @@ class MainLogger extends \AttachableThreadedLogger implements \BufferedLogger{
}
public function shutdownLogWriterThread() : void{
if(\Thread::getCurrentThreadId() === $this->logWriterThread->getCreatorId()){
if(NativeThread::getCurrentThreadId() === $this->logWriterThread->getCreatorId()){
$this->logWriterThread->shutdown();
}else{
throw new \LogicException("Only the creator thread can shutdown the logger thread");
@@ -175,7 +175,7 @@ class MainLogger extends \AttachableThreadedLogger implements \BufferedLogger{
protected function send(string $message, string $level, string $prefix, string $color) : void{
$time = new \DateTime('now', new \DateTimeZone($this->timezone));
$thread = \Thread::getCurrentThread();
$thread = NativeThread::getCurrentThread();
if($thread === null){
$threadName = $this->mainThreadName . " thread";
}elseif($thread instanceof Thread || $thread instanceof Worker){
@@ -195,7 +195,7 @@ class MainLogger extends \AttachableThreadedLogger implements \BufferedLogger{
$this->logWriterThread->write($time->format("Y-m-d") . " " . TextFormat::clean($message) . PHP_EOL);
/**
* @var \ThreadedLoggerAttachment $attachment
* @var \ThreadSafeLoggerAttachment $attachment
*/
foreach($this->attachments as $attachment){
$attachment->log($level, $message);
@@ -208,7 +208,7 @@ class MainLogger extends \AttachableThreadedLogger implements \BufferedLogger{
}
public function __destruct(){
if(!$this->logWriterThread->isJoined() && \Thread::getCurrentThreadId() === $this->logWriterThread->getCreatorId()){
if(!$this->logWriterThread->isJoined() && NativeThread::getCurrentThreadId() === $this->logWriterThread->getCreatorId()){
$this->shutdownLogWriterThread();
}
}

View File

@@ -23,22 +23,24 @@ declare(strict_types=1);
namespace pocketmine\utils;
use pmmp\thread\Thread;
use pmmp\thread\ThreadSafeArray;
use function fclose;
use function fopen;
use function fwrite;
use function is_resource;
use function touch;
final class MainLoggerThread extends \Thread{
/** @phpstan-var \ThreadedArray<int, string> */
private \ThreadedArray $buffer;
final class MainLoggerThread extends Thread{
/** @phpstan-var ThreadSafeArray<int, string> */
private ThreadSafeArray $buffer;
private bool $syncFlush = false;
private bool $shutdown = false;
public function __construct(
private string $logFile
){
$this->buffer = new \ThreadedArray();
$this->buffer = new ThreadSafeArray();
touch($this->logFile);
}