mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-09-09 11:16:57 +00:00
Compare commits
12 Commits
5.16.0
...
compressor
Author | SHA1 | Date | |
---|---|---|---|
3d790e3e4b | |||
e92923aa10 | |||
d35d9e6ecf | |||
671c65d787 | |||
7e6550d05a | |||
366968722f | |||
135fe149f0 | |||
94023b48e5 | |||
ff47da6675 | |||
0f9f0a3b7f | |||
3e4d8f4a60 | |||
20c2fae0c6 |
@ -85,11 +85,10 @@ network:
|
||||
batch-threshold: 256
|
||||
#Compression level used when sending batched packets. Higher = more CPU, less bandwidth usage
|
||||
compression-level: 6
|
||||
#Use AsyncTasks for compression during the main game session. Increases latency, but may reduce main thread load
|
||||
async-compression: false
|
||||
#Threshold for async compression, in bytes. Only packets larger than this will be compressed asynchronously
|
||||
#Due to large overhead of AsyncTask, async compression isn't worth it except for large packets
|
||||
async-compression-threshold: 10000
|
||||
#Max threads to use for packet compression. If disabled, compression will be done on the main thread.
|
||||
#Set to 0 to disable, or "auto" to try to detect the number of available CPU cores.
|
||||
#Higher values will allow using more CPU cores, but will also increase memory usage.
|
||||
compression-threads: auto
|
||||
#Experimental. Use UPnP to automatically port forward
|
||||
upnp-forwarding: false
|
||||
#Maximum size in bytes of packets sent over the network (default 1492 bytes). Packets larger than this will be
|
||||
|
@ -50,8 +50,8 @@ use pocketmine\lang\LanguageNotFoundException;
|
||||
use pocketmine\lang\Translatable;
|
||||
use pocketmine\nbt\tag\CompoundTag;
|
||||
use pocketmine\network\mcpe\compression\CompressBatchPromise;
|
||||
use pocketmine\network\mcpe\compression\CompressBatchTask;
|
||||
use pocketmine\network\mcpe\compression\Compressor;
|
||||
use pocketmine\network\mcpe\compression\CompressorWorkerPool;
|
||||
use pocketmine\network\mcpe\compression\ZlibCompressor;
|
||||
use pocketmine\network\mcpe\convert\TypeConverter;
|
||||
use pocketmine\network\mcpe\encryption\EncryptionContext;
|
||||
@ -208,8 +208,6 @@ class Server{
|
||||
private const TICKS_PER_TPS_OVERLOAD_WARNING = 5 * self::TARGET_TICKS_PER_SECOND;
|
||||
private const TICKS_PER_STATS_REPORT = 300 * self::TARGET_TICKS_PER_SECOND;
|
||||
|
||||
private const DEFAULT_ASYNC_COMPRESSION_THRESHOLD = 10_000;
|
||||
|
||||
private static ?Server $instance = null;
|
||||
|
||||
private TimeTrackingSleeperHandler $tickSleeper;
|
||||
@ -267,8 +265,13 @@ class Server{
|
||||
private bool $onlineMode = true;
|
||||
|
||||
private Network $network;
|
||||
private bool $networkCompressionAsync = true;
|
||||
private int $networkCompressionAsyncThreshold = self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD;
|
||||
|
||||
private int $networkCompressionThreads;
|
||||
/**
|
||||
* @var CompressorWorkerPool[]
|
||||
* @phpstan-var array<int, CompressorWorkerPool>
|
||||
*/
|
||||
private array $networkCompressionThreadPools = [];
|
||||
|
||||
private Language $language;
|
||||
private bool $forceLanguage = false;
|
||||
@ -907,11 +910,12 @@ class Server{
|
||||
}
|
||||
ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE));
|
||||
|
||||
$this->networkCompressionAsync = $this->configGroup->getPropertyBool(Yml::NETWORK_ASYNC_COMPRESSION, true);
|
||||
$this->networkCompressionAsyncThreshold = max(
|
||||
$this->configGroup->getPropertyInt(Yml::NETWORK_ASYNC_COMPRESSION_THRESHOLD, self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD),
|
||||
$netCompressionThreshold ?? self::DEFAULT_ASYNC_COMPRESSION_THRESHOLD
|
||||
);
|
||||
$netCompressionThreads = $this->configGroup->getPropertyString(Yml::NETWORK_COMPRESSION_THREADS, "auto");
|
||||
if($netCompressionThreads === "auto"){
|
||||
$this->networkCompressionThreads = max(1, Utils::getCoreCount() - 2);
|
||||
}else{
|
||||
$this->networkCompressionThreads = max(0, (int) $netCompressionThreads);
|
||||
}
|
||||
|
||||
EncryptionContext::$ENABLED = $this->configGroup->getPropertyBool(Yml::NETWORK_ENABLE_ENCRYPTION, true);
|
||||
|
||||
@ -1355,6 +1359,17 @@ class Server{
|
||||
return count($recipients);
|
||||
}
|
||||
|
||||
private function getNetworkCompressionWorkerPool(Compressor $compressor) : CompressorWorkerPool{
|
||||
$compressorId = spl_object_id($compressor);
|
||||
$workerPool = $this->networkCompressionThreadPools[$compressorId] ?? null;
|
||||
if($workerPool === null){
|
||||
$this->logger->debug("Creating new worker pool for compressor " . get_class($compressor) . "#" . $compressorId);
|
||||
$workerPool = $this->networkCompressionThreadPools[$compressorId] = new CompressorWorkerPool($this->networkCompressionThreads, $compressor, $this->tickSleeper);
|
||||
}
|
||||
|
||||
return $workerPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
* Broadcasts a list of packets in a batch to a list of players
|
||||
@ -1368,14 +1383,16 @@ class Server{
|
||||
|
||||
if($sync === null){
|
||||
$threshold = $compressor->getCompressionThreshold();
|
||||
$sync = !$this->networkCompressionAsync || $threshold === null || strlen($buffer) < $threshold;
|
||||
$sync = $threshold === null || strlen($buffer) < $threshold;
|
||||
}
|
||||
|
||||
$promise = new CompressBatchPromise();
|
||||
if(!$sync && strlen($buffer) >= $this->networkCompressionAsyncThreshold){
|
||||
$task = new CompressBatchTask($buffer, $promise, $compressor);
|
||||
$this->asyncPool->submitTask($task);
|
||||
if(!$sync && $this->networkCompressionThreads > 0){
|
||||
$workerPool = $this->getNetworkCompressionWorkerPool($compressor);
|
||||
|
||||
//TODO: we really want to be submitting all sessions' buffers in one go to maximize performance
|
||||
$promise = $workerPool->submit($buffer);
|
||||
}else{
|
||||
$promise = new CompressBatchPromise();
|
||||
$promise->resolve($compressor->compress($buffer));
|
||||
}
|
||||
|
||||
@ -1496,6 +1513,10 @@ class Server{
|
||||
$this->network->unregisterInterface($interface);
|
||||
}
|
||||
}
|
||||
foreach($this->networkCompressionThreadPools as $pool){
|
||||
$this->logger->debug("Shutting down network compression thread pool for compressor " . get_class($pool->getCompressor()) . "#" . spl_object_id($pool->getCompressor()));
|
||||
$pool->shutdown();
|
||||
}
|
||||
}catch(\Throwable $e){
|
||||
$this->logger->logException($e);
|
||||
$this->logger->emergency("Crashed while crashing, killing process");
|
||||
|
@ -90,10 +90,9 @@ final class YmlServerProperties{
|
||||
public const MEMORY_WORLD_CACHES_DISABLE_CHUNK_CACHE = 'memory.world-caches.disable-chunk-cache';
|
||||
public const MEMORY_WORLD_CACHES_LOW_MEMORY_TRIGGER = 'memory.world-caches.low-memory-trigger';
|
||||
public const NETWORK = 'network';
|
||||
public const NETWORK_ASYNC_COMPRESSION = 'network.async-compression';
|
||||
public const NETWORK_ASYNC_COMPRESSION_THRESHOLD = 'network.async-compression-threshold';
|
||||
public const NETWORK_BATCH_THRESHOLD = 'network.batch-threshold';
|
||||
public const NETWORK_COMPRESSION_LEVEL = 'network.compression-level';
|
||||
public const NETWORK_COMPRESSION_THREADS = 'network.compression-threads';
|
||||
public const NETWORK_ENABLE_ENCRYPTION = 'network.enable-encryption';
|
||||
public const NETWORK_MAX_MTU_SIZE = 'network.max-mtu-size';
|
||||
public const NETWORK_UPNP_FORWARDING = 'network.upnp-forwarding';
|
||||
|
@ -1,54 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
*
|
||||
* ____ _ _ __ __ _ __ __ ____
|
||||
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
|
||||
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
|
||||
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
|
||||
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* @author PocketMine Team
|
||||
* @link http://www.pocketmine.net/
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace pocketmine\network\mcpe\compression;
|
||||
|
||||
use pocketmine\scheduler\AsyncTask;
|
||||
use pocketmine\thread\NonThreadSafeValue;
|
||||
|
||||
class CompressBatchTask extends AsyncTask{
|
||||
|
||||
private const TLS_KEY_PROMISE = "promise";
|
||||
|
||||
/** @phpstan-var NonThreadSafeValue<Compressor> */
|
||||
private NonThreadSafeValue $compressor;
|
||||
|
||||
public function __construct(
|
||||
private string $data,
|
||||
CompressBatchPromise $promise,
|
||||
Compressor $compressor
|
||||
){
|
||||
$this->compressor = new NonThreadSafeValue($compressor);
|
||||
$this->storeLocal(self::TLS_KEY_PROMISE, $promise);
|
||||
}
|
||||
|
||||
public function onRun() : void{
|
||||
$this->setResult($this->compressor->deserialize()->compress($this->data));
|
||||
}
|
||||
|
||||
public function onCompletion() : void{
|
||||
/** @var CompressBatchPromise $promise */
|
||||
$promise = $this->fetchLocal(self::TLS_KEY_PROMISE);
|
||||
$promise->resolve($this->getResult());
|
||||
}
|
||||
}
|
209
src/network/mcpe/compression/CompressorWorker.php
Normal file
209
src/network/mcpe/compression/CompressorWorker.php
Normal file
@ -0,0 +1,209 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
*
|
||||
* ____ _ _ __ __ _ __ __ ____
|
||||
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
|
||||
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
|
||||
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
|
||||
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* @author PocketMine Team
|
||||
* @link http://www.pocketmine.net/
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace pocketmine\network\mcpe\compression;
|
||||
|
||||
use pmmp\thread\ThreadSafeArray;
|
||||
use pocketmine\snooze\SleeperHandler;
|
||||
use pocketmine\snooze\SleeperHandlerEntry;
|
||||
use pocketmine\snooze\SleeperNotifier;
|
||||
use pocketmine\thread\Thread;
|
||||
use pocketmine\utils\AssumptionFailedError;
|
||||
use function count;
|
||||
use function serialize;
|
||||
use function unserialize;
|
||||
|
||||
final class CompressorWorker{
|
||||
private Thread $thread;
|
||||
|
||||
/** @phpstan-var ThreadSafeArray<int, string|null> */
|
||||
private ThreadSafeArray $inChannel;
|
||||
/** @phpstan-var ThreadSafeArray<int, string> */
|
||||
private ThreadSafeArray $outChannel;
|
||||
|
||||
/**
|
||||
* @var CompressBatchPromise[]|\SplQueue
|
||||
* @phpstan-var \SplQueue<CompressBatchPromise>
|
||||
*/
|
||||
private \SplQueue $promises;
|
||||
|
||||
private readonly int $sleeperNotifierId;
|
||||
|
||||
private bool $shutdown = false;
|
||||
|
||||
public function __construct(
|
||||
Compressor $compressor,
|
||||
private SleeperHandler $sleeperHandler,
|
||||
){
|
||||
$this->inChannel = new ThreadSafeArray();
|
||||
$this->outChannel = new ThreadSafeArray();
|
||||
$this->promises = new \SplQueue();
|
||||
|
||||
$sleeperEntry = $this->sleeperHandler->addNotifier(function() : void{
|
||||
$this->processResults();
|
||||
});
|
||||
$this->sleeperNotifierId = $sleeperEntry->getNotifierId();
|
||||
|
||||
$this->thread = new class($this->inChannel, $this->outChannel, $compressor, $sleeperEntry) extends Thread{
|
||||
private string $compressor;
|
||||
|
||||
/**
|
||||
* @phpstan-param ThreadSafeArray<int, string|null> $inChannel
|
||||
* @phpstan-param ThreadSafeArray<int, string> $outChannel
|
||||
*/
|
||||
public function __construct(
|
||||
private ThreadSafeArray $inChannel,
|
||||
private ThreadSafeArray $outChannel,
|
||||
Compressor $compressor,
|
||||
private SleeperHandlerEntry $sleeperEntry
|
||||
){
|
||||
$this->compressor = serialize($compressor);
|
||||
}
|
||||
|
||||
public function onRun() : void{
|
||||
/** @var Compressor $compressor */
|
||||
$compressor = unserialize($this->compressor);
|
||||
self::thread($this->inChannel, $compressor, $this->outChannel, $this->sleeperEntry->createNotifier());
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-param ThreadSafeArray<int, string|null> $inChannel
|
||||
* @phpstan-param ThreadSafeArray<int, string> $outChannel
|
||||
*/
|
||||
private static function thread(ThreadSafeArray $inChannel, Compressor $compressor, ThreadSafeArray $outChannel, SleeperNotifier $sleeperNotifier) : void{
|
||||
$shutdown = false;
|
||||
|
||||
while(!$shutdown){
|
||||
$inBuffers = $inChannel->synchronized(function() use ($inChannel) : array{
|
||||
while($inChannel->count() === 0){
|
||||
$inChannel->wait();
|
||||
}
|
||||
/**
|
||||
* @phpstan-var array<int, string|null> $result
|
||||
* @var string[]|null[] $result
|
||||
*/
|
||||
$result = $inChannel->chunk(100, preserve: false);
|
||||
return $result;
|
||||
});
|
||||
|
||||
$outBuffers = [];
|
||||
foreach($inBuffers as $inBuffer){
|
||||
if($inBuffer === null){
|
||||
$shutdown = true;
|
||||
//don't break here - we still need to process the rest of the buffers
|
||||
}else{
|
||||
$outBuffers[] = $compressor->compress($inBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
$outChannel->synchronized(function() use ($outChannel, $outBuffers) : void{
|
||||
foreach($outBuffers as $outBuffer){
|
||||
$outChannel[] = $outBuffer;
|
||||
}
|
||||
});
|
||||
$sleeperNotifier->wakeupSleeper();
|
||||
}
|
||||
}
|
||||
|
||||
public function quit() : void{
|
||||
$inChannel = $this->inChannel;
|
||||
$inChannel->synchronized(function() use ($inChannel) : void{
|
||||
$inChannel[] = null;
|
||||
$inChannel->notify();
|
||||
});
|
||||
parent::quit();
|
||||
}
|
||||
};
|
||||
$this->thread->setClassLoaders([]); //plugin class loaders are not needed here
|
||||
$this->thread->start();
|
||||
}
|
||||
|
||||
public function submit(string $buffer) : CompressBatchPromise{
|
||||
if($this->shutdown){
|
||||
throw new \LogicException("This worker has been shut down");
|
||||
}
|
||||
$this->inChannel->synchronized(function() use ($buffer) : void{
|
||||
$this->inChannel[] = $buffer;
|
||||
$this->inChannel->notify();
|
||||
});
|
||||
$promise = new CompressBatchPromise();
|
||||
$this->promises->enqueue($promise);
|
||||
return $promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string[] $buffers
|
||||
* @return CompressBatchPromise[]
|
||||
*/
|
||||
public function submitBulk(array $buffers) : array{
|
||||
if($this->shutdown){
|
||||
throw new \LogicException("This worker has been shut down");
|
||||
}
|
||||
$this->inChannel->synchronized(function() use ($buffers) : void{
|
||||
foreach($buffers as $buffer){
|
||||
$this->inChannel[] = $buffer;
|
||||
}
|
||||
$this->inChannel->notify();
|
||||
});
|
||||
$promises = [];
|
||||
foreach($buffers as $k => $buffer){
|
||||
$promise = new CompressBatchPromise();
|
||||
$this->promises->enqueue($promise);
|
||||
$promises[$k] = $promise;
|
||||
}
|
||||
return $promises;
|
||||
}
|
||||
|
||||
private function processResults() : int{
|
||||
if(count($this->promises) === 0){
|
||||
return 0;
|
||||
}
|
||||
|
||||
do{
|
||||
$results = $this->outChannel->synchronized(function() : array{
|
||||
/** @var string[] $results */
|
||||
$results = $this->outChannel->chunk(100, preserve: false);
|
||||
return $results;
|
||||
});
|
||||
foreach($results as $compressed){
|
||||
$promise = $this->promises->dequeue();
|
||||
$promise->resolve($compressed);
|
||||
}
|
||||
}while(count($results) > 0);
|
||||
|
||||
return count($this->promises);
|
||||
}
|
||||
|
||||
public function shutdown() : void{
|
||||
$this->shutdown = true;
|
||||
$this->thread->quit();
|
||||
if($this->processResults() > 0){
|
||||
throw new AssumptionFailedError("All compression work should have been done before shutdown");
|
||||
}
|
||||
$this->sleeperHandler->removeNotifier($this->sleeperNotifierId);
|
||||
}
|
||||
|
||||
public function __destruct(){
|
||||
$this->shutdown();
|
||||
}
|
||||
}
|
91
src/network/mcpe/compression/CompressorWorkerPool.php
Normal file
91
src/network/mcpe/compression/CompressorWorkerPool.php
Normal file
@ -0,0 +1,91 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
*
|
||||
* ____ _ _ __ __ _ __ __ ____
|
||||
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
|
||||
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
|
||||
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
|
||||
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* @author PocketMine Team
|
||||
* @link http://www.pocketmine.net/
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace pocketmine\network\mcpe\compression;
|
||||
|
||||
use pocketmine\snooze\SleeperHandler;
|
||||
use function array_merge;
|
||||
use function array_slice;
|
||||
use function ceil;
|
||||
use function count;
|
||||
|
||||
final class CompressorWorkerPool{
|
||||
|
||||
/**
|
||||
* @var CompressorWorker[]
|
||||
* @phpstan-var array<int, CompressorWorker>
|
||||
*/
|
||||
private array $workers = [];
|
||||
|
||||
private int $nextWorker = 0;
|
||||
|
||||
public function __construct(
|
||||
private readonly int $maxSize,
|
||||
private readonly Compressor $compressor,
|
||||
private readonly SleeperHandler $sleeperHandler,
|
||||
){}
|
||||
|
||||
public function getCompressor() : Compressor{ return $this->compressor; }
|
||||
|
||||
public function submit(string $buffer) : CompressBatchPromise{
|
||||
$worker = $this->workers[$this->nextWorker] ?? null;
|
||||
if($worker === null){
|
||||
$worker = new CompressorWorker($this->compressor, $this->sleeperHandler);
|
||||
$this->workers[$this->nextWorker] = $worker;
|
||||
}
|
||||
$this->nextWorker = ($this->nextWorker + 1) % $this->maxSize;
|
||||
return $worker->submit($buffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string[] $buffers
|
||||
* @return CompressBatchPromise[]
|
||||
*/
|
||||
public function submitBulk(array $buffers) : array{
|
||||
$splitSize = (int) ceil(count($buffers) / $this->maxSize);
|
||||
|
||||
$results = [];
|
||||
$offset = 0;
|
||||
for($i = 0; $i < $this->maxSize; $i++){
|
||||
$worker = $this->workers[$i] ??= new CompressorWorker($this->compressor, $this->sleeperHandler);
|
||||
|
||||
$results[] = $worker->submitBulk(array_slice($buffers, $offset, $splitSize, true));
|
||||
$offset += $splitSize;
|
||||
if($offset >= count($buffers)){
|
||||
break;
|
||||
}
|
||||
}
|
||||
return array_merge(...$results);
|
||||
}
|
||||
|
||||
public function shutdown() : void{
|
||||
foreach($this->workers as $worker){
|
||||
$worker->shutdown();
|
||||
}
|
||||
$this->workers = [];
|
||||
}
|
||||
|
||||
public function __destruct(){
|
||||
$this->shutdown();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user