NetworkSession: immediate-send now causes a buffer flush when the packet is ready

instead of skipping queues and forcing sync compression as previously seen.

this maintains proper packet order and allows immediate-flush to be used to reduce latency in-game.

Small servers won't notice any difference, but for larger ones it may make a difference, since the buffer time effectively depends on the amount of load RakLib is under.
closes #3325
This commit is contained in:
Dylan K. Taylor 2025-04-05 17:40:48 +01:00
parent 673b39e2a1
commit 071c15d7de
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D

View File

@ -174,7 +174,7 @@ class NetworkSession{
*/ */
private array $sendBufferAckPromises = []; private array $sendBufferAckPromises = [];
/** @phpstan-var \SplQueue<array{CompressBatchPromise|string, list<PromiseResolver<true>>}> */ /** @phpstan-var \SplQueue<array{CompressBatchPromise|string, list<PromiseResolver<true>>, bool}> */
private \SplQueue $compressedQueue; private \SplQueue $compressedQueue;
private bool $forceAsyncCompression = true; private bool $forceAsyncCompression = true;
private bool $enableCompression = false; //disabled until handshake completed private bool $enableCompression = false; //disabled until handshake completed
@ -235,7 +235,7 @@ class NetworkSession{
private function onSessionStartSuccess() : void{ private function onSessionStartSuccess() : void{
$this->logger->debug("Session start handshake completed, awaiting login packet"); $this->logger->debug("Session start handshake completed, awaiting login packet");
$this->flushSendBuffer(true); $this->flushGamePacketQueue();
$this->enableCompression = true; $this->enableCompression = true;
$this->setHandler(new LoginPacketHandler( $this->setHandler(new LoginPacketHandler(
$this->server, $this->server,
@ -529,7 +529,7 @@ class NetworkSession{
$this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket)); $this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket));
} }
if($immediate){ if($immediate){
$this->flushSendBuffer(true); $this->flushGamePacketQueue();
} }
return true; return true;
@ -577,14 +577,12 @@ class NetworkSession{
$this->sendBuffer[] = $buffer; $this->sendBuffer[] = $buffer;
} }
private function flushSendBuffer(bool $immediate = false) : void{ private function flushGamePacketQueue() : void{
if(count($this->sendBuffer) > 0){ if(count($this->sendBuffer) > 0){
Timings::$playerNetworkSend->startTiming(); Timings::$playerNetworkSend->startTiming();
try{ try{
$syncMode = null; //automatic $syncMode = null; //automatic
if($immediate){ if($this->forceAsyncCompression){
$syncMode = true;
}elseif($this->forceAsyncCompression){
$syncMode = false; $syncMode = false;
} }
@ -599,7 +597,9 @@ class NetworkSession{
$this->sendBuffer = []; $this->sendBuffer = [];
$ackPromises = $this->sendBufferAckPromises; $ackPromises = $this->sendBufferAckPromises;
$this->sendBufferAckPromises = []; $this->sendBufferAckPromises = [];
$this->queueCompressedNoBufferFlush($batch, $immediate, $ackPromises); //these packets were already potentially buffered for up to 50ms - make sure the transport layer doesn't
//delay them any longer
$this->queueCompressedNoGamePacketFlush($batch, networkFlush: true, ackPromises: $ackPromises);
}finally{ }finally{
Timings::$playerNetworkSend->stopTiming(); Timings::$playerNetworkSend->stopTiming();
} }
@ -619,8 +619,10 @@ class NetworkSession{
public function queueCompressed(CompressBatchPromise|string $payload, bool $immediate = false) : void{ public function queueCompressed(CompressBatchPromise|string $payload, bool $immediate = false) : void{
Timings::$playerNetworkSend->startTiming(); Timings::$playerNetworkSend->startTiming();
try{ try{
$this->flushSendBuffer($immediate); //Maintain ordering if possible //if the next packet causes a flush, avoid unnecessarily flushing twice
$this->queueCompressedNoBufferFlush($payload, $immediate); //however, if the next packet does *not* cause a flush, game packets should be flushed to avoid delays
$this->flushGamePacketQueue();
$this->queueCompressedNoGamePacketFlush($payload, $immediate);
}finally{ }finally{
Timings::$playerNetworkSend->stopTiming(); Timings::$playerNetworkSend->stopTiming();
} }
@ -631,22 +633,13 @@ class NetworkSession{
* *
* @phpstan-param list<PromiseResolver<true>> $ackPromises * @phpstan-param list<PromiseResolver<true>> $ackPromises
*/ */
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false, array $ackPromises = []) : void{ private function queueCompressedNoGamePacketFlush(CompressBatchPromise|string $batch, bool $networkFlush = false, array $ackPromises = []) : void{
Timings::$playerNetworkSend->startTiming(); Timings::$playerNetworkSend->startTiming();
try{ try{
$this->compressedQueue->enqueue([$batch, $ackPromises, $networkFlush]);
if(is_string($batch)){ if(is_string($batch)){
if($immediate){ $this->flushCompressedQueue();
//Skips all queues
$this->sendEncoded($batch, true, $ackPromises);
}else{
$this->compressedQueue->enqueue([$batch, $ackPromises]);
$this->flushCompressedQueue();
}
}elseif($immediate){
//Skips all queues
$this->sendEncoded($batch->getResult(), true, $ackPromises);
}else{ }else{
$this->compressedQueue->enqueue([$batch, $ackPromises]);
$batch->onResolve(function() : void{ $batch->onResolve(function() : void{
if($this->connected){ if($this->connected){
$this->flushCompressedQueue(); $this->flushCompressedQueue();
@ -663,14 +656,14 @@ class NetworkSession{
try{ try{
while(!$this->compressedQueue->isEmpty()){ while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise|string $current */ /** @var CompressBatchPromise|string $current */
[$current, $ackPromises] = $this->compressedQueue->bottom(); [$current, $ackPromises, $networkFlush] = $this->compressedQueue->bottom();
if(is_string($current)){ if(is_string($current)){
$this->compressedQueue->dequeue(); $this->compressedQueue->dequeue();
$this->sendEncoded($current, false, $ackPromises); $this->sendEncoded($current, $networkFlush, $ackPromises);
}elseif($current->hasResult()){ }elseif($current->hasResult()){
$this->compressedQueue->dequeue(); $this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult(), false, $ackPromises); $this->sendEncoded($current->getResult(), $networkFlush, $ackPromises);
}else{ }else{
//can't send any more queued until this one is ready //can't send any more queued until this one is ready
@ -710,7 +703,7 @@ class NetworkSession{
$this->disconnectGuard = true; $this->disconnectGuard = true;
$func(); $func();
$this->disconnectGuard = false; $this->disconnectGuard = false;
$this->flushSendBuffer(true); $this->flushGamePacketQueue();
$this->sender->close(""); $this->sender->close("");
foreach($this->disposeHooks as $callback){ foreach($this->disposeHooks as $callback){
$callback(); $callback();
@ -1345,6 +1338,6 @@ class NetworkSession{
Timings::$playerNetworkSendInventorySync->stopTiming(); Timings::$playerNetworkSendInventorySync->stopTiming();
} }
$this->flushSendBuffer(); $this->flushGamePacketQueue();
} }
} }