From 20c2fae0c6ed7e7927d0139662bd61ac67e90625 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Mon, 5 Jun 2023 21:11:13 +0100 Subject: [PATCH] First look at specialized network compression threads closes #5641 This has been mostly tested on Windows so far, where it offers substantial performance gains for compression. On *nix, the performance advantage is smaller, since there's less overhead to creating new AsyncTask instances; however, the performance benefit is still clearly visible. There are still some wrinkles to iron out, such as the dumb algorithm used for cycling through threads which wastes memory on small servers, but this change is mainly aimed at large servers, where the benefit will be clearly apparent. In practice, this should reduce main thread CPU load by 10-20% on some of the largest servers, offering a large amount of headroom for increased player counts. --- resources/pocketmine.yml | 4 + src/Server.php | 49 +++-- .../mcpe/compression/CompressorWorker.php | 204 ++++++++++++++++++ .../mcpe/compression/CompressorWorkerPool.php | 91 ++++++++ 4 files changed, 335 insertions(+), 13 deletions(-) create mode 100644 src/network/mcpe/compression/CompressorWorker.php create mode 100644 src/network/mcpe/compression/CompressorWorkerPool.php diff --git a/resources/pocketmine.yml b/resources/pocketmine.yml index 408b5b95b..c2de49563 100644 --- a/resources/pocketmine.yml +++ b/resources/pocketmine.yml @@ -85,6 +85,10 @@ network: batch-threshold: 256 #Compression level used when sending batched packets. Higher = more CPU, less bandwidth usage compression-level: 6 + #EXPERIMENTAL! Max threads to use for packet compression. If disabled, compression will be done on the main thread. + #Set to 0 to disable, or "auto" to try to detect the number of available CPU cores. + #Higher values will allow using more CPU cores on large servers, but will also increase memory usage. + compression-threads: 0 #Use AsyncTasks for compression during the main game session. Increases latency, but may reduce main thread load async-compression: false #Threshold for async compression, in bytes. Only packets larger than this will be compressed asynchronously diff --git a/src/Server.php b/src/Server.php index f604c93d7..409db8c68 100644 --- a/src/Server.php +++ b/src/Server.php @@ -50,8 +50,8 @@ use pocketmine\lang\LanguageNotFoundException; use pocketmine\lang\Translatable; use pocketmine\nbt\tag\CompoundTag; use pocketmine\network\mcpe\compression\CompressBatchPromise; -use pocketmine\network\mcpe\compression\CompressBatchTask; use pocketmine\network\mcpe\compression\Compressor; +use pocketmine\network\mcpe\compression\CompressorWorkerPool; use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\encryption\EncryptionContext; @@ -266,8 +266,13 @@ class Server{ private bool $onlineMode = true; private Network $network; - private bool $networkCompressionAsync = true; - private int $networkCompressionAsyncThreshold = self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD; + + private int $networkCompressionThreads; + /** + * @var CompressorWorkerPool[] + * @phpstan-var array + */ + private array $networkCompressionThreadPools = []; private Language $language; private bool $forceLanguage = false; @@ -905,11 +910,12 @@ class Server{ } ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE)); - $this->networkCompressionAsync = $this->configGroup->getPropertyBool("network.async-compression", true); - $this->networkCompressionAsyncThreshold = max( - $this->configGroup->getPropertyInt("network.async-compression-threshold", self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD), - $netCompressionThreshold ?? self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD - ); + $netCompressionThreads = $this->configGroup->getPropertyString("network.compression-threads", "auto"); + if($netCompressionThreads === "auto"){ + $this->networkCompressionThreads = max(1, Utils::getCoreCount() - 2); + }else{ + $this->networkCompressionThreads = max(0, (int) $netCompressionThreads); + } EncryptionContext::$ENABLED = $this->configGroup->getPropertyBool("network.enable-encryption", true); @@ -1352,6 +1358,17 @@ class Server{ return count($recipients); } + private function getNetworkCompressionWorkerPool(Compressor $compressor) : CompressorWorkerPool{ + $compressorId = spl_object_id($compressor); + $workerPool = $this->networkCompressionThreadPools[$compressorId] ?? null; + if($workerPool === null){ + $this->logger->debug("Creating new worker pool for compressor " . get_class($compressor) . "#" . $compressorId); + $workerPool = $this->networkCompressionThreadPools[$compressorId] = new CompressorWorkerPool($this->networkCompressionThreads, $compressor, $this->tickSleeper); + } + + return $workerPool; + } + /** * Broadcasts a list of packets in a batch to a list of players * @@ -1364,14 +1381,16 @@ class Server{ if($sync === null){ $threshold = $compressor->getCompressionThreshold(); - $sync = !$this->networkCompressionAsync || $threshold === null || strlen($buffer) < $threshold; + $sync = $threshold === null || strlen($buffer) < $threshold; } - $promise = new CompressBatchPromise(); - if(!$sync && strlen($buffer) >= $this->networkCompressionAsyncThreshold){ - $task = new CompressBatchTask($buffer, $promise, $compressor); - $this->asyncPool->submitTask($task); + if(!$sync && $this->networkCompressionThreads > 0){ + $workerPool = $this->getNetworkCompressionWorkerPool($compressor); + + //TODO: we really want to be submitting all sessions' buffers in one go to maximize performance + $promise = $workerPool->submit($buffer); }else{ + $promise = new CompressBatchPromise(); $promise->resolve($compressor->compress($buffer)); } @@ -1492,6 +1511,10 @@ class Server{ $this->network->unregisterInterface($interface); } } + foreach($this->networkCompressionThreadPools as $pool){ + $this->logger->debug("Shutting down network compression thread pool for compressor " . get_class($pool->getCompressor()) . "#" . spl_object_id($pool->getCompressor())); + $pool->shutdown(); + } }catch(\Throwable $e){ $this->logger->logException($e); $this->logger->emergency("Crashed while crashing, killing process"); diff --git a/src/network/mcpe/compression/CompressorWorker.php b/src/network/mcpe/compression/CompressorWorker.php new file mode 100644 index 000000000..b85dedec4 --- /dev/null +++ b/src/network/mcpe/compression/CompressorWorker.php @@ -0,0 +1,204 @@ + */ + private ThreadSafeArray $inChannel; + /** @phpstan-var ThreadSafeArray */ + private ThreadSafeArray $outChannel; + + /** + * @var CompressBatchPromise[]|\SplQueue + * @phpstan-var \SplQueue + */ + private \SplQueue $promises; + + private readonly int $sleeperNotifierId; + + private bool $shutdown = false; + + public function __construct( + Compressor $compressor, + private SleeperHandler $sleeperHandler, + ){ + $this->inChannel = new ThreadSafeArray(); + $this->outChannel = new ThreadSafeArray(); + $this->promises = new \SplQueue(); + + $sleeperEntry = $this->sleeperHandler->addNotifier(function() : void{ + $this->processResults(); + }); + $this->sleeperNotifierId = $sleeperEntry->getNotifierId(); + + $this->thread = new class($this->inChannel, $this->outChannel, $compressor, $sleeperEntry) extends Thread{ + private string $compressor; + + /** + * @phpstan-param ThreadSafeArray $inChannel + * @phpstan-param ThreadSafeArray $outChannel + */ + public function __construct( + private ThreadSafeArray $inChannel, + private ThreadSafeArray $outChannel, + Compressor $compressor, + private SleeperHandlerEntry $sleeperEntry + ){ + $this->compressor = serialize($compressor); + } + + public function onRun() : void{ + /** @var Compressor $compressor */ + $compressor = unserialize($this->compressor); + $inChannel = $this->inChannel; + $outChannel = $this->outChannel; + $sleeperNotifier = $this->sleeperEntry->createNotifier(); + + $shutdown = false; + + while(!$shutdown){ + $inBuffers = $inChannel->synchronized(function() use ($inChannel) : array{ + while($inChannel->count() === 0){ + $inChannel->wait(); + } + /** + * @phpstan-var array $result + * @var string[]|null[] $result + */ + $result = $inChannel->chunk(100, preserve: false); + return $result; + }); + + $outBuffers = []; + foreach($inBuffers as $inBuffer){ + if($inBuffer === null){ + $shutdown = true; + //don't break here - we still need to process the rest of the buffers + }else{ + $outBuffers[] = $compressor->compress($inBuffer); + } + } + + $outChannel->synchronized(function() use ($outChannel, $outBuffers) : void{ + foreach($outBuffers as $outBuffer){ + $outChannel[] = $outBuffer; + } + }); + $sleeperNotifier->wakeupSleeper(); + } + } + + public function quit() : void{ + $inChannel = $this->inChannel; + $inChannel->synchronized(function() use ($inChannel) : void{ + $inChannel[] = null; + $inChannel->notify(); + }); + parent::quit(); + } + }; + $this->thread->setClassLoaders([]); //plugin class loaders are not needed here + $this->thread->start(); + } + + public function submit(string $buffer) : CompressBatchPromise{ + if($this->shutdown){ + throw new \LogicException("This worker has been shut down"); + } + $this->inChannel->synchronized(function() use ($buffer) : void{ + $this->inChannel[] = $buffer; + $this->inChannel->notify(); + }); + $promise = new CompressBatchPromise(); + $this->promises->enqueue($promise); + return $promise; + } + + /** + * @param string[] $buffers + * @return CompressBatchPromise[] + */ + public function submitBulk(array $buffers) : array{ + if($this->shutdown){ + throw new \LogicException("This worker has been shut down"); + } + $this->inChannel->synchronized(function() use ($buffers) : void{ + foreach($buffers as $buffer){ + $this->inChannel[] = $buffer; + } + $this->inChannel->notify(); + }); + $promises = []; + foreach($buffers as $k => $buffer){ + $promise = new CompressBatchPromise(); + $this->promises->enqueue($promise); + $promises[$k] = $promise; + } + return $promises; + } + + private function processResults() : int{ + if(count($this->promises) === 0){ + return 0; + } + + do{ + $results = $this->outChannel->synchronized(function() : array{ + /** @var string[] $results */ + $results = $this->outChannel->chunk(100, preserve: false); + return $results; + }); + foreach($results as $compressed){ + $promise = $this->promises->dequeue(); + $promise->resolve($compressed); + } + }while(count($results) > 0); + + return count($this->promises); + } + + public function shutdown() : void{ + $this->shutdown = true; + $this->thread->quit(); + if($this->processResults() > 0){ + throw new AssumptionFailedError("All compression work should have been done before shutdown"); + } + $this->sleeperHandler->removeNotifier($this->sleeperNotifierId); + } + + public function __destruct(){ + $this->shutdown(); + } +} diff --git a/src/network/mcpe/compression/CompressorWorkerPool.php b/src/network/mcpe/compression/CompressorWorkerPool.php new file mode 100644 index 000000000..8f77bb23a --- /dev/null +++ b/src/network/mcpe/compression/CompressorWorkerPool.php @@ -0,0 +1,91 @@ + + */ + private array $workers = []; + + private int $nextWorker = 0; + + public function __construct( + private readonly int $maxSize, + private readonly Compressor $compressor, + private readonly SleeperHandler $sleeperHandler, + ){} + + public function getCompressor() : Compressor{ return $this->compressor; } + + public function submit(string $buffer) : CompressBatchPromise{ + $worker = $this->workers[$this->nextWorker] ?? null; + if($worker === null){ + $worker = new CompressorWorker($this->compressor, $this->sleeperHandler); + $this->workers[$this->nextWorker] = $worker; + } + $this->nextWorker = ($this->nextWorker + 1) % $this->maxSize; + return $worker->submit($buffer); + } + + /** + * @param string[] $buffers + * @return CompressBatchPromise[] + */ + public function submitBulk(array $buffers) : array{ + $splitSize = (int) ceil(count($buffers) / $this->maxSize); + + $results = []; + $offset = 0; + for($i = 0; $i < $this->maxSize; $i++){ + $worker = $this->workers[$i] ??= new CompressorWorker($this->compressor, $this->sleeperHandler); + + $results[] = $worker->submitBulk(array_slice($buffers, $offset, $splitSize, true)); + $offset += $splitSize; + if($offset >= count($buffers)){ + break; + } + } + return array_merge(...$results); + } + + public function shutdown() : void{ + foreach($this->workers as $worker){ + $worker->shutdown(); + } + $this->workers = []; + } + + public function __destruct(){ + $this->shutdown(); + } +}