Compare commits

...

12 Commits

Author SHA1 Message Date
3d790e3e4b Merge remote-tracking branch 'origin/minor-next' into compressor-threads 2023-11-16 09:50:19 +00:00
e92923aa10 Remove debug timings 2023-11-13 15:08:34 +00:00
d35d9e6ecf Enable compression-threads by default 2023-11-13 15:07:38 +00:00
671c65d787 Merge branch 'minor-next' into compressor-threads 2023-11-13 11:40:31 +00:00
7e6550d05a Merge branch 'minor-next' into compressor-threads 2023-11-13 11:14:19 +00:00
366968722f CompressorWorker: isolate thread body inside static function
this prevents accidental access to shared properties, which would cost performance.
2023-11-13 10:50:33 +00:00
135fe149f0 Merge branch 'minor-next' into compressor-threads 2023-11-09 18:05:57 +00:00
94023b48e5 Added more timings 2023-11-09 14:55:26 +00:00
ff47da6675 Merge remote-tracking branch 'origin/minor-next' into compressor-threads 2023-11-09 14:41:24 +00:00
0f9f0a3b7f Remove async compression pocketmine.yml options
CompressBatchTask is kept for internal testing for now
2023-06-05 21:36:33 +01:00
3e4d8f4a60 shush 2023-06-05 21:21:26 +01:00
20c2fae0c6 First look at specialized network compression threads
closes #5641

This has been mostly tested on Windows so far, where it offers substantial performance gains for compression.
On *nix, the performance advantage is smaller, since there's less overhead to creating new AsyncTask instances; however, the performance benefit is still clearly visible.

There are still some wrinkles to iron out, such as the dumb algorithm used for cycling through threads which wastes memory on small servers, but this change is mainly aimed at large servers, where the benefit will be clearly apparent.
In practice, this should reduce main thread CPU load by 10-20% on some of the largest servers, offering a large amount of headroom for increased player counts.
2023-06-05 21:11:13 +01:00
6 changed files with 341 additions and 76 deletions

View File

@ -85,11 +85,10 @@ network:
batch-threshold: 256
#Compression level used when sending batched packets. Higher = more CPU, less bandwidth usage
compression-level: 6
#Use AsyncTasks for compression during the main game session. Increases latency, but may reduce main thread load
async-compression: false
#Threshold for async compression, in bytes. Only packets larger than this will be compressed asynchronously
#Due to large overhead of AsyncTask, async compression isn't worth it except for large packets
async-compression-threshold: 10000
#Max threads to use for packet compression. If disabled, compression will be done on the main thread.
#Set to 0 to disable, or "auto" to try to detect the number of available CPU cores.
#Higher values will allow using more CPU cores, but will also increase memory usage.
compression-threads: auto
#Experimental. Use UPnP to automatically port forward
upnp-forwarding: false
#Maximum size in bytes of packets sent over the network (default 1492 bytes). Packets larger than this will be

View File

@ -50,8 +50,8 @@ use pocketmine\lang\LanguageNotFoundException;
use pocketmine\lang\Translatable;
use pocketmine\nbt\tag\CompoundTag;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\CompressBatchTask;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\compression\CompressorWorkerPool;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\encryption\EncryptionContext;
@ -208,8 +208,6 @@ class Server{
private const TICKS_PER_TPS_OVERLOAD_WARNING = 5 * self::TARGET_TICKS_PER_SECOND;
private const TICKS_PER_STATS_REPORT = 300 * self::TARGET_TICKS_PER_SECOND;
private const DEFAULT_ASYNC_COMPRESSION_THRESHOLD = 10_000;
private static ?Server $instance = null;
private TimeTrackingSleeperHandler $tickSleeper;
@ -267,8 +265,13 @@ class Server{
private bool $onlineMode = true;
private Network $network;
private bool $networkCompressionAsync = true;
private int $networkCompressionAsyncThreshold = self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD;
private int $networkCompressionThreads;
/**
* @var CompressorWorkerPool[]
* @phpstan-var array<int, CompressorWorkerPool>
*/
private array $networkCompressionThreadPools = [];
private Language $language;
private bool $forceLanguage = false;
@ -907,11 +910,12 @@ class Server{
}
ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE));
$this->networkCompressionAsync = $this->configGroup->getPropertyBool(Yml::NETWORK_ASYNC_COMPRESSION, true);
$this->networkCompressionAsyncThreshold = max(
$this->configGroup->getPropertyInt(Yml::NETWORK_ASYNC_COMPRESSION_THRESHOLD, self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD),
$netCompressionThreshold ?? self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD
);
$netCompressionThreads = $this->configGroup->getPropertyString(Yml::NETWORK_COMPRESSION_THREADS, "auto");
if($netCompressionThreads === "auto"){
$this->networkCompressionThreads = max(1, Utils::getCoreCount() - 2);
}else{
$this->networkCompressionThreads = max(0, (int) $netCompressionThreads);
}
EncryptionContext::$ENABLED = $this->configGroup->getPropertyBool(Yml::NETWORK_ENABLE_ENCRYPTION, true);
@ -1355,6 +1359,17 @@ class Server{
return count($recipients);
}
private function getNetworkCompressionWorkerPool(Compressor $compressor) : CompressorWorkerPool{
$compressorId = spl_object_id($compressor);
$workerPool = $this->networkCompressionThreadPools[$compressorId] ?? null;
if($workerPool === null){
$this->logger->debug("Creating new worker pool for compressor " . get_class($compressor) . "#" . $compressorId);
$workerPool = $this->networkCompressionThreadPools[$compressorId] = new CompressorWorkerPool($this->networkCompressionThreads, $compressor, $this->tickSleeper);
}
return $workerPool;
}
/**
* @internal
* Broadcasts a list of packets in a batch to a list of players
@ -1368,14 +1383,16 @@ class Server{
if($sync === null){
$threshold = $compressor->getCompressionThreshold();
$sync = !$this->networkCompressionAsync || $threshold === null || strlen($buffer) < $threshold;
$sync = $threshold === null || strlen($buffer) < $threshold;
}
$promise = new CompressBatchPromise();
if(!$sync && strlen($buffer) >= $this->networkCompressionAsyncThreshold){
$task = new CompressBatchTask($buffer, $promise, $compressor);
$this->asyncPool->submitTask($task);
if(!$sync && $this->networkCompressionThreads > 0){
$workerPool = $this->getNetworkCompressionWorkerPool($compressor);
//TODO: we really want to be submitting all sessions' buffers in one go to maximize performance
$promise = $workerPool->submit($buffer);
}else{
$promise = new CompressBatchPromise();
$promise->resolve($compressor->compress($buffer));
}
@ -1496,6 +1513,10 @@ class Server{
$this->network->unregisterInterface($interface);
}
}
foreach($this->networkCompressionThreadPools as $pool){
$this->logger->debug("Shutting down network compression thread pool for compressor " . get_class($pool->getCompressor()) . "#" . spl_object_id($pool->getCompressor()));
$pool->shutdown();
}
}catch(\Throwable $e){
$this->logger->logException($e);
$this->logger->emergency("Crashed while crashing, killing process");

View File

@ -90,10 +90,9 @@ final class YmlServerProperties{
public const MEMORY_WORLD_CACHES_DISABLE_CHUNK_CACHE = 'memory.world-caches.disable-chunk-cache';
public const MEMORY_WORLD_CACHES_LOW_MEMORY_TRIGGER = 'memory.world-caches.low-memory-trigger';
public const NETWORK = 'network';
public const NETWORK_ASYNC_COMPRESSION = 'network.async-compression';
public const NETWORK_ASYNC_COMPRESSION_THRESHOLD = 'network.async-compression-threshold';
public const NETWORK_BATCH_THRESHOLD = 'network.batch-threshold';
public const NETWORK_COMPRESSION_LEVEL = 'network.compression-level';
public const NETWORK_COMPRESSION_THREADS = 'network.compression-threads';
public const NETWORK_ENABLE_ENCRYPTION = 'network.enable-encryption';
public const NETWORK_MAX_MTU_SIZE = 'network.max-mtu-size';
public const NETWORK_UPNP_FORWARDING = 'network.upnp-forwarding';

View File

@ -1,54 +0,0 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
use pocketmine\scheduler\AsyncTask;
use pocketmine\thread\NonThreadSafeValue;
class CompressBatchTask extends AsyncTask{
private const TLS_KEY_PROMISE = "promise";
/** @phpstan-var NonThreadSafeValue<Compressor> */
private NonThreadSafeValue $compressor;
public function __construct(
private string $data,
CompressBatchPromise $promise,
Compressor $compressor
){
$this->compressor = new NonThreadSafeValue($compressor);
$this->storeLocal(self::TLS_KEY_PROMISE, $promise);
}
public function onRun() : void{
$this->setResult($this->compressor->deserialize()->compress($this->data));
}
public function onCompletion() : void{
/** @var CompressBatchPromise $promise */
$promise = $this->fetchLocal(self::TLS_KEY_PROMISE);
$promise->resolve($this->getResult());
}
}

View File

@ -0,0 +1,209 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperHandler;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\Thread;
use pocketmine\utils\AssumptionFailedError;
use function count;
use function serialize;
use function unserialize;
final class CompressorWorker{
private Thread $thread;
/** @phpstan-var ThreadSafeArray<int, string|null> */
private ThreadSafeArray $inChannel;
/** @phpstan-var ThreadSafeArray<int, string> */
private ThreadSafeArray $outChannel;
/**
* @var CompressBatchPromise[]|\SplQueue
* @phpstan-var \SplQueue<CompressBatchPromise>
*/
private \SplQueue $promises;
private readonly int $sleeperNotifierId;
private bool $shutdown = false;
public function __construct(
Compressor $compressor,
private SleeperHandler $sleeperHandler,
){
$this->inChannel = new ThreadSafeArray();
$this->outChannel = new ThreadSafeArray();
$this->promises = new \SplQueue();
$sleeperEntry = $this->sleeperHandler->addNotifier(function() : void{
$this->processResults();
});
$this->sleeperNotifierId = $sleeperEntry->getNotifierId();
$this->thread = new class($this->inChannel, $this->outChannel, $compressor, $sleeperEntry) extends Thread{
private string $compressor;
/**
* @phpstan-param ThreadSafeArray<int, string|null> $inChannel
* @phpstan-param ThreadSafeArray<int, string> $outChannel
*/
public function __construct(
private ThreadSafeArray $inChannel,
private ThreadSafeArray $outChannel,
Compressor $compressor,
private SleeperHandlerEntry $sleeperEntry
){
$this->compressor = serialize($compressor);
}
public function onRun() : void{
/** @var Compressor $compressor */
$compressor = unserialize($this->compressor);
self::thread($this->inChannel, $compressor, $this->outChannel, $this->sleeperEntry->createNotifier());
}
/**
* @phpstan-param ThreadSafeArray<int, string|null> $inChannel
* @phpstan-param ThreadSafeArray<int, string> $outChannel
*/
private static function thread(ThreadSafeArray $inChannel, Compressor $compressor, ThreadSafeArray $outChannel, SleeperNotifier $sleeperNotifier) : void{
$shutdown = false;
while(!$shutdown){
$inBuffers = $inChannel->synchronized(function() use ($inChannel) : array{
while($inChannel->count() === 0){
$inChannel->wait();
}
/**
* @phpstan-var array<int, string|null> $result
* @var string[]|null[] $result
*/
$result = $inChannel->chunk(100, preserve: false);
return $result;
});
$outBuffers = [];
foreach($inBuffers as $inBuffer){
if($inBuffer === null){
$shutdown = true;
//don't break here - we still need to process the rest of the buffers
}else{
$outBuffers[] = $compressor->compress($inBuffer);
}
}
$outChannel->synchronized(function() use ($outChannel, $outBuffers) : void{
foreach($outBuffers as $outBuffer){
$outChannel[] = $outBuffer;
}
});
$sleeperNotifier->wakeupSleeper();
}
}
public function quit() : void{
$inChannel = $this->inChannel;
$inChannel->synchronized(function() use ($inChannel) : void{
$inChannel[] = null;
$inChannel->notify();
});
parent::quit();
}
};
$this->thread->setClassLoaders([]); //plugin class loaders are not needed here
$this->thread->start();
}
public function submit(string $buffer) : CompressBatchPromise{
if($this->shutdown){
throw new \LogicException("This worker has been shut down");
}
$this->inChannel->synchronized(function() use ($buffer) : void{
$this->inChannel[] = $buffer;
$this->inChannel->notify();
});
$promise = new CompressBatchPromise();
$this->promises->enqueue($promise);
return $promise;
}
/**
* @param string[] $buffers
* @return CompressBatchPromise[]
*/
public function submitBulk(array $buffers) : array{
if($this->shutdown){
throw new \LogicException("This worker has been shut down");
}
$this->inChannel->synchronized(function() use ($buffers) : void{
foreach($buffers as $buffer){
$this->inChannel[] = $buffer;
}
$this->inChannel->notify();
});
$promises = [];
foreach($buffers as $k => $buffer){
$promise = new CompressBatchPromise();
$this->promises->enqueue($promise);
$promises[$k] = $promise;
}
return $promises;
}
private function processResults() : int{
if(count($this->promises) === 0){
return 0;
}
do{
$results = $this->outChannel->synchronized(function() : array{
/** @var string[] $results */
$results = $this->outChannel->chunk(100, preserve: false);
return $results;
});
foreach($results as $compressed){
$promise = $this->promises->dequeue();
$promise->resolve($compressed);
}
}while(count($results) > 0);
return count($this->promises);
}
public function shutdown() : void{
$this->shutdown = true;
$this->thread->quit();
if($this->processResults() > 0){
throw new AssumptionFailedError("All compression work should have been done before shutdown");
}
$this->sleeperHandler->removeNotifier($this->sleeperNotifierId);
}
public function __destruct(){
$this->shutdown();
}
}

View File

@ -0,0 +1,91 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
use pocketmine\snooze\SleeperHandler;
use function array_merge;
use function array_slice;
use function ceil;
use function count;
final class CompressorWorkerPool{
/**
* @var CompressorWorker[]
* @phpstan-var array<int, CompressorWorker>
*/
private array $workers = [];
private int $nextWorker = 0;
public function __construct(
private readonly int $maxSize,
private readonly Compressor $compressor,
private readonly SleeperHandler $sleeperHandler,
){}
public function getCompressor() : Compressor{ return $this->compressor; }
public function submit(string $buffer) : CompressBatchPromise{
$worker = $this->workers[$this->nextWorker] ?? null;
if($worker === null){
$worker = new CompressorWorker($this->compressor, $this->sleeperHandler);
$this->workers[$this->nextWorker] = $worker;
}
$this->nextWorker = ($this->nextWorker + 1) % $this->maxSize;
return $worker->submit($buffer);
}
/**
* @param string[] $buffers
* @return CompressBatchPromise[]
*/
public function submitBulk(array $buffers) : array{
$splitSize = (int) ceil(count($buffers) / $this->maxSize);
$results = [];
$offset = 0;
for($i = 0; $i < $this->maxSize; $i++){
$worker = $this->workers[$i] ??= new CompressorWorker($this->compressor, $this->sleeperHandler);
$results[] = $worker->submitBulk(array_slice($buffers, $offset, $splitSize, true));
$offset += $splitSize;
if($offset >= count($buffers)){
break;
}
}
return array_merge(...$results);
}
public function shutdown() : void{
foreach($this->workers as $worker){
$worker->shutdown();
}
$this->workers = [];
}
public function __destruct(){
$this->shutdown();
}
}