From 6a64486f55e37cbd7a0ce092335441d111fa77e4 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Wed, 22 Feb 2023 21:51:49 +0000 Subject: [PATCH 1/2] StandardPacketBroadcaster: Improve performance when broadcasting small packets In refactors during PM4, I stripped out packet buffer caching, as it was problematic when events alter packets in undetectable ways. However, I never cleaned this part of the code up properly after enabling DataPacketSendEvent to include multiple packets and multiple targets, so we were still individually encoding the packet(s) for every single session if the sum total of the sizes was below 256 bytes. This change encodes packets once in the StandardPacketBroadcaster and retains their buffers to post to the session's send buffer directly if the resulting batch is below compression threshold. This code is still not optimal (see ##5589), but fixing this brings broadcasting performance back to PM3 levels, without any of PM3's problems. --- src/network/mcpe/NetworkSession.php | 37 +++++++------------ .../mcpe/StandardPacketBroadcaster.php | 28 +++++++++----- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index 2b995ef8f..8becf1260 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -177,7 +177,7 @@ class NetworkSession{ private ?EncryptionContext $cipher = null; - /** @var ClientboundPacket[] */ + /** @var string[] */ private array $sendBuffer = []; /** @@ -264,23 +264,6 @@ class NetworkSession{ ); } - /** - * @param ClientboundPacket[] $packets - */ - public static function encodePacketBatchTimed(BinaryStream $stream, PacketSerializerContext $context, array $packets) : void{ - PacketBatch::encodeRaw($stream, array_map(function(ClientboundPacket $packet) use ($context) : string{ - $timings = Timings::getEncodeDataPacketTimings($packet); - $timings->startTiming(); - try{ - $stream = PacketSerializer::encoder($context); - $packet->encode($stream); - return $stream->getBuffer(); - }finally{ - $timings->stopTiming(); - } - }, $packets)); - } - private function onPlayerCreated(Player $player) : void{ if(!$this->isConnected()){ //the remote player might have disconnected before spawn terrain generation was finished @@ -498,7 +481,7 @@ class NetworkSession{ return false; } - $this->addToSendBuffer($packet); + $this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder($this->packetSerializerContext), $packet)); if($immediate){ $this->flushSendBuffer(true); } @@ -512,16 +495,24 @@ class NetworkSession{ /** * @internal */ - public function addToSendBuffer(ClientboundPacket $packet) : void{ - $timings = Timings::getSendDataPacketTimings($packet); + public static function encodePacketTimed(PacketSerializer $serializer, ClientboundPacket $packet) : string{ + $timings = Timings::getEncodeDataPacketTimings($packet); $timings->startTiming(); try{ - $this->sendBuffer[] = $packet; + $packet->encode($serializer); + return $serializer->getBuffer(); }finally{ $timings->stopTiming(); } } + /** + * @internal + */ + public function addToSendBuffer(string $buffer) : void{ + $this->sendBuffer[] = $buffer; + } + private function flushSendBuffer(bool $immediate = false) : void{ if(count($this->sendBuffer) > 0){ Timings::$playerNetworkSend->startTiming(); @@ -534,7 +525,7 @@ class NetworkSession{ } $stream = new BinaryStream(); - self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer); + PacketBatch::encodeRaw($stream, $this->sendBuffer); if($this->enableCompression){ $promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode); diff --git a/src/network/mcpe/StandardPacketBroadcaster.php b/src/network/mcpe/StandardPacketBroadcaster.php index 73848ccae..c52b8f93d 100644 --- a/src/network/mcpe/StandardPacketBroadcaster.php +++ b/src/network/mcpe/StandardPacketBroadcaster.php @@ -23,25 +23,35 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; +use pocketmine\network\mcpe\protocol\ClientboundPacket; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; +use pocketmine\network\mcpe\protocol\serializer\PacketSerializer; use pocketmine\Server; use pocketmine\utils\BinaryStream; +use function array_map; use function spl_object_id; final class StandardPacketBroadcaster implements PacketBroadcaster{ public function __construct(private Server $server){} public function broadcastPackets(array $recipients, array $packets) : void{ - $buffers = []; + $batchBuffers = []; + + /** @var string[][] $packetBuffers */ + $packetBuffers = []; $compressors = []; + /** @var NetworkSession[][][] $targetMap */ $targetMap = []; foreach($recipients as $recipient){ $serializerContext = $recipient->getPacketSerializerContext(); $bufferId = spl_object_id($serializerContext); - if(!isset($buffers[$bufferId])){ + if(!isset($batchBuffers[$bufferId])){ + $packetBuffers[$bufferId] = array_map(function(ClientboundPacket $packet) use ($serializerContext) : string{ + return NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet); + }, $packets); $stream = new BinaryStream(); - NetworkSession::encodePacketBatchTimed($stream, $serializerContext, $packets); - $buffers[$bufferId] = $stream->getBuffer(); + PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]); + $batchBuffers[$bufferId] = $stream->getBuffer(); } //TODO: different compressors might be compatible, it might not be necessary to split them up by object @@ -52,17 +62,17 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{ } foreach($targetMap as $bufferId => $compressorMap){ - $buffer = $buffers[$bufferId]; + $batchBuffer = $batchBuffers[$bufferId]; foreach($compressorMap as $compressorId => $compressorTargets){ $compressor = $compressors[$compressorId]; - if(!$compressor->willCompress($buffer)){ + if(!$compressor->willCompress($batchBuffer)){ foreach($compressorTargets as $target){ - foreach($packets as $pk){ - $target->addToSendBuffer($pk); + foreach($packetBuffers[$bufferId] as $packetBuffer){ + $target->addToSendBuffer($packetBuffer); } } }else{ - $promise = $this->server->prepareBatch(new PacketBatch($buffer), $compressor); + $promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor); foreach($compressorTargets as $target){ $target->queueCompressed($promise); } From 8234360c8d4b54aca3fff531d02fa4bbf7017af2 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Wed, 22 Feb 2023 22:43:10 +0000 Subject: [PATCH 2/2] Avoid creating batch buffers just to determine whether a batch should be globally compressed Instead, sum together the lengths of encoded packet buffers and use that to decide whether to build the buffer or not. --- src/Server.php | 3 ++ .../mcpe/StandardPacketBroadcaster.php | 46 +++++++++++++------ .../mcpe/compression/ThresholdCompressor.php | 31 +++++++++++++ .../mcpe/compression/ZlibCompressor.php | 10 ++-- 4 files changed, 73 insertions(+), 17 deletions(-) create mode 100644 src/network/mcpe/compression/ThresholdCompressor.php diff --git a/src/Server.php b/src/Server.php index 0fc467bef..116e6f39e 100644 --- a/src/Server.php +++ b/src/Server.php @@ -891,6 +891,9 @@ class Server{ if($this->configGroup->getPropertyInt("network.batch-threshold", 256) >= 0){ $netCompressionThreshold = $this->configGroup->getPropertyInt("network.batch-threshold", 256); } + if($netCompressionThreshold < 0){ + $netCompressionThreshold = null; + } $netCompressionLevel = $this->configGroup->getPropertyInt("network.compression-level", 6); if($netCompressionLevel < 1 || $netCompressionLevel > 9){ diff --git a/src/network/mcpe/StandardPacketBroadcaster.php b/src/network/mcpe/StandardPacketBroadcaster.php index c52b8f93d..04b3253f5 100644 --- a/src/network/mcpe/StandardPacketBroadcaster.php +++ b/src/network/mcpe/StandardPacketBroadcaster.php @@ -23,21 +23,19 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; -use pocketmine\network\mcpe\protocol\ClientboundPacket; +use pocketmine\network\mcpe\compression\ThresholdCompressor; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\serializer\PacketSerializer; use pocketmine\Server; use pocketmine\utils\BinaryStream; -use function array_map; use function spl_object_id; +use function strlen; final class StandardPacketBroadcaster implements PacketBroadcaster{ public function __construct(private Server $server){} public function broadcastPackets(array $recipients, array $packets) : void{ - $batchBuffers = []; - - /** @var string[][] $packetBuffers */ + $packetBufferTotalLengths = []; $packetBuffers = []; $compressors = []; /** @var NetworkSession[][][] $targetMap */ @@ -45,13 +43,14 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{ foreach($recipients as $recipient){ $serializerContext = $recipient->getPacketSerializerContext(); $bufferId = spl_object_id($serializerContext); - if(!isset($batchBuffers[$bufferId])){ - $packetBuffers[$bufferId] = array_map(function(ClientboundPacket $packet) use ($serializerContext) : string{ - return NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet); - }, $packets); - $stream = new BinaryStream(); - PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]); - $batchBuffers[$bufferId] = $stream->getBuffer(); + if(!isset($packetBuffers[$bufferId])){ + $packetBufferTotalLengths[$bufferId] = 0; + $packetBuffers[$bufferId] = []; + foreach($packets as $packet){ + $buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet); + $packetBufferTotalLengths[$bufferId] += strlen($buffer); + $packetBuffers[$bufferId][] = $buffer; + } } //TODO: different compressors might be compatible, it might not be necessary to split them up by object @@ -62,10 +61,29 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{ } foreach($targetMap as $bufferId => $compressorMap){ - $batchBuffer = $batchBuffers[$bufferId]; foreach($compressorMap as $compressorId => $compressorTargets){ $compressor = $compressors[$compressorId]; - if(!$compressor->willCompress($batchBuffer)){ + + $batchBuffer = null; + if($compressor instanceof ThresholdCompressor){ + $threshold = $compressor->getCompressionThreshold(); + if($threshold !== null && $packetBufferTotalLengths[$bufferId] >= $threshold){ + //do not prepare shared batch unless we're sure it will be compressed + $stream = new BinaryStream(); + PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]); + $batchBuffer = $stream->getBuffer(); + } + }else{ + //this is a legacy compressor, so we have to encode the batch and check if it will compress + $stream = new BinaryStream(); + PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]); + $tempBatchBuffer = $stream->getBuffer(); + if($compressor->willCompress($tempBatchBuffer)){ + $batchBuffer = $tempBatchBuffer; + } + } + + if($batchBuffer === null){ foreach($compressorTargets as $target){ foreach($packetBuffers[$bufferId] as $packetBuffer){ $target->addToSendBuffer($packetBuffer); diff --git a/src/network/mcpe/compression/ThresholdCompressor.php b/src/network/mcpe/compression/ThresholdCompressor.php new file mode 100644 index 000000000..4c3c0ad55 --- /dev/null +++ b/src/network/mcpe/compression/ThresholdCompressor.php @@ -0,0 +1,31 @@ +minCompressionSize; + } + public function willCompress(string $data) : bool{ - return $this->minCompressionSize > -1 && strlen($data) >= $this->minCompressionSize; + return $this->minCompressionSize !== null && strlen($data) >= $this->minCompressionSize; } /**