net: compressors are now fully dynamic (or at least the potential to be)

the compressor used by RakLibInterface when opening a session is still
hardcoded, but that's because we have no way to select the correct
compressor at that point in the login sequence, since we aren't
propagating the protocol information up from RakLib right now.
This commit is contained in:
Dylan K. Taylor 2020-04-28 16:21:18 +01:00
parent d9e4783b24
commit 3be9548b1e
10 changed files with 122 additions and 46 deletions

View File

@ -48,7 +48,8 @@ use pocketmine\nbt\tag\CompoundTag;
use pocketmine\nbt\TreeRoot;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\CompressBatchTask;
use pocketmine\network\mcpe\compression\ZlibCompressor as ZlibNetworkCompression;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\encryption\NetworkCipher;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
@ -922,7 +923,7 @@ class Server{
$this->logger->warning("Invalid network compression level $netCompressionLevel set, setting to default 7");
$netCompressionLevel = 7;
}
ZlibNetworkCompression::setInstance(new ZlibNetworkCompression($netCompressionLevel, $netCompressionThreshold, ZlibNetworkCompression::DEFAULT_MAX_DECOMPRESSION_SIZE));
ZlibCompressor::setInstance(new ZlibCompressor($netCompressionLevel, $netCompressionThreshold, ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE));
$this->networkCompressionAsync = (bool) $this->getProperty("network.async-compression", true);
@ -1252,16 +1253,28 @@ class Server{
$stream = PacketBatch::fromPackets(...$ev->getPackets());
if(!ZlibNetworkCompression::getInstance()->willCompress($stream->getBuffer())){
foreach($recipients as $target){
foreach($ev->getPackets() as $pk){
$target->addToSendBuffer($pk);
$compressors = [];
$compressorTargets = [];
foreach($recipients as $recipient){
$compressor = $recipient->getCompressor();
$compressorId = spl_object_id($compressor);
//TODO: different compressors might be compatible, it might not be necessary to split them up by object
$compressors[$compressorId] = $compressor;
$compressorTargets[$compressorId][] = $recipient;
}
foreach($compressors as $compressorId => $compressor){
if(!$compressor->willCompress($stream->getBuffer())){
foreach($compressorTargets[$compressorId] as $target){
foreach($ev->getPackets() as $pk){
$target->addToSendBuffer($pk);
}
}
}else{
$promise = $this->prepareBatch($stream, $compressor);
foreach($compressorTargets[$compressorId] as $target){
$target->queueCompressed($promise);
}
}
}else{
$promise = $this->prepareBatch($stream);
foreach($recipients as $target){
$target->queueCompressed($promise);
}
}
@ -1271,11 +1284,10 @@ class Server{
/**
* Broadcasts a list of packets in a batch to a list of players
*/
public function prepareBatch(PacketBatch $stream, bool $forceSync = false) : CompressBatchPromise{
public function prepareBatch(PacketBatch $stream, Compressor $compressor, bool $forceSync = false) : CompressBatchPromise{
try{
Timings::$playerNetworkSendCompressTimer->startTiming();
$compressor = ZlibNetworkCompression::getInstance();
$buffer = $stream->getBuffer();
if(!$compressor->willCompress($buffer)){
$forceSync = true;

View File

@ -25,7 +25,7 @@ namespace pocketmine\crafting;
use pocketmine\item\Item;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\CraftingDataPacket;
@ -41,6 +41,7 @@ use function array_map;
use function file_get_contents;
use function json_decode;
use function json_encode;
use function spl_object_id;
use function str_repeat;
use function usort;
use const DIRECTORY_SEPARATOR;
@ -53,8 +54,8 @@ class CraftingManager{
/** @var FurnaceRecipe[] */
protected $furnaceRecipes = [];
/** @var CompressBatchPromise|null */
private $craftingDataCache;
/** @var CompressBatchPromise[] */
private $craftingDataCaches = [];
public function __construct(){
$this->init();
@ -98,14 +99,12 @@ class CraftingManager{
break;
}
}
$this->buildCraftingDataCache();
}
/**
* Rebuilds the cached CraftingDataPacket.
*/
public function buildCraftingDataCache() : void{
private function buildCraftingDataCache(Compressor $compressor) : CompressBatchPromise{
Timings::$craftingDataCacheRebuildTimer->startTiming();
$pk = new CraftingDataPacket();
$pk->cleanRecipes = true;
@ -164,21 +163,24 @@ class CraftingManager{
);
}
$this->craftingDataCache = new CompressBatchPromise();
$this->craftingDataCache->resolve(ZlibCompressor::getInstance()->compress(PacketBatch::fromPackets($pk)->getBuffer()));
$promise = new CompressBatchPromise();
$promise->resolve($compressor->compress(PacketBatch::fromPackets($pk)->getBuffer()));
Timings::$craftingDataCacheRebuildTimer->stopTiming();
return $promise;
}
/**
* Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found.
*/
public function getCraftingDataPacket() : CompressBatchPromise{
if($this->craftingDataCache === null){
$this->buildCraftingDataCache();
public function getCraftingDataPacket(Compressor $compressor) : CompressBatchPromise{
$compressorId = spl_object_id($compressor);
if(!isset($this->craftingDataCaches[$compressorId])){
$this->craftingDataCaches[$compressorId] = $this->buildCraftingDataCache($compressor);
}
return $this->craftingDataCache;
return $this->craftingDataCaches[$compressorId];
}
/**
@ -253,19 +255,19 @@ class CraftingManager{
public function registerShapedRecipe(ShapedRecipe $recipe) : void{
$this->shapedRecipes[self::hashOutputs($recipe->getResults())][] = $recipe;
$this->craftingDataCache = null;
$this->craftingDataCaches = [];
}
public function registerShapelessRecipe(ShapelessRecipe $recipe) : void{
$this->shapelessRecipes[self::hashOutputs($recipe->getResults())][] = $recipe;
$this->craftingDataCache = null;
$this->craftingDataCaches = [];
}
public function registerFurnaceRecipe(FurnaceRecipe $recipe) : void{
$input = $recipe->getInput();
$this->furnaceRecipes[$input->getId() . ":" . ($input->hasAnyDamageValue() ? "?" : $input->getMeta())] = $recipe;
$this->craftingDataCache = null;
$this->craftingDataCaches = [];
}
/**

View File

@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe;
use pocketmine\math\Vector3;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\world\ChunkListener;
use pocketmine\world\ChunkListenerNoOpTrait;
use pocketmine\world\format\Chunk;
@ -39,7 +40,7 @@ use function strlen;
* TODO: this needs a hook for world unloading
*/
class ChunkCache implements ChunkListener{
/** @var self[] */
/** @var self[][] */
private static $instances = [];
/**
@ -47,12 +48,20 @@ class ChunkCache implements ChunkListener{
*
* @return ChunkCache
*/
public static function getInstance(World $world) : self{
return self::$instances[spl_object_id($world)] ?? (self::$instances[spl_object_id($world)] = new self($world));
public static function getInstance(World $world, Compressor $compressor) : self{
$worldId = spl_object_id($world);
$compressorId = spl_object_id($compressor);
if(!isset(self::$instances[$worldId][$compressorId])){
\GlobalLogger::get()->debug("Created new chunk packet cache (world#$worldId, compressor#$compressorId)");
self::$instances[$worldId][$compressorId] = new self($world, $compressor);
}
return self::$instances[$worldId][$compressorId];
}
/** @var World */
private $world;
/** @var Compressor */
private $compressor;
/** @var CompressBatchPromise[] */
private $caches = [];
@ -62,8 +71,9 @@ class ChunkCache implements ChunkListener{
/** @var int */
private $misses = 0;
private function __construct(World $world){
private function __construct(World $world, Compressor $compressor){
$this->world = $world;
$this->compressor = $compressor;
}
/**
@ -92,6 +102,7 @@ class ChunkCache implements ChunkListener{
$chunkZ,
$this->world->getChunk($chunkX, $chunkZ),
$this->caches[$chunkHash],
$this->compressor,
function() use ($chunkX, $chunkZ) : void{
$this->world->getLogger()->error("Failed preparing chunk $chunkX $chunkZ, retrying");

View File

@ -24,7 +24,7 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\protocol\LevelChunkPacket;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\serializer\ChunkSerializer;
@ -43,7 +43,7 @@ class ChunkRequestTask extends AsyncTask{
/** @var int */
protected $chunkZ;
/** @var ZlibCompressor */
/** @var Compressor */
protected $compressor;
/** @var string */
@ -52,8 +52,8 @@ class ChunkRequestTask extends AsyncTask{
/**
* @phpstan-param (\Closure() : void)|null $onError
*/
public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, ?\Closure $onError = null){
$this->compressor = ZlibCompressor::getInstance(); //TODO: this should be injectable
public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, Compressor $compressor, ?\Closure $onError = null){
$this->compressor = $compressor;
$this->chunk = FastChunkSerializer::serializeWithoutLight($chunk);
$this->chunkX = $chunkX;

View File

@ -35,8 +35,8 @@ use pocketmine\form\Form;
use pocketmine\math\Vector3;
use pocketmine\network\BadPacketException;
use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\compression\DecompressionException;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\SkinAdapterSingleton;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\encryption\DecryptionException;
@ -147,6 +147,8 @@ class NetworkSession{
/** @var \SplQueue|CompressBatchPromise[] */
private $compressedQueue;
/** @var Compressor */
private $compressor;
/** @var InventoryManager|null */
private $invManager = null;
@ -154,7 +156,7 @@ class NetworkSession{
/** @var PacketSender */
private $sender;
public function __construct(Server $server, NetworkSessionManager $manager, PacketSender $sender, string $ip, int $port){
public function __construct(Server $server, NetworkSessionManager $manager, PacketSender $sender, Compressor $compressor, string $ip, int $port){
$this->server = $server;
$this->manager = $manager;
$this->sender = $sender;
@ -164,6 +166,7 @@ class NetworkSession{
$this->logger = new \PrefixedLogger($this->server->getLogger(), $this->getLogPrefix());
$this->compressedQueue = new \SplQueue();
$this->compressor = $compressor;
$this->connectTime = time();
@ -283,7 +286,7 @@ class NetworkSession{
Timings::$playerNetworkReceiveDecompressTimer->startTiming();
try{
$stream = new PacketBatch(ZlibCompressor::getInstance()->decompress($payload)); //TODO: make this dynamic
$stream = new PacketBatch($this->compressor->decompress($payload));
}catch(DecompressionException $e){
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
//TODO: this isn't incompatible game version if we already established protocol version
@ -395,12 +398,16 @@ class NetworkSession{
private function flushSendBuffer(bool $immediate = false) : void{
if($this->sendBuffer !== null){
$promise = $this->server->prepareBatch($this->sendBuffer, $immediate);
$promise = $this->server->prepareBatch($this->sendBuffer, $this->compressor, $immediate);
$this->sendBuffer = null;
$this->queueCompressed($promise, $immediate);
}
}
public function getCompressor() : Compressor{
return $this->compressor;
}
public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{
$this->flushSendBuffer($immediate); //Maintain ordering if possible
if($immediate){
@ -757,7 +764,7 @@ class NetworkSession{
$world = $this->player->getLocation()->getWorld();
assert($world !== null);
ChunkCache::getInstance($world)->request($chunkX, $chunkZ)->onResolve(
ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ)->onResolve(
//this callback may be called synchronously or asynchronously, depending on whether the promise is resolved yet
function(CompressBatchPromise $promise) use ($world, $chunkX, $chunkZ, $onCompletion) : void{

View File

@ -31,10 +31,10 @@ class CompressBatchTask extends AsyncTask{
/** @var string */
private $data;
/** @var ZlibCompressor */
/** @var Compressor */
private $compressor;
public function __construct(string $data, CompressBatchPromise $promise, ZlibCompressor $compressor){
public function __construct(string $data, CompressBatchPromise $promise, Compressor $compressor){
$this->data = $data;
$this->compressor = $compressor;
$this->storeLocal(self::TLS_KEY_PROMISE, $promise);

View File

@ -0,0 +1,36 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
interface Compressor{
public function willCompress(string $data) : bool;
/**
* @throws DecompressionException
*/
public function decompress(string $payload) : string;
public function compress(string $payload) : string;
}

View File

@ -29,7 +29,7 @@ use function zlib_decode;
use function zlib_encode;
use const ZLIB_ENCODING_DEFLATE;
final class ZlibCompressor{
final class ZlibCompressor implements Compressor{
use SingletonTrait;
public const DEFAULT_LEVEL = 7;

View File

@ -98,7 +98,7 @@ class PreSpawnPacketHandler extends PacketHandler{
$this->session->getInvManager()->syncAll();
$this->session->getInvManager()->syncCreative();
$this->session->getInvManager()->syncSelectedHotbarSlot();
$this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket());
$this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket($this->session->getCompressor()));
$this->session->syncPlayerList();
}

View File

@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe\raklib;
use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\BadPacketException;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\network\Network;
@ -152,7 +153,14 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
}
public function openSession(int $sessionId, string $address, int $port, int $clientID) : void{
$session = new NetworkSession($this->server, $this->network->getSessionManager(), new RakLibPacketSender($sessionId, $this), $address, $port);
$session = new NetworkSession(
$this->server,
$this->network->getSessionManager(),
new RakLibPacketSender($sessionId, $this),
ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it
$address,
$port
);
$this->sessions[$sessionId] = $session;
}