diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index bcdefb9ba8..3fb7415514 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -42,7 +42,6 @@ use pocketmine\network\mcpe\cache\ChunkCache; use pocketmine\network\mcpe\compression\CompressBatchPromise; use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\compression\DecompressionException; -use pocketmine\network\mcpe\convert\GlobalItemTypeDictionary; use pocketmine\network\mcpe\convert\SkinAdapterSingleton; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\encryption\DecryptionException; @@ -189,8 +188,6 @@ class NetworkSession{ private bool $forceAsyncCompression = true; private bool $enableCompression = false; //disabled until handshake completed - private PacketSerializerContext $packetSerializerContext; - private ?InventoryManager $invManager = null; /** @@ -203,6 +200,7 @@ class NetworkSession{ private Server $server, private NetworkSessionManager $manager, private PacketPool $packetPool, + private PacketSerializerContext $packetSerializerContext, private PacketSender $sender, private PacketBroadcaster $broadcaster, private Compressor $compressor, @@ -213,9 +211,6 @@ class NetworkSession{ $this->compressedQueue = new \SplQueue(); - //TODO: allow this to be injected - $this->packetSerializerContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary()); - $this->disposeHooks = new ObjectSet(); $this->connectTime = time(); diff --git a/src/network/mcpe/StandardPacketBroadcaster.php b/src/network/mcpe/StandardPacketBroadcaster.php index f1009d23b2..f83cff64df 100644 --- a/src/network/mcpe/StandardPacketBroadcaster.php +++ b/src/network/mcpe/StandardPacketBroadcaster.php @@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe; use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\serializer\PacketSerializer; +use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext; use pocketmine\Server; use pocketmine\timings\Timings; use pocketmine\utils\BinaryStream; @@ -33,54 +34,54 @@ use function spl_object_id; use function strlen; final class StandardPacketBroadcaster implements PacketBroadcaster{ - public function __construct(private Server $server){} + public function __construct( + private Server $server, + private PacketSerializerContext $protocolContext + ){} public function broadcastPackets(array $recipients, array $packets) : void{ - $packetBufferTotalLengths = []; - $packetBuffers = []; $compressors = []; - /** @var NetworkSession[][][] $targetMap */ - $targetMap = []; + + /** @var NetworkSession[][] $targetsByCompressor */ + $targetsByCompressor = []; foreach($recipients as $recipient){ - $serializerContext = $recipient->getPacketSerializerContext(); - $bufferId = spl_object_id($serializerContext); - if(!isset($packetBuffers[$bufferId])){ - $packetBufferTotalLengths[$bufferId] = 0; - $packetBuffers[$bufferId] = []; - foreach($packets as $packet){ - $buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet); - $packetBufferTotalLengths[$bufferId] += strlen($buffer); - $packetBuffers[$bufferId][] = $buffer; - } + if($recipient->getPacketSerializerContext() !== $this->protocolContext){ + throw new \InvalidArgumentException("Only recipients with the same protocol context as the broadcaster can be broadcast to by this broadcaster"); } //TODO: different compressors might be compatible, it might not be necessary to split them up by object $compressor = $recipient->getCompressor(); $compressors[spl_object_id($compressor)] = $compressor; - $targetMap[$bufferId][spl_object_id($compressor)][] = $recipient; + $targetsByCompressor[spl_object_id($compressor)][] = $recipient; } - foreach($targetMap as $bufferId => $compressorMap){ - foreach($compressorMap as $compressorId => $compressorTargets){ - $compressor = $compressors[$compressorId]; + $totalLength = 0; + $packetBuffers = []; + foreach($packets as $packet){ + $buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($this->protocolContext), $packet); + $totalLength += strlen($buffer); + $packetBuffers[] = $buffer; + } - $threshold = $compressor->getCompressionThreshold(); - if(count($compressorTargets) > 1 && $threshold !== null && $packetBufferTotalLengths[$bufferId] >= $threshold){ - //do not prepare shared batch unless we're sure it will be compressed - $stream = new BinaryStream(); - PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]); - $batchBuffer = $stream->getBuffer(); + foreach($targetsByCompressor as $compressorId => $compressorTargets){ + $compressor = $compressors[$compressorId]; - $promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor, timings: Timings::$playerNetworkSendCompressBroadcast); - foreach($compressorTargets as $target){ - $target->queueCompressed($promise); - } - }else{ - foreach($compressorTargets as $target){ - foreach($packetBuffers[$bufferId] as $packetBuffer){ - $target->addToSendBuffer($packetBuffer); - } + $threshold = $compressor->getCompressionThreshold(); + if(count($compressorTargets) > 1 && $threshold !== null && $totalLength >= $threshold){ + //do not prepare shared batch unless we're sure it will be compressed + $stream = new BinaryStream(); + PacketBatch::encodeRaw($stream, $packetBuffers); + $batchBuffer = $stream->getBuffer(); + + $promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor, timings: Timings::$playerNetworkSendCompressBroadcast); + foreach($compressorTargets as $target){ + $target->queueCompressed($promise); + } + }else{ + foreach($compressorTargets as $target){ + foreach($packetBuffers as $packetBuffer){ + $target->addToSendBuffer($packetBuffer); } } } diff --git a/src/network/mcpe/raklib/RakLibInterface.php b/src/network/mcpe/raklib/RakLibInterface.php index 412a741528..b0d1bc13dd 100644 --- a/src/network/mcpe/raklib/RakLibInterface.php +++ b/src/network/mcpe/raklib/RakLibInterface.php @@ -26,11 +26,13 @@ namespace pocketmine\network\mcpe\raklib; use pocketmine\lang\KnownTranslationFactory; use pocketmine\network\AdvancedNetworkInterface; use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\convert\GlobalItemTypeDictionary; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\PacketBroadcaster; use pocketmine\network\mcpe\protocol\PacketPool; use pocketmine\network\mcpe\protocol\ProtocolInfo; +use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext; use pocketmine\network\mcpe\StandardPacketBroadcaster; use pocketmine\network\Network; use pocketmine\network\NetworkInterfaceStartException; @@ -79,6 +81,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ private SleeperNotifier $sleeper; private PacketBroadcaster $broadcaster; + private PacketSerializerContext $packetSerializerContext; public function __construct(Server $server, string $ip, int $port, bool $ipV6){ $this->server = $server; @@ -108,7 +111,8 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ new PthreadsChannelWriter($mainToThreadBuffer) ); - $this->broadcaster = new StandardPacketBroadcaster($this->server); + $this->packetSerializerContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary()); + $this->broadcaster = new StandardPacketBroadcaster($this->server, $this->packetSerializerContext); } public function start() : void{ @@ -173,6 +177,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ $this->server, $this->network->getSessionManager(), PacketPool::getInstance(), + $this->packetSerializerContext, new RakLibPacketSender($sessionId, $this), $this->broadcaster, ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it