call(); if($ev->isCancelled()){ return; } $packets = $ev->getPackets(); } $compressors = []; /** @var NetworkSession[][] $targetsByCompressor */ $targetsByCompressor = []; foreach($recipients as $recipient){ if($recipient->getPacketSerializerContext() !== $this->protocolContext){ throw new \InvalidArgumentException("Only recipients with the same protocol context as the broadcaster can be broadcast to by this broadcaster"); } //TODO: different compressors might be compatible, it might not be necessary to split them up by object $compressor = $recipient->getCompressor(); $compressors[spl_object_id($compressor)] = $compressor; $targetsByCompressor[spl_object_id($compressor)][] = $recipient; } $totalLength = 0; $packetBuffers = []; foreach($packets as $packet){ $buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($this->protocolContext), $packet); //varint length prefix + packet buffer $totalLength += (((int) log(strlen($buffer), 128)) + 1) + strlen($buffer); $packetBuffers[] = $buffer; } foreach($targetsByCompressor as $compressorId => $compressorTargets){ $compressor = $compressors[$compressorId]; $threshold = $compressor->getCompressionThreshold(); if(count($compressorTargets) > 1 && $threshold !== null && $totalLength >= $threshold){ //do not prepare shared batch unless we're sure it will be compressed $stream = new BinaryStream(); PacketBatch::encodeRaw($stream, $packetBuffers); $batchBuffer = $stream->getBuffer(); $batch = $this->server->prepareBatch($batchBuffer, $compressor, timings: Timings::$playerNetworkSendCompressBroadcast); foreach($compressorTargets as $target){ $target->queueCompressed($batch); } }else{ foreach($compressorTargets as $target){ foreach($packetBuffers as $packetBuffer){ $target->addToSendBuffer($packetBuffer); } } } } } }