diff --git a/src/Server.php b/src/Server.php index 5b590ccb1..3e4e71397 100644 --- a/src/Server.php +++ b/src/Server.php @@ -48,7 +48,8 @@ use pocketmine\nbt\tag\CompoundTag; use pocketmine\nbt\TreeRoot; use pocketmine\network\mcpe\compression\CompressBatchPromise; use pocketmine\network\mcpe\compression\CompressBatchTask; -use pocketmine\network\mcpe\compression\ZlibCompressor as ZlibNetworkCompression; +use pocketmine\network\mcpe\compression\Compressor; +use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\encryption\NetworkCipher; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; @@ -922,7 +923,7 @@ class Server{ $this->logger->warning("Invalid network compression level $netCompressionLevel set, setting to default 7"); $netCompressionLevel = 7; } - ZlibNetworkCompression::setInstance(new ZlibNetworkCompression($netCompressionLevel, $netCompressionThreshold, ZlibNetworkCompression::DEFAULT_MAX_DECOMPRESSION_SIZE)); + ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE)); $this->networkCompressionAsync = (bool) $this->getProperty("network.async-compression", true); @@ -1252,16 +1253,28 @@ class Server{ $stream = PacketBatch::fromPackets(...$ev->getPackets()); - if(!ZlibNetworkCompression::getInstance()->willCompress($stream->getBuffer())){ - foreach($recipients as $target){ - foreach($ev->getPackets() as $pk){ - $target->addToSendBuffer($pk); + $compressors = []; + $compressorTargets = []; + foreach($recipients as $recipient){ + $compressor = $recipient->getCompressor(); + $compressorId = spl_object_id($compressor); + //TODO: different compressors might be compatible, it might not be necessary to split them up by object + $compressors[$compressorId] = $compressor; + $compressorTargets[$compressorId][] = $recipient; + } + + foreach($compressors as $compressorId => $compressor){ + if(!$compressor->willCompress($stream->getBuffer())){ + foreach($compressorTargets[$compressorId] as $target){ + foreach($ev->getPackets() as $pk){ + $target->addToSendBuffer($pk); + } + } + }else{ + $promise = $this->prepareBatch($stream, $compressor); + foreach($compressorTargets[$compressorId] as $target){ + $target->queueCompressed($promise); } - } - }else{ - $promise = $this->prepareBatch($stream); - foreach($recipients as $target){ - $target->queueCompressed($promise); } } @@ -1271,11 +1284,10 @@ class Server{ /** * Broadcasts a list of packets in a batch to a list of players */ - public function prepareBatch(PacketBatch $stream, bool $forceSync = false) : CompressBatchPromise{ + public function prepareBatch(PacketBatch $stream, Compressor $compressor, bool $forceSync = false) : CompressBatchPromise{ try{ Timings::$playerNetworkSendCompressTimer->startTiming(); - $compressor = ZlibNetworkCompression::getInstance(); $buffer = $stream->getBuffer(); if(!$compressor->willCompress($buffer)){ $forceSync = true; diff --git a/src/crafting/CraftingManager.php b/src/crafting/CraftingManager.php index 4abb8bc9b..8fec8b922 100644 --- a/src/crafting/CraftingManager.php +++ b/src/crafting/CraftingManager.php @@ -25,7 +25,7 @@ namespace pocketmine\crafting; use pocketmine\item\Item; use pocketmine\network\mcpe\compression\CompressBatchPromise; -use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\CraftingDataPacket; @@ -41,6 +41,7 @@ use function array_map; use function file_get_contents; use function json_decode; use function json_encode; +use function spl_object_id; use function str_repeat; use function usort; use const DIRECTORY_SEPARATOR; @@ -53,8 +54,8 @@ class CraftingManager{ /** @var FurnaceRecipe[] */ protected $furnaceRecipes = []; - /** @var CompressBatchPromise|null */ - private $craftingDataCache; + /** @var CompressBatchPromise[] */ + private $craftingDataCaches = []; public function __construct(){ $this->init(); @@ -98,14 +99,12 @@ class CraftingManager{ break; } } - - $this->buildCraftingDataCache(); } /** * Rebuilds the cached CraftingDataPacket. */ - public function buildCraftingDataCache() : void{ + private function buildCraftingDataCache(Compressor $compressor) : CompressBatchPromise{ Timings::$craftingDataCacheRebuildTimer->startTiming(); $pk = new CraftingDataPacket(); $pk->cleanRecipes = true; @@ -164,21 +163,24 @@ class CraftingManager{ ); } - $this->craftingDataCache = new CompressBatchPromise(); - $this->craftingDataCache->resolve(ZlibCompressor::getInstance()->compress(PacketBatch::fromPackets($pk)->getBuffer())); + $promise = new CompressBatchPromise(); + $promise->resolve($compressor->compress(PacketBatch::fromPackets($pk)->getBuffer())); Timings::$craftingDataCacheRebuildTimer->stopTiming(); + return $promise; } /** * Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found. */ - public function getCraftingDataPacket() : CompressBatchPromise{ - if($this->craftingDataCache === null){ - $this->buildCraftingDataCache(); + public function getCraftingDataPacket(Compressor $compressor) : CompressBatchPromise{ + $compressorId = spl_object_id($compressor); + + if(!isset($this->craftingDataCaches[$compressorId])){ + $this->craftingDataCaches[$compressorId] = $this->buildCraftingDataCache($compressor); } - return $this->craftingDataCache; + return $this->craftingDataCaches[$compressorId]; } /** @@ -253,19 +255,19 @@ class CraftingManager{ public function registerShapedRecipe(ShapedRecipe $recipe) : void{ $this->shapedRecipes[self::hashOutputs($recipe->getResults())][] = $recipe; - $this->craftingDataCache = null; + $this->craftingDataCaches = []; } public function registerShapelessRecipe(ShapelessRecipe $recipe) : void{ $this->shapelessRecipes[self::hashOutputs($recipe->getResults())][] = $recipe; - $this->craftingDataCache = null; + $this->craftingDataCaches = []; } public function registerFurnaceRecipe(FurnaceRecipe $recipe) : void{ $input = $recipe->getInput(); $this->furnaceRecipes[$input->getId() . ":" . ($input->hasAnyDamageValue() ? "?" : $input->getMeta())] = $recipe; - $this->craftingDataCache = null; + $this->craftingDataCaches = []; } /** diff --git a/src/network/mcpe/ChunkCache.php b/src/network/mcpe/ChunkCache.php index 67d5b8911..399da5c29 100644 --- a/src/network/mcpe/ChunkCache.php +++ b/src/network/mcpe/ChunkCache.php @@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe; use pocketmine\math\Vector3; use pocketmine\network\mcpe\compression\CompressBatchPromise; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\world\ChunkListener; use pocketmine\world\ChunkListenerNoOpTrait; use pocketmine\world\format\Chunk; @@ -39,7 +40,7 @@ use function strlen; * TODO: this needs a hook for world unloading */ class ChunkCache implements ChunkListener{ - /** @var self[] */ + /** @var self[][] */ private static $instances = []; /** @@ -47,12 +48,20 @@ class ChunkCache implements ChunkListener{ * * @return ChunkCache */ - public static function getInstance(World $world) : self{ - return self::$instances[spl_object_id($world)] ?? (self::$instances[spl_object_id($world)] = new self($world)); + public static function getInstance(World $world, Compressor $compressor) : self{ + $worldId = spl_object_id($world); + $compressorId = spl_object_id($compressor); + if(!isset(self::$instances[$worldId][$compressorId])){ + \GlobalLogger::get()->debug("Created new chunk packet cache (world#$worldId, compressor#$compressorId)"); + self::$instances[$worldId][$compressorId] = new self($world, $compressor); + } + return self::$instances[$worldId][$compressorId]; } /** @var World */ private $world; + /** @var Compressor */ + private $compressor; /** @var CompressBatchPromise[] */ private $caches = []; @@ -62,8 +71,9 @@ class ChunkCache implements ChunkListener{ /** @var int */ private $misses = 0; - private function __construct(World $world){ + private function __construct(World $world, Compressor $compressor){ $this->world = $world; + $this->compressor = $compressor; } /** @@ -92,6 +102,7 @@ class ChunkCache implements ChunkListener{ $chunkZ, $this->world->getChunk($chunkX, $chunkZ), $this->caches[$chunkHash], + $this->compressor, function() use ($chunkX, $chunkZ) : void{ $this->world->getLogger()->error("Failed preparing chunk $chunkX $chunkZ, retrying"); diff --git a/src/network/mcpe/ChunkRequestTask.php b/src/network/mcpe/ChunkRequestTask.php index 25e10bda9..a8b135d97 100644 --- a/src/network/mcpe/ChunkRequestTask.php +++ b/src/network/mcpe/ChunkRequestTask.php @@ -24,7 +24,7 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; use pocketmine\network\mcpe\compression\CompressBatchPromise; -use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\protocol\LevelChunkPacket; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\serializer\ChunkSerializer; @@ -43,7 +43,7 @@ class ChunkRequestTask extends AsyncTask{ /** @var int */ protected $chunkZ; - /** @var ZlibCompressor */ + /** @var Compressor */ protected $compressor; /** @var string */ @@ -52,8 +52,8 @@ class ChunkRequestTask extends AsyncTask{ /** * @phpstan-param (\Closure() : void)|null $onError */ - public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, ?\Closure $onError = null){ - $this->compressor = ZlibCompressor::getInstance(); //TODO: this should be injectable + public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, Compressor $compressor, ?\Closure $onError = null){ + $this->compressor = $compressor; $this->chunk = FastChunkSerializer::serializeWithoutLight($chunk); $this->chunkX = $chunkX; diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index dec765ac1..aafdf7e39 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -35,8 +35,8 @@ use pocketmine\form\Form; use pocketmine\math\Vector3; use pocketmine\network\BadPacketException; use pocketmine\network\mcpe\compression\CompressBatchPromise; +use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\compression\DecompressionException; -use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\SkinAdapterSingleton; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\encryption\DecryptionException; @@ -147,6 +147,8 @@ class NetworkSession{ /** @var \SplQueue|CompressBatchPromise[] */ private $compressedQueue; + /** @var Compressor */ + private $compressor; /** @var InventoryManager|null */ private $invManager = null; @@ -154,7 +156,7 @@ class NetworkSession{ /** @var PacketSender */ private $sender; - public function __construct(Server $server, NetworkSessionManager $manager, PacketSender $sender, string $ip, int $port){ + public function __construct(Server $server, NetworkSessionManager $manager, PacketSender $sender, Compressor $compressor, string $ip, int $port){ $this->server = $server; $this->manager = $manager; $this->sender = $sender; @@ -164,6 +166,7 @@ class NetworkSession{ $this->logger = new \PrefixedLogger($this->server->getLogger(), $this->getLogPrefix()); $this->compressedQueue = new \SplQueue(); + $this->compressor = $compressor; $this->connectTime = time(); @@ -283,7 +286,7 @@ class NetworkSession{ Timings::$playerNetworkReceiveDecompressTimer->startTiming(); try{ - $stream = new PacketBatch(ZlibCompressor::getInstance()->decompress($payload)); //TODO: make this dynamic + $stream = new PacketBatch($this->compressor->decompress($payload)); }catch(DecompressionException $e){ $this->logger->debug("Failed to decompress packet: " . base64_encode($payload)); //TODO: this isn't incompatible game version if we already established protocol version @@ -395,12 +398,16 @@ class NetworkSession{ private function flushSendBuffer(bool $immediate = false) : void{ if($this->sendBuffer !== null){ - $promise = $this->server->prepareBatch($this->sendBuffer, $immediate); + $promise = $this->server->prepareBatch($this->sendBuffer, $this->compressor, $immediate); $this->sendBuffer = null; $this->queueCompressed($promise, $immediate); } } + public function getCompressor() : Compressor{ + return $this->compressor; + } + public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{ $this->flushSendBuffer($immediate); //Maintain ordering if possible if($immediate){ @@ -757,7 +764,7 @@ class NetworkSession{ $world = $this->player->getLocation()->getWorld(); assert($world !== null); - ChunkCache::getInstance($world)->request($chunkX, $chunkZ)->onResolve( + ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ)->onResolve( //this callback may be called synchronously or asynchronously, depending on whether the promise is resolved yet function(CompressBatchPromise $promise) use ($world, $chunkX, $chunkZ, $onCompletion) : void{ diff --git a/src/network/mcpe/compression/CompressBatchTask.php b/src/network/mcpe/compression/CompressBatchTask.php index 05ce2ce2d..d13546b3a 100644 --- a/src/network/mcpe/compression/CompressBatchTask.php +++ b/src/network/mcpe/compression/CompressBatchTask.php @@ -31,10 +31,10 @@ class CompressBatchTask extends AsyncTask{ /** @var string */ private $data; - /** @var ZlibCompressor */ + /** @var Compressor */ private $compressor; - public function __construct(string $data, CompressBatchPromise $promise, ZlibCompressor $compressor){ + public function __construct(string $data, CompressBatchPromise $promise, Compressor $compressor){ $this->data = $data; $this->compressor = $compressor; $this->storeLocal(self::TLS_KEY_PROMISE, $promise); diff --git a/src/network/mcpe/compression/Compressor.php b/src/network/mcpe/compression/Compressor.php new file mode 100644 index 000000000..bbc8fac4f --- /dev/null +++ b/src/network/mcpe/compression/Compressor.php @@ -0,0 +1,36 @@ +session->getInvManager()->syncAll(); $this->session->getInvManager()->syncCreative(); $this->session->getInvManager()->syncSelectedHotbarSlot(); - $this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket()); + $this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket($this->session->getCompressor())); $this->session->syncPlayerList(); } diff --git a/src/network/mcpe/raklib/RakLibInterface.php b/src/network/mcpe/raklib/RakLibInterface.php index 0906390d1..8321e5768 100644 --- a/src/network/mcpe/raklib/RakLibInterface.php +++ b/src/network/mcpe/raklib/RakLibInterface.php @@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe\raklib; use pocketmine\network\AdvancedNetworkInterface; use pocketmine\network\BadPacketException; +use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\ProtocolInfo; use pocketmine\network\Network; @@ -152,7 +153,14 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ } public function openSession(int $sessionId, string $address, int $port, int $clientID) : void{ - $session = new NetworkSession($this->server, $this->network->getSessionManager(), new RakLibPacketSender($sessionId, $this), $address, $port); + $session = new NetworkSession( + $this->server, + $this->network->getSessionManager(), + new RakLibPacketSender($sessionId, $this), + ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it + $address, + $port + ); $this->sessions[$sessionId] = $session; }