Improved batch handling

This commit is contained in:
Dylan K. Taylor 2017-07-13 12:04:47 +01:00
parent 5283975f20
commit 72d1948f30
4 changed files with 43 additions and 34 deletions

View File

@ -257,6 +257,7 @@ class Server{
/** @var Player[] */ /** @var Player[] */
private $playerList = []; private $playerList = [];
/** @var string[] */
private $identifiers = []; private $identifiers = [];
/** @var Level[] */ /** @var Level[] */
@ -1846,17 +1847,16 @@ class Server{
} }
if(Network::$BATCH_THRESHOLD >= 0 and strlen($pk->payload) >= Network::$BATCH_THRESHOLD){ if(Network::$BATCH_THRESHOLD >= 0 and strlen($pk->payload) >= Network::$BATCH_THRESHOLD){
$compressionLevel = $this->networkCompressionLevel; $pk->setCompressionLevel($this->networkCompressionLevel);
}else{ }else{
$compressionLevel = 0; //Do not compress packets under the threshold $pk->setCompressionLevel(0); //Do not compress packets under the threshold
$forceSync = true; $forceSync = true;
} }
if(!$forceSync and !$immediate and $this->networkCompressionAsync){ if(!$forceSync and !$immediate and $this->networkCompressionAsync){
$task = new CompressBatchedTask($pk, $targets, $compressionLevel); $task = new CompressBatchedTask($pk, $targets);
$this->getScheduler()->scheduleAsyncTask($task); $this->getScheduler()->scheduleAsyncTask($task);
}else{ }else{
$pk->compress($compressionLevel);
$this->broadcastPacketsCallback($pk, $targets, $immediate); $this->broadcastPacketsCallback($pk, $targets, $immediate);
} }
} }

View File

@ -75,7 +75,7 @@ class ChunkRequestTask extends AsyncTask{
$batch = new BatchPacket(); $batch = new BatchPacket();
$batch->addPacket($pk); $batch->addPacket($pk);
$batch->compress($this->compressionLevel); $batch->setCompressionLevel($this->compressionLevel);
$batch->encode(); $batch->encode();
$this->setResult($batch->buffer, false); $this->setResult($batch->buffer, false);
@ -87,7 +87,6 @@ class ChunkRequestTask extends AsyncTask{
if($this->hasResult()){ if($this->hasResult()){
$batch = new BatchPacket($this->getResult()); $batch = new BatchPacket($this->getResult());
assert(strlen($batch->buffer) > 0); assert(strlen($batch->buffer) > 0);
$batch->compressed = true;
$batch->isEncoded = true; $batch->isEncoded = true;
$level->chunkRequestCallback($this->chunkX, $this->chunkZ, $batch); $level->chunkRequestCallback($this->chunkX, $this->chunkZ, $batch);
}else{ }else{

View File

@ -33,10 +33,14 @@ class CompressBatchedTask extends AsyncTask{
public $data; public $data;
public $targets; public $targets;
public function __construct(BatchPacket $batch, array $targets, $level = 7){ /**
* @param BatchPacket $batch
* @param string[] $targets
*/
public function __construct(BatchPacket $batch, array $targets){
$this->data = $batch->payload; $this->data = $batch->payload;
$this->targets = serialize($targets); $this->targets = serialize($targets);
$this->level = $level; $this->level = $batch->getCompressionLevel();
} }
public function onRun(){ public function onRun(){
@ -44,7 +48,7 @@ class CompressBatchedTask extends AsyncTask{
$batch->payload = $this->data; $batch->payload = $this->data;
$this->data = null; $this->data = null;
$batch->compress($this->level); $batch->setCompressionLevel($this->level);
$batch->encode(); $batch->encode();
$this->setResult($batch->buffer, false); $this->setResult($batch->buffer, false);
@ -53,7 +57,6 @@ class CompressBatchedTask extends AsyncTask{
public function onCompletion(Server $server){ public function onCompletion(Server $server){
$pk = new BatchPacket($this->getResult()); $pk = new BatchPacket($this->getResult());
$pk->isEncoded = true; $pk->isEncoded = true;
$pk->compressed = true;
$server->broadcastPacketsCallback($pk, unserialize($this->targets)); $server->broadcastPacketsCallback($pk, unserialize($this->targets));
} }
} }

View File

@ -29,13 +29,17 @@ namespace pocketmine\network\mcpe\protocol;
use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\NetworkSession;
#ifndef COMPILE #ifndef COMPILE
use pocketmine\utils\Binary; use pocketmine\utils\Binary;
use pocketmine\utils\BinaryStream;
#endif #endif
class BatchPacket extends DataPacket{ class BatchPacket extends DataPacket{
const NETWORK_ID = 0xfe; const NETWORK_ID = 0xfe;
public $payload; /** @var string */
public $compressed = false; public $payload = "";
/** @var int */
protected $compressionLevel = 7;
public function canBeBatched() : bool{ public function canBeBatched() : bool{
return false; return false;
@ -46,12 +50,16 @@ class BatchPacket extends DataPacket{
} }
public function decodePayload(){ public function decodePayload(){
$this->payload = $this->getRemaining(); $data = $this->getRemaining();
try{
$this->payload = zlib_decode($data, 1024 * 1024 * 64); //Max 64MB
}catch(\ErrorException $e){ //zlib decode error
$this->payload = "";
}
} }
public function encodePayload(){ public function encodePayload(){
assert($this->compressed); $this->put(zlib_encode($this->payload, ZLIB_ENCODING_DEFLATE, $this->compressionLevel));
$this->put($this->payload);
} }
/** /**
@ -68,31 +76,30 @@ class BatchPacket extends DataPacket{
$this->payload .= Binary::writeUnsignedVarInt(strlen($packet->buffer)) . $packet->buffer; $this->payload .= Binary::writeUnsignedVarInt(strlen($packet->buffer)) . $packet->buffer;
} }
public function compress(int $level = 7){ /**
assert(!$this->compressed); * @return \Generator
$this->payload = zlib_encode($this->payload, ZLIB_ENCODING_DEFLATE, $level); */
$this->compressed = true; public function getPackets(){
$stream = new BinaryStream($this->payload);
while(!$stream->feof()){
yield $stream->getString();
}
}
public function getCompressionLevel() : int{
return $this->compressionLevel;
}
public function setCompressionLevel(int $level){
$this->compressionLevel = $level;
} }
public function handle(NetworkSession $session) : bool{ public function handle(NetworkSession $session) : bool{
if(strlen($this->payload) < 2){ if($this->payload === ""){
return false; return false;
} }
try{ foreach($this->getPackets() as $buf){
$str = zlib_decode($this->payload, 1024 * 1024 * 64); //Max 64MB
}catch(\ErrorException $e){
return false;
}
if(strlen($str) === 0){
throw new \InvalidStateException("Decoded BatchPacket payload is empty");
}
$this->setBuffer($str, 0);
while(!$this->feof()){
$buf = $this->getString();
$pk = PacketPool::getPacketById(ord($buf{0})); $pk = PacketPool::getPacketById(ord($buf{0}));
if(!$pk->canBeBatched()){ if(!$pk->canBeBatched()){