First look at specialized network compression threads

closes #5641

This has been mostly tested on Windows so far, where it offers substantial performance gains for compression.
On *nix, the performance advantage is smaller, since there's less overhead to creating new AsyncTask instances; however, the performance benefit is still clearly visible.

There are still some wrinkles to iron out, such as the dumb algorithm used for cycling through threads which wastes memory on small servers, but this change is mainly aimed at large servers, where the benefit will be clearly apparent.
In practice, this should reduce main thread CPU load by 10-20% on some of the largest servers, offering a large amount of headroom for increased player counts.
This commit is contained in:
Dylan K. Taylor 2023-06-05 21:11:13 +01:00
parent 57cbc25080
commit 20c2fae0c6
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
4 changed files with 335 additions and 13 deletions

View File

@ -85,6 +85,10 @@ network:
batch-threshold: 256
#Compression level used when sending batched packets. Higher = more CPU, less bandwidth usage
compression-level: 6
#EXPERIMENTAL! Max threads to use for packet compression. If disabled, compression will be done on the main thread.
#Set to 0 to disable, or "auto" to try to detect the number of available CPU cores.
#Higher values will allow using more CPU cores on large servers, but will also increase memory usage.
compression-threads: 0
#Use AsyncTasks for compression during the main game session. Increases latency, but may reduce main thread load
async-compression: false
#Threshold for async compression, in bytes. Only packets larger than this will be compressed asynchronously

View File

@ -50,8 +50,8 @@ use pocketmine\lang\LanguageNotFoundException;
use pocketmine\lang\Translatable;
use pocketmine\nbt\tag\CompoundTag;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\CompressBatchTask;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\compression\CompressorWorkerPool;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\encryption\EncryptionContext;
@ -266,8 +266,13 @@ class Server{
private bool $onlineMode = true;
private Network $network;
private bool $networkCompressionAsync = true;
private int $networkCompressionAsyncThreshold = self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD;
private int $networkCompressionThreads;
/**
* @var CompressorWorkerPool[]
* @phpstan-var array<int, CompressorWorkerPool>
*/
private array $networkCompressionThreadPools = [];
private Language $language;
private bool $forceLanguage = false;
@ -905,11 +910,12 @@ class Server{
}
ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE));
$this->networkCompressionAsync = $this->configGroup->getPropertyBool("network.async-compression", true);
$this->networkCompressionAsyncThreshold = max(
$this->configGroup->getPropertyInt("network.async-compression-threshold", self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD),
$netCompressionThreshold ?? self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD
);
$netCompressionThreads = $this->configGroup->getPropertyString("network.compression-threads", "auto");
if($netCompressionThreads === "auto"){
$this->networkCompressionThreads = max(1, Utils::getCoreCount() - 2);
}else{
$this->networkCompressionThreads = max(0, (int) $netCompressionThreads);
}
EncryptionContext::$ENABLED = $this->configGroup->getPropertyBool("network.enable-encryption", true);
@ -1352,6 +1358,17 @@ class Server{
return count($recipients);
}
private function getNetworkCompressionWorkerPool(Compressor $compressor) : CompressorWorkerPool{
$compressorId = spl_object_id($compressor);
$workerPool = $this->networkCompressionThreadPools[$compressorId] ?? null;
if($workerPool === null){
$this->logger->debug("Creating new worker pool for compressor " . get_class($compressor) . "#" . $compressorId);
$workerPool = $this->networkCompressionThreadPools[$compressorId] = new CompressorWorkerPool($this->networkCompressionThreads, $compressor, $this->tickSleeper);
}
return $workerPool;
}
/**
* Broadcasts a list of packets in a batch to a list of players
*
@ -1364,14 +1381,16 @@ class Server{
if($sync === null){
$threshold = $compressor->getCompressionThreshold();
$sync = !$this->networkCompressionAsync || $threshold === null || strlen($buffer) < $threshold;
$sync = $threshold === null || strlen($buffer) < $threshold;
}
$promise = new CompressBatchPromise();
if(!$sync && strlen($buffer) >= $this->networkCompressionAsyncThreshold){
$task = new CompressBatchTask($buffer, $promise, $compressor);
$this->asyncPool->submitTask($task);
if(!$sync && $this->networkCompressionThreads > 0){
$workerPool = $this->getNetworkCompressionWorkerPool($compressor);
//TODO: we really want to be submitting all sessions' buffers in one go to maximize performance
$promise = $workerPool->submit($buffer);
}else{
$promise = new CompressBatchPromise();
$promise->resolve($compressor->compress($buffer));
}
@ -1492,6 +1511,10 @@ class Server{
$this->network->unregisterInterface($interface);
}
}
foreach($this->networkCompressionThreadPools as $pool){
$this->logger->debug("Shutting down network compression thread pool for compressor " . get_class($pool->getCompressor()) . "#" . spl_object_id($pool->getCompressor()));
$pool->shutdown();
}
}catch(\Throwable $e){
$this->logger->logException($e);
$this->logger->emergency("Crashed while crashing, killing process");

View File

@ -0,0 +1,204 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperHandler;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\thread\Thread;
use pocketmine\utils\AssumptionFailedError;
use function count;
use function serialize;
use function unserialize;
final class CompressorWorker{
private Thread $thread;
/** @phpstan-var ThreadSafeArray<int, string|null> */
private ThreadSafeArray $inChannel;
/** @phpstan-var ThreadSafeArray<int, string> */
private ThreadSafeArray $outChannel;
/**
* @var CompressBatchPromise[]|\SplQueue
* @phpstan-var \SplQueue<CompressBatchPromise>
*/
private \SplQueue $promises;
private readonly int $sleeperNotifierId;
private bool $shutdown = false;
public function __construct(
Compressor $compressor,
private SleeperHandler $sleeperHandler,
){
$this->inChannel = new ThreadSafeArray();
$this->outChannel = new ThreadSafeArray();
$this->promises = new \SplQueue();
$sleeperEntry = $this->sleeperHandler->addNotifier(function() : void{
$this->processResults();
});
$this->sleeperNotifierId = $sleeperEntry->getNotifierId();
$this->thread = new class($this->inChannel, $this->outChannel, $compressor, $sleeperEntry) extends Thread{
private string $compressor;
/**
* @phpstan-param ThreadSafeArray<int, string|null> $inChannel
* @phpstan-param ThreadSafeArray<int, string> $outChannel
*/
public function __construct(
private ThreadSafeArray $inChannel,
private ThreadSafeArray $outChannel,
Compressor $compressor,
private SleeperHandlerEntry $sleeperEntry
){
$this->compressor = serialize($compressor);
}
public function onRun() : void{
/** @var Compressor $compressor */
$compressor = unserialize($this->compressor);
$inChannel = $this->inChannel;
$outChannel = $this->outChannel;
$sleeperNotifier = $this->sleeperEntry->createNotifier();
$shutdown = false;
while(!$shutdown){
$inBuffers = $inChannel->synchronized(function() use ($inChannel) : array{
while($inChannel->count() === 0){
$inChannel->wait();
}
/**
* @phpstan-var array<int, string|null> $result
* @var string[]|null[] $result
*/
$result = $inChannel->chunk(100, preserve: false);
return $result;
});
$outBuffers = [];
foreach($inBuffers as $inBuffer){
if($inBuffer === null){
$shutdown = true;
//don't break here - we still need to process the rest of the buffers
}else{
$outBuffers[] = $compressor->compress($inBuffer);
}
}
$outChannel->synchronized(function() use ($outChannel, $outBuffers) : void{
foreach($outBuffers as $outBuffer){
$outChannel[] = $outBuffer;
}
});
$sleeperNotifier->wakeupSleeper();
}
}
public function quit() : void{
$inChannel = $this->inChannel;
$inChannel->synchronized(function() use ($inChannel) : void{
$inChannel[] = null;
$inChannel->notify();
});
parent::quit();
}
};
$this->thread->setClassLoaders([]); //plugin class loaders are not needed here
$this->thread->start();
}
public function submit(string $buffer) : CompressBatchPromise{
if($this->shutdown){
throw new \LogicException("This worker has been shut down");
}
$this->inChannel->synchronized(function() use ($buffer) : void{
$this->inChannel[] = $buffer;
$this->inChannel->notify();
});
$promise = new CompressBatchPromise();
$this->promises->enqueue($promise);
return $promise;
}
/**
* @param string[] $buffers
* @return CompressBatchPromise[]
*/
public function submitBulk(array $buffers) : array{
if($this->shutdown){
throw new \LogicException("This worker has been shut down");
}
$this->inChannel->synchronized(function() use ($buffers) : void{
foreach($buffers as $buffer){
$this->inChannel[] = $buffer;
}
$this->inChannel->notify();
});
$promises = [];
foreach($buffers as $k => $buffer){
$promise = new CompressBatchPromise();
$this->promises->enqueue($promise);
$promises[$k] = $promise;
}
return $promises;
}
private function processResults() : int{
if(count($this->promises) === 0){
return 0;
}
do{
$results = $this->outChannel->synchronized(function() : array{
/** @var string[] $results */
$results = $this->outChannel->chunk(100, preserve: false);
return $results;
});
foreach($results as $compressed){
$promise = $this->promises->dequeue();
$promise->resolve($compressed);
}
}while(count($results) > 0);
return count($this->promises);
}
public function shutdown() : void{
$this->shutdown = true;
$this->thread->quit();
if($this->processResults() > 0){
throw new AssumptionFailedError("All compression work should have been done before shutdown");
}
$this->sleeperHandler->removeNotifier($this->sleeperNotifierId);
}
public function __destruct(){
$this->shutdown();
}
}

View File

@ -0,0 +1,91 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
use pocketmine\snooze\SleeperHandler;
use function array_merge;
use function array_slice;
use function ceil;
use function count;
final class CompressorWorkerPool{
/**
* @var CompressorWorker[]
* @phpstan-var array<int, CompressorWorker>
*/
private array $workers = [];
private int $nextWorker = 0;
public function __construct(
private readonly int $maxSize,
private readonly Compressor $compressor,
private readonly SleeperHandler $sleeperHandler,
){}
public function getCompressor() : Compressor{ return $this->compressor; }
public function submit(string $buffer) : CompressBatchPromise{
$worker = $this->workers[$this->nextWorker] ?? null;
if($worker === null){
$worker = new CompressorWorker($this->compressor, $this->sleeperHandler);
$this->workers[$this->nextWorker] = $worker;
}
$this->nextWorker = ($this->nextWorker + 1) % $this->maxSize;
return $worker->submit($buffer);
}
/**
* @param string[] $buffers
* @return CompressBatchPromise[]
*/
public function submitBulk(array $buffers) : array{
$splitSize = (int) ceil(count($buffers) / $this->maxSize);
$results = [];
$offset = 0;
for($i = 0; $i < $this->maxSize; $i++){
$worker = $this->workers[$i] ??= new CompressorWorker($this->compressor, $this->sleeperHandler);
$results[] = $worker->submitBulk(array_slice($buffers, $offset, $splitSize, true));
$offset += $splitSize;
if($offset >= count($buffers)){
break;
}
}
return array_merge(...$results);
}
public function shutdown() : void{
foreach($this->workers as $worker){
$worker->shutdown();
}
$this->workers = [];
}
public function __destruct(){
$this->shutdown();
}
}