Merge branch 'next-minor' into next-major

This commit is contained in:
Dylan K. Taylor
2023-02-21 15:41:57 +00:00
10 changed files with 213 additions and 126 deletions

View File

@@ -34,6 +34,7 @@ use pocketmine\network\mcpe\protocol\types\ChunkPosition;
use pocketmine\network\mcpe\serializer\ChunkSerializer;
use pocketmine\scheduler\AsyncTask;
use pocketmine\thread\NonThreadSafeValue;
use pocketmine\utils\BinaryStream;
use pocketmine\world\format\Chunk;
use pocketmine\world\format\io\FastChunkSerializer;
@@ -68,7 +69,10 @@ class ChunkRequestTask extends AsyncTask{
$subCount = ChunkSerializer::getSubChunkCount($chunk);
$encoderContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary());
$payload = ChunkSerializer::serializeFullChunk($chunk, RuntimeBlockMapping::getInstance(), $encoderContext, $this->tiles);
$this->setResult($this->compressor->deserialize()->compress(PacketBatch::fromPackets($encoderContext, LevelChunkPacket::create(new ChunkPosition($this->chunkX, $this->chunkZ), $subCount, false, null, $payload))->getBuffer()));
$stream = new BinaryStream();
PacketBatch::encodePackets($stream, $encoderContext, [LevelChunkPacket::create(new ChunkPosition($this->chunkX, $this->chunkZ), $subCount, false, null, $payload)]);
$this->setResult($this->compressor->deserialize()->compress($stream->getBuffer()));
}
public function onError() : void{

View File

@@ -120,6 +120,7 @@ use pocketmine\player\XboxLivePlayerInfo;
use pocketmine\Server;
use pocketmine\timings\Timings;
use pocketmine\utils\AssumptionFailedError;
use pocketmine\utils\BinaryStream;
use pocketmine\utils\ObjectSet;
use pocketmine\utils\TextFormat;
use pocketmine\utils\Utils;
@@ -177,7 +178,7 @@ class NetworkSession{
private ?EncryptionContext $cipher = null;
/** @var Packet[] */
/** @var ClientboundPacket[] */
private array $sendBuffer = [];
/**
@@ -265,6 +266,23 @@ class NetworkSession{
);
}
/**
* @param ClientboundPacket[] $packets
*/
public static function encodePacketBatchTimed(BinaryStream $stream, PacketSerializerContext $context, array $packets) : void{
PacketBatch::encodeRaw($stream, array_map(function(ClientboundPacket $packet) use ($context) : string{
$timings = Timings::getEncodeDataPacketTimings($packet);
$timings->startTiming();
try{
$stream = PacketSerializer::encoder($context);
$packet->encode($stream);
return $stream->getBuffer();
}finally{
$timings->stopTiming();
}
}, $packets));
}
private function onPlayerCreated(Player $player) : void{
if(!$this->isConnected()){
//the remote player might have disconnected before spawn terrain generation was finished
@@ -360,56 +378,67 @@ class NetworkSession{
return;
}
if($this->incomingPacketBatchBudget <= 0){
$this->updatePacketBudget();
if($this->incomingPacketBatchBudget <= 0){
throw new PacketHandlingException("Receiving packets too fast");
}
}
$this->incomingPacketBatchBudget--;
if($this->cipher !== null){
Timings::$playerNetworkReceiveDecrypt->startTiming();
try{
$payload = $this->cipher->decrypt($payload);
}catch(DecryptionException $e){
$this->logger->debug("Encrypted packet: " . base64_encode($payload));
throw PacketHandlingException::wrap($e, "Packet decryption error");
}finally{
Timings::$playerNetworkReceiveDecrypt->stopTiming();
}
}
if($this->enableCompression){
Timings::$playerNetworkReceiveDecompress->startTiming();
try{
$decompressed = $this->compressor->decompress($payload);
}catch(DecompressionException $e){
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
}finally{
Timings::$playerNetworkReceiveDecompress->stopTiming();
}
}else{
$decompressed = $payload;
}
Timings::$playerNetworkReceive->startTiming();
try{
foreach((new PacketBatch($decompressed))->getPackets($this->packetPool, $this->packetSerializerContext, 1300) as [$packet, $buffer]){
if($packet === null){
$this->logger->debug("Unknown packet: " . base64_encode($buffer));
throw new PacketHandlingException("Unknown packet received");
}
try{
$this->handleDataPacket($packet, $buffer);
}catch(PacketHandlingException $e){
$this->logger->debug($packet->getName() . ": " . base64_encode($buffer));
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
if($this->incomingPacketBatchBudget <= 0){
$this->updatePacketBudget();
if($this->incomingPacketBatchBudget <= 0){
throw new PacketHandlingException("Receiving packets too fast");
}
}
}catch(PacketDecodeException $e){
$this->logger->logException($e);
throw PacketHandlingException::wrap($e, "Packet batch decode error");
$this->incomingPacketBatchBudget--;
if($this->cipher !== null){
Timings::$playerNetworkReceiveDecrypt->startTiming();
try{
$payload = $this->cipher->decrypt($payload);
}catch(DecryptionException $e){
$this->logger->debug("Encrypted packet: " . base64_encode($payload));
throw PacketHandlingException::wrap($e, "Packet decryption error");
}finally{
Timings::$playerNetworkReceiveDecrypt->stopTiming();
}
}
if($this->enableCompression){
Timings::$playerNetworkReceiveDecompress->startTiming();
try{
$decompressed = $this->compressor->decompress($payload);
}catch(DecompressionException $e){
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
}finally{
Timings::$playerNetworkReceiveDecompress->stopTiming();
}
}else{
$decompressed = $payload;
}
try{
$stream = new BinaryStream($decompressed);
$count = 0;
foreach(PacketBatch::decodeRaw($stream) as $buffer){
if(++$count > 1300){
throw new PacketHandlingException("Too many packets in batch");
}
$packet = $this->packetPool->getPacket($buffer);
if($packet === null){
$this->logger->debug("Unknown packet: " . base64_encode($buffer));
throw new PacketHandlingException("Unknown packet received");
}
try{
$this->handleDataPacket($packet, $buffer);
}catch(PacketHandlingException $e){
$this->logger->debug($packet->getName() . ": " . base64_encode($buffer));
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
}
}
}catch(PacketDecodeException $e){
$this->logger->logException($e);
throw PacketHandlingException::wrap($e, "Packet batch decode error");
}
}finally{
Timings::$playerNetworkReceive->stopTiming();
}
}
@@ -500,22 +529,29 @@ class NetworkSession{
private function flushSendBuffer(bool $immediate = false) : void{
if(count($this->sendBuffer) > 0){
$syncMode = null; //automatic
if($immediate){
$syncMode = true;
}elseif($this->forceAsyncCompression){
$syncMode = false;
}
Timings::$playerNetworkSend->startTiming();
try{
$syncMode = null; //automatic
if($immediate){
$syncMode = true;
}elseif($this->forceAsyncCompression){
$syncMode = false;
}
$batch = PacketBatch::fromPackets($this->packetSerializerContext, ...$this->sendBuffer);
if($this->enableCompression){
$promise = $this->server->prepareBatch($batch, $this->compressor, $syncMode);
}else{
$promise = new CompressBatchPromise();
$promise->resolve($batch->getBuffer());
$stream = new BinaryStream();
self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer);
if($this->enableCompression){
$promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode);
}else{
$promise = new CompressBatchPromise();
$promise->resolve($stream->getBuffer());
}
$this->sendBuffer = [];
$this->queueCompressedNoBufferFlush($promise, $immediate);
}finally{
Timings::$playerNetworkSend->stopTiming();
}
$this->sendBuffer = [];
$this->queueCompressedNoBufferFlush($promise, $immediate);
}
}
@@ -528,35 +564,45 @@ class NetworkSession{
}
public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{
$this->flushSendBuffer($immediate); //Maintain ordering if possible
$this->queueCompressedNoBufferFlush($payload, $immediate);
Timings::$playerNetworkSend->startTiming();
try{
$this->flushSendBuffer($immediate); //Maintain ordering if possible
$this->queueCompressedNoBufferFlush($payload, $immediate);
}finally{
Timings::$playerNetworkSend->stopTiming();
}
}
private function queueCompressedNoBufferFlush(CompressBatchPromise $payload, bool $immediate = false) : void{
if($immediate){
//Skips all queues
$this->sendEncoded($payload->getResult(), true);
}else{
$this->compressedQueue->enqueue($payload);
$payload->onResolve(function(CompressBatchPromise $payload) : void{
if($this->connected && $this->compressedQueue->bottom() === $payload){
$this->compressedQueue->dequeue(); //result unused
$this->sendEncoded($payload->getResult());
Timings::$playerNetworkSend->startTiming();
try{
if($immediate){
//Skips all queues
$this->sendEncoded($payload->getResult(), true);
}else{
$this->compressedQueue->enqueue($payload);
$payload->onResolve(function(CompressBatchPromise $payload) : void{
if($this->connected && $this->compressedQueue->bottom() === $payload){
$this->compressedQueue->dequeue(); //result unused
$this->sendEncoded($payload->getResult());
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise $current */
$current = $this->compressedQueue->bottom();
if($current->hasResult()){
$this->compressedQueue->dequeue();
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise $current */
$current = $this->compressedQueue->bottom();
if($current->hasResult()){
$this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult());
}else{
//can't send any more queued until this one is ready
break;
$this->sendEncoded($current->getResult());
}else{
//can't send any more queued until this one is ready
break;
}
}
}
}
});
});
}
}finally{
Timings::$playerNetworkSend->stopTiming();
}
}

View File

@@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\Server;
use pocketmine\utils\BinaryStream;
use function spl_object_id;
final class StandardPacketBroadcaster implements PacketBroadcaster{
@@ -38,7 +39,9 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{
$serializerContext = $recipient->getPacketSerializerContext();
$bufferId = spl_object_id($serializerContext);
if(!isset($buffers[$bufferId])){
$buffers[$bufferId] = PacketBatch::fromPackets($serializerContext, ...$packets);
$stream = new BinaryStream();
NetworkSession::encodePacketBatchTimed($stream, $serializerContext, $packets);
$buffers[$bufferId] = $stream->getBuffer();
}
//TODO: different compressors might be compatible, it might not be necessary to split them up by object
@@ -52,14 +55,14 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{
$buffer = $buffers[$bufferId];
foreach($compressorMap as $compressorId => $compressorTargets){
$compressor = $compressors[$compressorId];
if(!$compressor->willCompress($buffer->getBuffer())){
if(!$compressor->willCompress($buffer)){
foreach($compressorTargets as $target){
foreach($packets as $pk){
$target->addToSendBuffer($pk);
}
}
}else{
$promise = $this->server->prepareBatch($buffer, $compressor);
$promise = $this->server->prepareBatch(new PacketBatch($buffer), $compressor);
foreach($compressorTargets as $target){
$target->queueCompressed($promise);
}