From 3be9548b1ea240cdf352fa08a603cc3729f88682 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Tue, 28 Apr 2020 16:21:18 +0100 Subject: [PATCH] net: compressors are now fully dynamic (or at least the potential to be) the compressor used by RakLibInterface when opening a session is still hardcoded, but that's because we have no way to select the correct compressor at that point in the login sequence, since we aren't propagating the protocol information up from RakLib right now. --- src/Server.php | 38 ++++++++++++------- src/crafting/CraftingManager.php | 32 ++++++++-------- src/network/mcpe/ChunkCache.php | 19 ++++++++-- src/network/mcpe/ChunkRequestTask.php | 8 ++-- src/network/mcpe/NetworkSession.php | 17 ++++++--- .../mcpe/compression/CompressBatchTask.php | 4 +- src/network/mcpe/compression/Compressor.php | 36 ++++++++++++++++++ .../mcpe/compression/ZlibCompressor.php | 2 +- .../mcpe/handler/PreSpawnPacketHandler.php | 2 +- src/network/mcpe/raklib/RakLibInterface.php | 10 ++++- 10 files changed, 122 insertions(+), 46 deletions(-) create mode 100644 src/network/mcpe/compression/Compressor.php diff --git a/src/Server.php b/src/Server.php index 5b590ccb1..3e4e71397 100644 --- a/src/Server.php +++ b/src/Server.php @@ -48,7 +48,8 @@ use pocketmine\nbt\tag\CompoundTag; use pocketmine\nbt\TreeRoot; use pocketmine\network\mcpe\compression\CompressBatchPromise; use pocketmine\network\mcpe\compression\CompressBatchTask; -use pocketmine\network\mcpe\compression\ZlibCompressor as ZlibNetworkCompression; +use pocketmine\network\mcpe\compression\Compressor; +use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\encryption\NetworkCipher; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; @@ -922,7 +923,7 @@ class Server{ $this->logger->warning("Invalid network compression level $netCompressionLevel set, setting to default 7"); $netCompressionLevel = 7; } - ZlibNetworkCompression::setInstance(new ZlibNetworkCompression($netCompressionLevel, $netCompressionThreshold, ZlibNetworkCompression::DEFAULT_MAX_DECOMPRESSION_SIZE)); + ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE)); $this->networkCompressionAsync = (bool) $this->getProperty("network.async-compression", true); @@ -1252,16 +1253,28 @@ class Server{ $stream = PacketBatch::fromPackets(...$ev->getPackets()); - if(!ZlibNetworkCompression::getInstance()->willCompress($stream->getBuffer())){ - foreach($recipients as $target){ - foreach($ev->getPackets() as $pk){ - $target->addToSendBuffer($pk); + $compressors = []; + $compressorTargets = []; + foreach($recipients as $recipient){ + $compressor = $recipient->getCompressor(); + $compressorId = spl_object_id($compressor); + //TODO: different compressors might be compatible, it might not be necessary to split them up by object + $compressors[$compressorId] = $compressor; + $compressorTargets[$compressorId][] = $recipient; + } + + foreach($compressors as $compressorId => $compressor){ + if(!$compressor->willCompress($stream->getBuffer())){ + foreach($compressorTargets[$compressorId] as $target){ + foreach($ev->getPackets() as $pk){ + $target->addToSendBuffer($pk); + } + } + }else{ + $promise = $this->prepareBatch($stream, $compressor); + foreach($compressorTargets[$compressorId] as $target){ + $target->queueCompressed($promise); } - } - }else{ - $promise = $this->prepareBatch($stream); - foreach($recipients as $target){ - $target->queueCompressed($promise); } } @@ -1271,11 +1284,10 @@ class Server{ /** * Broadcasts a list of packets in a batch to a list of players */ - public function prepareBatch(PacketBatch $stream, bool $forceSync = false) : CompressBatchPromise{ + public function prepareBatch(PacketBatch $stream, Compressor $compressor, bool $forceSync = false) : CompressBatchPromise{ try{ Timings::$playerNetworkSendCompressTimer->startTiming(); - $compressor = ZlibNetworkCompression::getInstance(); $buffer = $stream->getBuffer(); if(!$compressor->willCompress($buffer)){ $forceSync = true; diff --git a/src/crafting/CraftingManager.php b/src/crafting/CraftingManager.php index 4abb8bc9b..8fec8b922 100644 --- a/src/crafting/CraftingManager.php +++ b/src/crafting/CraftingManager.php @@ -25,7 +25,7 @@ namespace pocketmine\crafting; use pocketmine\item\Item; use pocketmine\network\mcpe\compression\CompressBatchPromise; -use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\CraftingDataPacket; @@ -41,6 +41,7 @@ use function array_map; use function file_get_contents; use function json_decode; use function json_encode; +use function spl_object_id; use function str_repeat; use function usort; use const DIRECTORY_SEPARATOR; @@ -53,8 +54,8 @@ class CraftingManager{ /** @var FurnaceRecipe[] */ protected $furnaceRecipes = []; - /** @var CompressBatchPromise|null */ - private $craftingDataCache; + /** @var CompressBatchPromise[] */ + private $craftingDataCaches = []; public function __construct(){ $this->init(); @@ -98,14 +99,12 @@ class CraftingManager{ break; } } - - $this->buildCraftingDataCache(); } /** * Rebuilds the cached CraftingDataPacket. */ - public function buildCraftingDataCache() : void{ + private function buildCraftingDataCache(Compressor $compressor) : CompressBatchPromise{ Timings::$craftingDataCacheRebuildTimer->startTiming(); $pk = new CraftingDataPacket(); $pk->cleanRecipes = true; @@ -164,21 +163,24 @@ class CraftingManager{ ); } - $this->craftingDataCache = new CompressBatchPromise(); - $this->craftingDataCache->resolve(ZlibCompressor::getInstance()->compress(PacketBatch::fromPackets($pk)->getBuffer())); + $promise = new CompressBatchPromise(); + $promise->resolve($compressor->compress(PacketBatch::fromPackets($pk)->getBuffer())); Timings::$craftingDataCacheRebuildTimer->stopTiming(); + return $promise; } /** * Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found. */ - public function getCraftingDataPacket() : CompressBatchPromise{ - if($this->craftingDataCache === null){ - $this->buildCraftingDataCache(); + public function getCraftingDataPacket(Compressor $compressor) : CompressBatchPromise{ + $compressorId = spl_object_id($compressor); + + if(!isset($this->craftingDataCaches[$compressorId])){ + $this->craftingDataCaches[$compressorId] = $this->buildCraftingDataCache($compressor); } - return $this->craftingDataCache; + return $this->craftingDataCaches[$compressorId]; } /** @@ -253,19 +255,19 @@ class CraftingManager{ public function registerShapedRecipe(ShapedRecipe $recipe) : void{ $this->shapedRecipes[self::hashOutputs($recipe->getResults())][] = $recipe; - $this->craftingDataCache = null; + $this->craftingDataCaches = []; } public function registerShapelessRecipe(ShapelessRecipe $recipe) : void{ $this->shapelessRecipes[self::hashOutputs($recipe->getResults())][] = $recipe; - $this->craftingDataCache = null; + $this->craftingDataCaches = []; } public function registerFurnaceRecipe(FurnaceRecipe $recipe) : void{ $input = $recipe->getInput(); $this->furnaceRecipes[$input->getId() . ":" . ($input->hasAnyDamageValue() ? "?" : $input->getMeta())] = $recipe; - $this->craftingDataCache = null; + $this->craftingDataCaches = []; } /** diff --git a/src/network/mcpe/ChunkCache.php b/src/network/mcpe/ChunkCache.php index 67d5b8911..399da5c29 100644 --- a/src/network/mcpe/ChunkCache.php +++ b/src/network/mcpe/ChunkCache.php @@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe; use pocketmine\math\Vector3; use pocketmine\network\mcpe\compression\CompressBatchPromise; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\world\ChunkListener; use pocketmine\world\ChunkListenerNoOpTrait; use pocketmine\world\format\Chunk; @@ -39,7 +40,7 @@ use function strlen; * TODO: this needs a hook for world unloading */ class ChunkCache implements ChunkListener{ - /** @var self[] */ + /** @var self[][] */ private static $instances = []; /** @@ -47,12 +48,20 @@ class ChunkCache implements ChunkListener{ * * @return ChunkCache */ - public static function getInstance(World $world) : self{ - return self::$instances[spl_object_id($world)] ?? (self::$instances[spl_object_id($world)] = new self($world)); + public static function getInstance(World $world, Compressor $compressor) : self{ + $worldId = spl_object_id($world); + $compressorId = spl_object_id($compressor); + if(!isset(self::$instances[$worldId][$compressorId])){ + \GlobalLogger::get()->debug("Created new chunk packet cache (world#$worldId, compressor#$compressorId)"); + self::$instances[$worldId][$compressorId] = new self($world, $compressor); + } + return self::$instances[$worldId][$compressorId]; } /** @var World */ private $world; + /** @var Compressor */ + private $compressor; /** @var CompressBatchPromise[] */ private $caches = []; @@ -62,8 +71,9 @@ class ChunkCache implements ChunkListener{ /** @var int */ private $misses = 0; - private function __construct(World $world){ + private function __construct(World $world, Compressor $compressor){ $this->world = $world; + $this->compressor = $compressor; } /** @@ -92,6 +102,7 @@ class ChunkCache implements ChunkListener{ $chunkZ, $this->world->getChunk($chunkX, $chunkZ), $this->caches[$chunkHash], + $this->compressor, function() use ($chunkX, $chunkZ) : void{ $this->world->getLogger()->error("Failed preparing chunk $chunkX $chunkZ, retrying"); diff --git a/src/network/mcpe/ChunkRequestTask.php b/src/network/mcpe/ChunkRequestTask.php index 25e10bda9..a8b135d97 100644 --- a/src/network/mcpe/ChunkRequestTask.php +++ b/src/network/mcpe/ChunkRequestTask.php @@ -24,7 +24,7 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; use pocketmine\network\mcpe\compression\CompressBatchPromise; -use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\protocol\LevelChunkPacket; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\serializer\ChunkSerializer; @@ -43,7 +43,7 @@ class ChunkRequestTask extends AsyncTask{ /** @var int */ protected $chunkZ; - /** @var ZlibCompressor */ + /** @var Compressor */ protected $compressor; /** @var string */ @@ -52,8 +52,8 @@ class ChunkRequestTask extends AsyncTask{ /** * @phpstan-param (\Closure() : void)|null $onError */ - public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, ?\Closure $onError = null){ - $this->compressor = ZlibCompressor::getInstance(); //TODO: this should be injectable + public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, Compressor $compressor, ?\Closure $onError = null){ + $this->compressor = $compressor; $this->chunk = FastChunkSerializer::serializeWithoutLight($chunk); $this->chunkX = $chunkX; diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index dec765ac1..aafdf7e39 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -35,8 +35,8 @@ use pocketmine\form\Form; use pocketmine\math\Vector3; use pocketmine\network\BadPacketException; use pocketmine\network\mcpe\compression\CompressBatchPromise; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\compression\DecompressionException; -use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\SkinAdapterSingleton; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\encryption\DecryptionException; @@ -147,6 +147,8 @@ class NetworkSession{ /** @var \SplQueue|CompressBatchPromise[] */ private $compressedQueue; + /** @var Compressor */ + private $compressor; /** @var InventoryManager|null */ private $invManager = null; @@ -154,7 +156,7 @@ class NetworkSession{ /** @var PacketSender */ private $sender; - public function __construct(Server $server, NetworkSessionManager $manager, PacketSender $sender, string $ip, int $port){ + public function __construct(Server $server, NetworkSessionManager $manager, PacketSender $sender, Compressor $compressor, string $ip, int $port){ $this->server = $server; $this->manager = $manager; $this->sender = $sender; @@ -164,6 +166,7 @@ class NetworkSession{ $this->logger = new \PrefixedLogger($this->server->getLogger(), $this->getLogPrefix()); $this->compressedQueue = new \SplQueue(); + $this->compressor = $compressor; $this->connectTime = time(); @@ -283,7 +286,7 @@ class NetworkSession{ Timings::$playerNetworkReceiveDecompressTimer->startTiming(); try{ - $stream = new PacketBatch(ZlibCompressor::getInstance()->decompress($payload)); //TODO: make this dynamic + $stream = new PacketBatch($this->compressor->decompress($payload)); }catch(DecompressionException $e){ $this->logger->debug("Failed to decompress packet: " . base64_encode($payload)); //TODO: this isn't incompatible game version if we already established protocol version @@ -395,12 +398,16 @@ class NetworkSession{ private function flushSendBuffer(bool $immediate = false) : void{ if($this->sendBuffer !== null){ - $promise = $this->server->prepareBatch($this->sendBuffer, $immediate); + $promise = $this->server->prepareBatch($this->sendBuffer, $this->compressor, $immediate); $this->sendBuffer = null; $this->queueCompressed($promise, $immediate); } } + public function getCompressor() : Compressor{ + return $this->compressor; + } + public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{ $this->flushSendBuffer($immediate); //Maintain ordering if possible if($immediate){ @@ -757,7 +764,7 @@ class NetworkSession{ $world = $this->player->getLocation()->getWorld(); assert($world !== null); - ChunkCache::getInstance($world)->request($chunkX, $chunkZ)->onResolve( + ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ)->onResolve( //this callback may be called synchronously or asynchronously, depending on whether the promise is resolved yet function(CompressBatchPromise $promise) use ($world, $chunkX, $chunkZ, $onCompletion) : void{ diff --git a/src/network/mcpe/compression/CompressBatchTask.php b/src/network/mcpe/compression/CompressBatchTask.php index 05ce2ce2d..d13546b3a 100644 --- a/src/network/mcpe/compression/CompressBatchTask.php +++ b/src/network/mcpe/compression/CompressBatchTask.php @@ -31,10 +31,10 @@ class CompressBatchTask extends AsyncTask{ /** @var string */ private $data; - /** @var ZlibCompressor */ + /** @var Compressor */ private $compressor; - public function __construct(string $data, CompressBatchPromise $promise, ZlibCompressor $compressor){ + public function __construct(string $data, CompressBatchPromise $promise, Compressor $compressor){ $this->data = $data; $this->compressor = $compressor; $this->storeLocal(self::TLS_KEY_PROMISE, $promise); diff --git a/src/network/mcpe/compression/Compressor.php b/src/network/mcpe/compression/Compressor.php new file mode 100644 index 000000000..bbc8fac4f --- /dev/null +++ b/src/network/mcpe/compression/Compressor.php @@ -0,0 +1,36 @@ +session->getInvManager()->syncAll(); $this->session->getInvManager()->syncCreative(); $this->session->getInvManager()->syncSelectedHotbarSlot(); - $this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket()); + $this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket($this->session->getCompressor())); $this->session->syncPlayerList(); } diff --git a/src/network/mcpe/raklib/RakLibInterface.php b/src/network/mcpe/raklib/RakLibInterface.php index 0906390d1..8321e5768 100644 --- a/src/network/mcpe/raklib/RakLibInterface.php +++ b/src/network/mcpe/raklib/RakLibInterface.php @@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe\raklib; use pocketmine\network\AdvancedNetworkInterface; use pocketmine\network\BadPacketException; +use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\ProtocolInfo; use pocketmine\network\Network; @@ -152,7 +153,14 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ } public function openSession(int $sessionId, string $address, int $port, int $clientID) : void{ - $session = new NetworkSession($this->server, $this->network->getSessionManager(), new RakLibPacketSender($sessionId, $this), $address, $port); + $session = new NetworkSession( + $this->server, + $this->network->getSessionManager(), + new RakLibPacketSender($sessionId, $this), + ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it + $address, + $port + ); $this->sessions[$sessionId] = $session; }