Kill BatchPacket, clean up batching related things

DataPacketSendEvent and DataPacketReceiveEvent will no longer capture BatchPackets
In most places strings are now used instead of DataPackets, to remove limitations on what data can be sent to a network interface
Removed CraftingManager's cyclic dependency on Server

There is a lot more work to do aside from this, but this commit is intended to clean up what is necessary to fix the handling of BatchPacket.
This commit is contained in:
Dylan K. Taylor 2018-07-19 14:52:34 +01:00
parent 85647c03bf
commit bdd9a7eb52
15 changed files with 152 additions and 256 deletions

View File

@ -100,7 +100,6 @@ use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\protocol\AdventureSettingsPacket;
use pocketmine\network\mcpe\protocol\AnimatePacket;
use pocketmine\network\mcpe\protocol\AvailableCommandsPacket;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\network\mcpe\protocol\BlockEntityDataPacket;
use pocketmine\network\mcpe\protocol\BlockPickRequestPacket;
use pocketmine\network\mcpe\protocol\BookEditPacket;
@ -714,6 +713,13 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
return $this->networkSession !== null;
}
/**
* @return NetworkSession
*/
public function getNetworkSession() : NetworkSession{
return $this->networkSession;
}
/**
* Gets the username
* @return string
@ -934,7 +940,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
unset($this->loadQueue[$index]);
}
public function sendChunk(int $x, int $z, BatchPacket $payload){
public function sendChunk(int $x, int $z, string $payload){
if(!$this->isConnected()){
return;
}
@ -942,7 +948,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
$this->usedChunks[Level::chunkHash($x, $z)] = true;
$this->chunkLoadCount++;
$this->dataPacket($payload);
$this->networkSession->getInterface()->putPacket($this, $payload);
if($this->spawned){
foreach($this->level->getChunkEntities($x, $z) as $entity){
@ -2120,7 +2126,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
$this->sendAllInventories();
$this->inventory->sendCreativeContents();
$this->inventory->sendHeldItem($this);
$this->dataPacket($this->server->getCraftingManager()->getCraftingDataPacket());
$this->networkSession->getInterface()->putPacket($this, $this->server->getCraftingManager()->getCraftingDataPacket());
$this->server->addOnlinePlayer($this);
$this->server->sendFullPlayerListData($this);

View File

@ -70,7 +70,8 @@ use pocketmine\nbt\tag\ShortTag;
use pocketmine\nbt\tag\StringTag;
use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\mcpe\CompressBatchedTask;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\network\mcpe\NetworkCompression;
use pocketmine\network\mcpe\PacketStream;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\network\mcpe\protocol\PlayerListPacket;
use pocketmine\network\mcpe\protocol\ProtocolInfo;
@ -223,8 +224,6 @@ class Server{
private $network;
/** @var bool */
private $networkCompressionAsync = true;
/** @var int */
public $networkCompressionLevel = 7;
/** @var bool */
private $autoTickRate = true;
@ -1517,15 +1516,15 @@ class Server{
$this->asyncPool = new AsyncPool($this, $poolSize, (int) max(-1, (int) $this->getProperty("memory.async-worker-hard-limit", 256)), $this->autoloader, $this->logger);
if($this->getProperty("network.batch-threshold", 256) >= 0){
Network::$BATCH_THRESHOLD = (int) $this->getProperty("network.batch-threshold", 256);
NetworkCompression::$THRESHOLD = (int) $this->getProperty("network.batch-threshold", 256);
}else{
Network::$BATCH_THRESHOLD = -1;
NetworkCompression::$THRESHOLD = -1;
}
$this->networkCompressionLevel = $this->getProperty("network.compression-level", 7);
if($this->networkCompressionLevel < 1 or $this->networkCompressionLevel > 9){
$this->logger->warning("Invalid network compression level $this->networkCompressionLevel set, setting to default 7");
$this->networkCompressionLevel = 7;
NetworkCompression::$LEVEL = $this->getProperty("network.compression-level", 7);
if(NetworkCompression::$LEVEL < 1 or NetworkCompression::$LEVEL > 9){
$this->logger->warning("Invalid network compression level " . NetworkCompression::$LEVEL . " set, setting to default 7");
NetworkCompression::$LEVEL = 7;
}
$this->networkCompressionAsync = (bool) $this->getProperty("network.async-compression", true);
@ -1864,24 +1863,23 @@ class Server{
$targets = array_filter($players, function(Player $player) : bool{ return $player->isConnected(); });
if(!empty($targets)){
$pk = new BatchPacket();
$stream = new PacketStream();
foreach($packets as $p){
$pk->addPacket($p);
$stream->putPacket($p);
}
if(Network::$BATCH_THRESHOLD >= 0 and strlen($pk->payload) >= Network::$BATCH_THRESHOLD){
$pk->setCompressionLevel($this->networkCompressionLevel);
}else{
$pk->setCompressionLevel(0); //Do not compress packets under the threshold
$compressionLevel = NetworkCompression::$LEVEL;
if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){
$compressionLevel = 0; //Do not compress packets under the threshold
$forceSync = true;
}
if(!$forceSync and !$immediate and $this->networkCompressionAsync){
$task = new CompressBatchedTask($pk, $targets);
$task = new CompressBatchedTask($stream, $targets, $compressionLevel);
$this->asyncPool->submitTask($task);
}else{
$this->broadcastPacketsCallback($pk, $targets, $immediate);
$this->broadcastPacketsCallback(NetworkCompression::compress($stream->buffer), $targets, $immediate);
}
}
@ -1889,17 +1887,13 @@ class Server{
}
/**
* @param BatchPacket $pk
* @param Player[] $players
* @param bool $immediate
* @param string $payload
* @param Player[] $players
* @param bool $immediate
*/
public function broadcastPacketsCallback(BatchPacket $pk, array $players, bool $immediate = false){
if(!$pk->isEncoded){
$pk->encode();
}
public function broadcastPacketsCallback(string $payload, array $players, bool $immediate = false){
foreach($players as $i){
$i->sendDataPacket($pk, false, $immediate);
$i->getNetworkSession()->getInterface()->putPacket($i, $payload, false, $immediate);
}
}

View File

@ -25,9 +25,9 @@ namespace pocketmine\inventory;
use pocketmine\item\Item;
use pocketmine\item\ItemFactory;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\network\mcpe\NetworkCompression;
use pocketmine\network\mcpe\PacketStream;
use pocketmine\network\mcpe\protocol\CraftingDataPacket;
use pocketmine\Server;
use pocketmine\timings\Timings;
class CraftingManager{
@ -38,7 +38,7 @@ class CraftingManager{
/** @var FurnaceRecipe[] */
protected $furnaceRecipes = [];
/** @var BatchPacket */
/** @var string */
private $craftingDataCache;
public function __construct(){
@ -102,21 +102,19 @@ class CraftingManager{
$pk->encode();
$batch = new BatchPacket();
$batch->addPacket($pk);
$batch->setCompressionLevel(Server::getInstance()->networkCompressionLevel);
$batch->encode();
$batch = new PacketStream();
$batch->putPacket($pk);
$this->craftingDataCache = $batch;
$this->craftingDataCache = NetworkCompression::compress($batch->buffer);
Timings::$craftingDataCacheRebuildTimer->stopTiming();
}
/**
* Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found.
*
* @return BatchPacket
* @return string
*/
public function getCraftingDataPacket() : BatchPacket{
public function getCraftingDataPacket() : string{
if($this->craftingDataCache === null){
$this->buildCraftingDataCache();
}

View File

@ -69,7 +69,6 @@ use pocketmine\metadata\MetadataValue;
use pocketmine\nbt\tag\ListTag;
use pocketmine\nbt\tag\StringTag;
use pocketmine\network\mcpe\ChunkRequestTask;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\network\mcpe\protocol\LevelEventPacket;
use pocketmine\network\mcpe\protocol\LevelSoundEventPacket;
@ -123,7 +122,7 @@ class Level implements ChunkManager, Metadatable{
/** @var Block[][] */
private $blockCache = [];
/** @var BatchPacket[] */
/** @var string[] */
private $chunkCache = [];
/** @var int */
@ -2498,7 +2497,7 @@ class Level implements ChunkManager, Metadatable{
}
}
public function chunkRequestCallback(int $x, int $z, BatchPacket $payload){
public function chunkRequestCallback(int $x, int $z, string $payload){
$this->timings->syncChunkSendTimer->startTiming();
$index = Level::chunkHash($x, $z);

View File

@ -33,9 +33,6 @@ use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\Server;
class Network{
public static $BATCH_THRESHOLD = 512;
/** @var Server */
private $server;

View File

@ -26,7 +26,6 @@ declare(strict_types=1);
*/
namespace pocketmine\network;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\Player;
/**
@ -42,14 +41,14 @@ interface NetworkInterface{
/**
* Sends a DataPacket to the interface, returns an unique identifier for the packet if $needACK is true
*
* @param Player $player
* @param DataPacket $packet
* @param bool $needACK
* @param bool $immediate
* @param Player $player
* @param string $payload
* @param bool $needACK
* @param bool $immediate
*
* @return int|null
*/
public function putPacket(Player $player, DataPacket $packet, bool $needACK = false, bool $immediate = true) : ?int;
public function putPacket(Player $player, string $payload, bool $needACK = false, bool $immediate = true) : ?int;
/**
* Terminates the connection

View File

@ -25,7 +25,6 @@ namespace pocketmine\network\mcpe;
use pocketmine\level\format\Chunk;
use pocketmine\level\Level;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\network\mcpe\protocol\FullChunkDataPacket;
use pocketmine\scheduler\AsyncTask;
use pocketmine\Server;
@ -47,7 +46,7 @@ class ChunkRequestTask extends AsyncTask{
public function __construct(Level $level, int $chunkX, int $chunkZ, Chunk $chunk){
$this->levelId = $level->getId();
$this->compressionLevel = $level->getServer()->networkCompressionLevel;
$this->compressionLevel = NetworkCompression::$LEVEL;
$this->chunk = $chunk->fastSerialize();
$this->chunkX = $chunkX;
@ -72,22 +71,17 @@ class ChunkRequestTask extends AsyncTask{
$pk->chunkZ = $this->chunkZ;
$pk->data = $chunk->networkSerialize() . $this->tiles;
$batch = new BatchPacket();
$batch->addPacket($pk);
$batch->setCompressionLevel($this->compressionLevel);
$batch->encode();
$stream = new PacketStream();
$stream->putPacket($pk);
$this->setResult($batch->buffer, false);
$this->setResult(NetworkCompression::compress($stream->buffer, $this->compressionLevel), false);
}
public function onCompletion(Server $server) : void{
$level = $server->getLevel($this->levelId);
if($level instanceof Level){
if($this->hasResult()){
$batch = new BatchPacket($this->getResult());
assert(strlen($batch->buffer) > 0);
$batch->isEncoded = true;
$level->chunkRequestCallback($this->chunkX, $this->chunkZ, $batch);
$level->chunkRequestCallback($this->chunkX, $this->chunkZ, $this->getResult());
}else{
$server->getLogger()->error("Chunk request for level #" . $this->levelId . ", x=" . $this->chunkX . ", z=" . $this->chunkZ . " doesn't have any result data");
}

View File

@ -23,44 +23,34 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\Player;
use pocketmine\scheduler\AsyncTask;
use pocketmine\Server;
class CompressBatchedTask extends AsyncTask{
public $level = 7;
public $data;
private $level;
private $data;
/**
* @param BatchPacket $batch
* @param string[] $targets
* @param PacketStream $stream
* @param string[] $targets
* @param int $compressionLevel
*/
public function __construct(BatchPacket $batch, array $targets){
$this->data = $batch->payload;
$this->level = $batch->getCompressionLevel();
public function __construct(PacketStream $stream, array $targets, int $compressionLevel){
$this->data = $stream->buffer;
$this->level = $compressionLevel;
$this->storeLocal($targets);
}
public function onRun() : void{
$batch = new BatchPacket();
$batch->payload = $this->data;
$this->data = null;
$batch->setCompressionLevel($this->level);
$batch->encode();
$this->setResult($batch->buffer, false);
$this->setResult(NetworkCompression::compress($this->data, $this->level), false);
}
public function onCompletion(Server $server) : void{
$pk = new BatchPacket($this->getResult());
$pk->isEncoded = true;
/** @var Player[] $targets */
$targets = $this->fetchLocal();
$server->broadcastPacketsCallback($pk, $targets);
$server->broadcastPacketsCallback($this->getResult(), $targets);
}
}

View File

@ -0,0 +1,47 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe;
final class NetworkCompression{
public static $LEVEL = 7;
public static $THRESHOLD = 256;
private function __construct(){
}
public static function decompress(string $payload) : string{
return zlib_decode($payload, 1024 * 1024 * 64); //Max 64MB
}
/**
* @param string $payload
* @param int $compressionLevel
*
* @return string
*/
public static function compress(string $payload, ?int $compressionLevel = null) : string{
return zlib_encode($payload, ZLIB_ENCODING_DEFLATE, $compressionLevel ?? self::$LEVEL);
}
}

View File

@ -42,8 +42,8 @@ use pocketmine\network\mcpe\protocol\BossEventPacket;
use pocketmine\network\mcpe\protocol\CameraPacket;
use pocketmine\network\mcpe\protocol\ChangeDimensionPacket;
use pocketmine\network\mcpe\protocol\ChunkRadiusUpdatedPacket;
use pocketmine\network\mcpe\protocol\ClientToServerHandshakePacket;
use pocketmine\network\mcpe\protocol\ClientboundMapItemDataPacket;
use pocketmine\network\mcpe\protocol\ClientToServerHandshakePacket;
use pocketmine\network\mcpe\protocol\CommandBlockUpdatePacket;
use pocketmine\network\mcpe\protocol\CommandOutputPacket;
use pocketmine\network\mcpe\protocol\CommandRequestPacket;
@ -82,14 +82,15 @@ use pocketmine\network\mcpe\protocol\MoveEntityAbsolutePacket;
use pocketmine\network\mcpe\protocol\MoveEntityDeltaPacket;
use pocketmine\network\mcpe\protocol\MovePlayerPacket;
use pocketmine\network\mcpe\protocol\NpcRequestPacket;
use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\PhotoTransferPacket;
use pocketmine\network\mcpe\protocol\PlaySoundPacket;
use pocketmine\network\mcpe\protocol\PlayStatusPacket;
use pocketmine\network\mcpe\protocol\PlayerActionPacket;
use pocketmine\network\mcpe\protocol\PlayerHotbarPacket;
use pocketmine\network\mcpe\protocol\PlayerInputPacket;
use pocketmine\network\mcpe\protocol\PlayerListPacket;
use pocketmine\network\mcpe\protocol\PlayerSkinPacket;
use pocketmine\network\mcpe\protocol\PlaySoundPacket;
use pocketmine\network\mcpe\protocol\PlayStatusPacket;
use pocketmine\network\mcpe\protocol\PurchaseReceiptPacket;
use pocketmine\network\mcpe\protocol\RemoveEntityPacket;
use pocketmine\network\mcpe\protocol\RemoveObjectivePacket;
@ -98,8 +99,8 @@ use pocketmine\network\mcpe\protocol\ResourcePackChunkDataPacket;
use pocketmine\network\mcpe\protocol\ResourcePackChunkRequestPacket;
use pocketmine\network\mcpe\protocol\ResourcePackClientResponsePacket;
use pocketmine\network\mcpe\protocol\ResourcePackDataInfoPacket;
use pocketmine\network\mcpe\protocol\ResourcePackStackPacket;
use pocketmine\network\mcpe\protocol\ResourcePacksInfoPacket;
use pocketmine\network\mcpe\protocol\ResourcePackStackPacket;
use pocketmine\network\mcpe\protocol\RespawnPacket;
use pocketmine\network\mcpe\protocol\RiderJumpPacket;
use pocketmine\network\mcpe\protocol\ServerSettingsRequestPacket;
@ -166,6 +167,10 @@ class NetworkSession{
$this->port = $port;
}
public function getInterface() : NetworkInterface{
return $this->interface;
}
/**
* @return string
*/
@ -180,6 +185,15 @@ class NetworkSession{
return $this->port;
}
public function handleEncoded(string $payload) : void{
//TODO: decryption if enabled
$stream = new PacketStream(NetworkCompression::decompress($payload));
while(!$stream->feof()){
$this->handleDataPacket(PacketPool::getPacket($stream->getString()));
}
}
public function handleDataPacket(DataPacket $packet) : void{
$timings = Timings::getReceiveDataPacketTimings($packet);
$timings->startTiming();
@ -207,7 +221,8 @@ class NetworkSession{
return false;
}
$this->interface->putPacket($this->player, $packet, false, $immediate);
//TODO: implement buffering (this is just a quick fix)
$this->server->batchPackets([$this->player], [$packet], true, $immediate);
return true;
}finally{

View File

@ -23,13 +23,19 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe;
use raklib\protocol\EncapsulatedPacket;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\network\mcpe\protocol\PacketPool;
class CachedEncapsulatedPacket extends EncapsulatedPacket{
/** @var string|null */
private $internalData = null;
class PacketStream extends NetworkBinaryStream{
public function toInternalBinary() : string{
return $this->internalData ?? ($this->internalData = parent::toInternalBinary());
public function putPacket(DataPacket $packet) : void{
if(!$packet->isEncoded){
$packet->encode();
}
$this->putString($packet->buffer);
}
public function getPacket() : DataPacket{
return PacketPool::getPacket($this->getString());
}
}

View File

@ -25,9 +25,6 @@ namespace pocketmine\network\mcpe;
use pocketmine\event\player\PlayerCreationEvent;
use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\mcpe\protocol\BatchPacket;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\network\Network;
use pocketmine\Player;
@ -48,6 +45,8 @@ class RakLibInterface implements ServerInstance, AdvancedNetworkInterface{
*/
private const MCPE_RAKNET_PROTOCOL_VERSION = 8;
private const MCPE_RAKNET_PACKET_ID = "\xfe";
/** @var Server */
private $server;
@ -153,13 +152,12 @@ class RakLibInterface implements ServerInstance, AdvancedNetworkInterface{
//get this now for blocking in case the player was closed before the exception was raised
$address = $this->players[$identifier]->getAddress();
try{
if($packet->buffer !== ""){
$pk = PacketPool::getPacket($packet->buffer);
$this->players[$identifier]->handleDataPacket($pk);
if($packet->buffer !== "" and $packet->buffer{0} === self::MCPE_RAKNET_PACKET_ID){ //Batch
$this->players[$identifier]->getNetworkSession()->handleEncoded(substr($packet->buffer, 1));
}
}catch(\Throwable $e){
$logger = $this->server->getLogger();
$logger->debug("Packet " . (isset($pk) ? get_class($pk) : "unknown") . " 0x" . bin2hex($packet->buffer));
$logger->debug("EncapsulatedPacket 0x" . bin2hex($packet->buffer));
$logger->logException($e);
$this->interface->blockAddress($address, 5);
@ -216,37 +214,18 @@ class RakLibInterface implements ServerInstance, AdvancedNetworkInterface{
}
}
public function putPacket(Player $player, DataPacket $packet, bool $needACK = false, bool $immediate = true) : ?int{
public function putPacket(Player $player, string $payload, bool $needACK = false, bool $immediate = true) : ?int{
if(isset($this->identifiers[$h = spl_object_hash($player)])){
$identifier = $this->identifiers[$h];
if(!$packet->isEncoded){
$packet->encode();
}
if($packet instanceof BatchPacket){
if($needACK){
$pk = new EncapsulatedPacket();
$pk->identifierACK = $this->identifiersACK[$identifier]++;
$pk->buffer = $packet->buffer;
$pk->reliability = PacketReliability::RELIABLE_ORDERED;
$pk->orderChannel = 0;
}else{
if(!isset($packet->__encapsulatedPacket)){
$packet->__encapsulatedPacket = new CachedEncapsulatedPacket;
$packet->__encapsulatedPacket->identifierACK = null;
$packet->__encapsulatedPacket->buffer = $packet->buffer;
$packet->__encapsulatedPacket->reliability = PacketReliability::RELIABLE_ORDERED;
$packet->__encapsulatedPacket->orderChannel = 0;
}
$pk = $packet->__encapsulatedPacket;
}
$pk = new EncapsulatedPacket();
$pk->identifierACK = $this->identifiersACK[$identifier]++;
$pk->buffer = self::MCPE_RAKNET_PACKET_ID . $payload;
$pk->reliability = PacketReliability::RELIABLE_ORDERED;
$pk->orderChannel = 0;
$this->interface->sendEncapsulated($identifier, $pk, ($needACK ? RakLib::FLAG_NEED_ACK : 0) | ($immediate ? RakLib::PRIORITY_IMMEDIATE : RakLib::PRIORITY_NORMAL));
return $pk->identifierACK;
}else{
$this->server->batchPackets([$player], [$packet], true, $immediate);
return null;
}
$this->interface->sendEncapsulated($identifier, $pk, ($needACK ? RakLib::FLAG_NEED_ACK : 0) | ($immediate ? RakLib::PRIORITY_IMMEDIATE : RakLib::PRIORITY_NORMAL));
return $pk->identifierACK;
}
return null;

View File

@ -1,122 +0,0 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\protocol;
#include <rules/DataPacket.h>
use pocketmine\network\mcpe\NetworkBinaryStream;
use pocketmine\network\mcpe\NetworkSession;
#ifndef COMPILE
use pocketmine\utils\Binary;
#endif
class BatchPacket extends DataPacket{
public const NETWORK_ID = 0xfe;
/** @var string */
public $payload = "";
/** @var int */
protected $compressionLevel = 7;
public function canBeBatched() : bool{
return false;
}
public function canBeSentBeforeLogin() : bool{
return true;
}
protected function decodeHeader() : void{
$pid = $this->getByte();
assert($pid === static::NETWORK_ID);
}
protected function decodePayload() : void{
$data = $this->getRemaining();
try{
$this->payload = zlib_decode($data, 1024 * 1024 * 64); //Max 64MB
}catch(\ErrorException $e){ //zlib decode error
$this->payload = "";
}
}
protected function encodeHeader() : void{
$this->putByte(static::NETWORK_ID);
}
protected function encodePayload() : void{
$this->put(zlib_encode($this->payload, ZLIB_ENCODING_DEFLATE, $this->compressionLevel));
}
/**
* @param DataPacket $packet
*/
public function addPacket(DataPacket $packet) : void{
if(!$packet->canBeBatched()){
throw new \InvalidArgumentException(get_class($packet) . " cannot be put inside a BatchPacket");
}
if(!$packet->isEncoded){
$packet->encode();
}
$this->payload .= Binary::writeUnsignedVarInt(strlen($packet->buffer)) . $packet->buffer;
}
/**
* @return \Generator
*/
public function getPackets() : \Generator{
$stream = new NetworkBinaryStream($this->payload);
while(!$stream->feof()){
yield $stream->getString();
}
}
public function getCompressionLevel() : int{
return $this->compressionLevel;
}
public function setCompressionLevel(int $level) : void{
$this->compressionLevel = $level;
}
public function handle(NetworkSession $session) : bool{
if($this->payload === ""){
return false;
}
foreach($this->getPackets() as $buf){
$pk = PacketPool::getPacket($buf);
if(!$pk->canBeBatched()){
throw new \InvalidArgumentException("Received invalid " . get_class($pk) . " inside BatchPacket");
}
$session->handleDataPacket($pk);
}
return true;
}
}

View File

@ -49,10 +49,6 @@ abstract class DataPacket extends NetworkBinaryStream{
return (new \ReflectionClass($this))->getShortName();
}
public function canBeBatched() : bool{
return true;
}
public function canBeSentBeforeLogin() : bool{
return false;
}

View File

@ -143,8 +143,6 @@ class PacketPool{
static::registerPacket(new UpdateBlockSyncedPacket());
static::registerPacket(new MoveEntityDeltaPacket());
static::registerPacket(new SetLocalPlayerAsInitializedPacket());
static::registerPacket(new BatchPacket());
}
/**