diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index 8c457ed40..bea3f8131 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -174,7 +174,7 @@ class NetworkSession{ */ private array $sendBufferAckPromises = []; - /** @phpstan-var \SplQueue>}> */ + /** @phpstan-var \SplQueue>, bool}> */ private \SplQueue $compressedQueue; private bool $forceAsyncCompression = true; private bool $enableCompression = false; //disabled until handshake completed @@ -235,7 +235,7 @@ class NetworkSession{ private function onSessionStartSuccess() : void{ $this->logger->debug("Session start handshake completed, awaiting login packet"); - $this->flushSendBuffer(true); + $this->flushGamePacketQueue(); $this->enableCompression = true; $this->setHandler(new LoginPacketHandler( $this->server, @@ -529,7 +529,7 @@ class NetworkSession{ $this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket)); } if($immediate){ - $this->flushSendBuffer(true); + $this->flushGamePacketQueue(); } return true; @@ -577,14 +577,12 @@ class NetworkSession{ $this->sendBuffer[] = $buffer; } - private function flushSendBuffer(bool $immediate = false) : void{ + private function flushGamePacketQueue() : void{ if(count($this->sendBuffer) > 0){ Timings::$playerNetworkSend->startTiming(); try{ $syncMode = null; //automatic - if($immediate){ - $syncMode = true; - }elseif($this->forceAsyncCompression){ + if($this->forceAsyncCompression){ $syncMode = false; } @@ -599,7 +597,9 @@ class NetworkSession{ $this->sendBuffer = []; $ackPromises = $this->sendBufferAckPromises; $this->sendBufferAckPromises = []; - $this->queueCompressedNoBufferFlush($batch, $immediate, $ackPromises); + //these packets were already potentially buffered for up to 50ms - make sure the transport layer doesn't + //delay them any longer + $this->queueCompressedNoGamePacketFlush($batch, networkFlush: true, ackPromises: $ackPromises); }finally{ Timings::$playerNetworkSend->stopTiming(); } @@ -619,8 +619,10 @@ class NetworkSession{ public function queueCompressed(CompressBatchPromise|string $payload, bool $immediate = false) : void{ Timings::$playerNetworkSend->startTiming(); try{ - $this->flushSendBuffer($immediate); //Maintain ordering if possible - $this->queueCompressedNoBufferFlush($payload, $immediate); + //if the next packet causes a flush, avoid unnecessarily flushing twice + //however, if the next packet does *not* cause a flush, game packets should be flushed to avoid delays + $this->flushGamePacketQueue(); + $this->queueCompressedNoGamePacketFlush($payload, $immediate); }finally{ Timings::$playerNetworkSend->stopTiming(); } @@ -631,22 +633,13 @@ class NetworkSession{ * * @phpstan-param list> $ackPromises */ - private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false, array $ackPromises = []) : void{ + private function queueCompressedNoGamePacketFlush(CompressBatchPromise|string $batch, bool $networkFlush = false, array $ackPromises = []) : void{ Timings::$playerNetworkSend->startTiming(); try{ + $this->compressedQueue->enqueue([$batch, $ackPromises, $networkFlush]); if(is_string($batch)){ - if($immediate){ - //Skips all queues - $this->sendEncoded($batch, true, $ackPromises); - }else{ - $this->compressedQueue->enqueue([$batch, $ackPromises]); - $this->flushCompressedQueue(); - } - }elseif($immediate){ - //Skips all queues - $this->sendEncoded($batch->getResult(), true, $ackPromises); + $this->flushCompressedQueue(); }else{ - $this->compressedQueue->enqueue([$batch, $ackPromises]); $batch->onResolve(function() : void{ if($this->connected){ $this->flushCompressedQueue(); @@ -663,14 +656,14 @@ class NetworkSession{ try{ while(!$this->compressedQueue->isEmpty()){ /** @var CompressBatchPromise|string $current */ - [$current, $ackPromises] = $this->compressedQueue->bottom(); + [$current, $ackPromises, $networkFlush] = $this->compressedQueue->bottom(); if(is_string($current)){ $this->compressedQueue->dequeue(); - $this->sendEncoded($current, false, $ackPromises); + $this->sendEncoded($current, $networkFlush, $ackPromises); }elseif($current->hasResult()){ $this->compressedQueue->dequeue(); - $this->sendEncoded($current->getResult(), false, $ackPromises); + $this->sendEncoded($current->getResult(), $networkFlush, $ackPromises); }else{ //can't send any more queued until this one is ready @@ -710,7 +703,7 @@ class NetworkSession{ $this->disconnectGuard = true; $func(); $this->disconnectGuard = false; - $this->flushSendBuffer(true); + $this->flushGamePacketQueue(); $this->sender->close(""); foreach($this->disposeHooks as $callback){ $callback(); @@ -1345,6 +1338,6 @@ class NetworkSession{ Timings::$playerNetworkSendInventorySync->stopTiming(); } - $this->flushSendBuffer(); + $this->flushGamePacketQueue(); } }