From 72d1948f30826af574624d316932996b107602a0 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Thu, 13 Jul 2017 12:04:47 +0100 Subject: [PATCH] Improved batch handling --- src/pocketmine/Server.php | 8 +-- .../level/format/io/ChunkRequestTask.php | 3 +- .../network/CompressBatchedTask.php | 11 ++-- .../network/mcpe/protocol/BatchPacket.php | 55 +++++++++++-------- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/src/pocketmine/Server.php b/src/pocketmine/Server.php index 1f920e36b..0c9a7e96e 100644 --- a/src/pocketmine/Server.php +++ b/src/pocketmine/Server.php @@ -257,6 +257,7 @@ class Server{ /** @var Player[] */ private $playerList = []; + /** @var string[] */ private $identifiers = []; /** @var Level[] */ @@ -1846,17 +1847,16 @@ class Server{ } if(Network::$BATCH_THRESHOLD >= 0 and strlen($pk->payload) >= Network::$BATCH_THRESHOLD){ - $compressionLevel = $this->networkCompressionLevel; + $pk->setCompressionLevel($this->networkCompressionLevel); }else{ - $compressionLevel = 0; //Do not compress packets under the threshold + $pk->setCompressionLevel(0); //Do not compress packets under the threshold $forceSync = true; } if(!$forceSync and !$immediate and $this->networkCompressionAsync){ - $task = new CompressBatchedTask($pk, $targets, $compressionLevel); + $task = new CompressBatchedTask($pk, $targets); $this->getScheduler()->scheduleAsyncTask($task); }else{ - $pk->compress($compressionLevel); $this->broadcastPacketsCallback($pk, $targets, $immediate); } } diff --git a/src/pocketmine/level/format/io/ChunkRequestTask.php b/src/pocketmine/level/format/io/ChunkRequestTask.php index 75944af4d..01261ddd3 100644 --- a/src/pocketmine/level/format/io/ChunkRequestTask.php +++ b/src/pocketmine/level/format/io/ChunkRequestTask.php @@ -75,7 +75,7 @@ class ChunkRequestTask extends AsyncTask{ $batch = new BatchPacket(); $batch->addPacket($pk); - $batch->compress($this->compressionLevel); + $batch->setCompressionLevel($this->compressionLevel); $batch->encode(); $this->setResult($batch->buffer, false); @@ -87,7 +87,6 @@ class ChunkRequestTask extends AsyncTask{ if($this->hasResult()){ $batch = new BatchPacket($this->getResult()); assert(strlen($batch->buffer) > 0); - $batch->compressed = true; $batch->isEncoded = true; $level->chunkRequestCallback($this->chunkX, $this->chunkZ, $batch); }else{ diff --git a/src/pocketmine/network/CompressBatchedTask.php b/src/pocketmine/network/CompressBatchedTask.php index afecfa2bd..a70cd5531 100644 --- a/src/pocketmine/network/CompressBatchedTask.php +++ b/src/pocketmine/network/CompressBatchedTask.php @@ -33,10 +33,14 @@ class CompressBatchedTask extends AsyncTask{ public $data; public $targets; - public function __construct(BatchPacket $batch, array $targets, $level = 7){ + /** + * @param BatchPacket $batch + * @param string[] $targets + */ + public function __construct(BatchPacket $batch, array $targets){ $this->data = $batch->payload; $this->targets = serialize($targets); - $this->level = $level; + $this->level = $batch->getCompressionLevel(); } public function onRun(){ @@ -44,7 +48,7 @@ class CompressBatchedTask extends AsyncTask{ $batch->payload = $this->data; $this->data = null; - $batch->compress($this->level); + $batch->setCompressionLevel($this->level); $batch->encode(); $this->setResult($batch->buffer, false); @@ -53,7 +57,6 @@ class CompressBatchedTask extends AsyncTask{ public function onCompletion(Server $server){ $pk = new BatchPacket($this->getResult()); $pk->isEncoded = true; - $pk->compressed = true; $server->broadcastPacketsCallback($pk, unserialize($this->targets)); } } diff --git a/src/pocketmine/network/mcpe/protocol/BatchPacket.php b/src/pocketmine/network/mcpe/protocol/BatchPacket.php index a2095c050..af4ed8ab5 100644 --- a/src/pocketmine/network/mcpe/protocol/BatchPacket.php +++ b/src/pocketmine/network/mcpe/protocol/BatchPacket.php @@ -29,13 +29,17 @@ namespace pocketmine\network\mcpe\protocol; use pocketmine\network\mcpe\NetworkSession; #ifndef COMPILE use pocketmine\utils\Binary; +use pocketmine\utils\BinaryStream; + #endif class BatchPacket extends DataPacket{ const NETWORK_ID = 0xfe; - public $payload; - public $compressed = false; + /** @var string */ + public $payload = ""; + /** @var int */ + protected $compressionLevel = 7; public function canBeBatched() : bool{ return false; @@ -46,12 +50,16 @@ class BatchPacket extends DataPacket{ } public function decodePayload(){ - $this->payload = $this->getRemaining(); + $data = $this->getRemaining(); + try{ + $this->payload = zlib_decode($data, 1024 * 1024 * 64); //Max 64MB + }catch(\ErrorException $e){ //zlib decode error + $this->payload = ""; + } } public function encodePayload(){ - assert($this->compressed); - $this->put($this->payload); + $this->put(zlib_encode($this->payload, ZLIB_ENCODING_DEFLATE, $this->compressionLevel)); } /** @@ -68,31 +76,30 @@ class BatchPacket extends DataPacket{ $this->payload .= Binary::writeUnsignedVarInt(strlen($packet->buffer)) . $packet->buffer; } - public function compress(int $level = 7){ - assert(!$this->compressed); - $this->payload = zlib_encode($this->payload, ZLIB_ENCODING_DEFLATE, $level); - $this->compressed = true; + /** + * @return \Generator + */ + public function getPackets(){ + $stream = new BinaryStream($this->payload); + while(!$stream->feof()){ + yield $stream->getString(); + } + } + + public function getCompressionLevel() : int{ + return $this->compressionLevel; + } + + public function setCompressionLevel(int $level){ + $this->compressionLevel = $level; } public function handle(NetworkSession $session) : bool{ - if(strlen($this->payload) < 2){ + if($this->payload === ""){ return false; } - try{ - $str = zlib_decode($this->payload, 1024 * 1024 * 64); //Max 64MB - }catch(\ErrorException $e){ - return false; - } - - if(strlen($str) === 0){ - throw new \InvalidStateException("Decoded BatchPacket payload is empty"); - } - - $this->setBuffer($str, 0); - - while(!$this->feof()){ - $buf = $this->getString(); + foreach($this->getPackets() as $buf){ $pk = PacketPool::getPacketById(ord($buf{0})); if(!$pk->canBeBatched()){