Merge 'minor-next' into 'major-next'

Automatic merge performed by: https://github.com/pmmp/RestrictedActions/actions/runs/14287788168
This commit is contained in:
pmmp-admin-bot[bot] 2025-04-06 01:43:19 +00:00
commit 94caea97e0

View File

@ -174,7 +174,7 @@ class NetworkSession{
*/
private array $sendBufferAckPromises = [];
/** @phpstan-var \SplQueue<array{CompressBatchPromise|string, list<PromiseResolver<true>>}> */
/** @phpstan-var \SplQueue<array{CompressBatchPromise|string, list<PromiseResolver<true>>, bool}> */
private \SplQueue $compressedQueue;
private bool $forceAsyncCompression = true;
private bool $enableCompression = false; //disabled until handshake completed
@ -235,7 +235,7 @@ class NetworkSession{
private function onSessionStartSuccess() : void{
$this->logger->debug("Session start handshake completed, awaiting login packet");
$this->flushSendBuffer(true);
$this->flushGamePacketQueue();
$this->enableCompression = true;
$this->setHandler(new LoginPacketHandler(
$this->server,
@ -529,7 +529,7 @@ class NetworkSession{
$this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket));
}
if($immediate){
$this->flushSendBuffer(true);
$this->flushGamePacketQueue();
}
return true;
@ -577,14 +577,12 @@ class NetworkSession{
$this->sendBuffer[] = $buffer;
}
private function flushSendBuffer(bool $immediate = false) : void{
private function flushGamePacketQueue() : void{
if(count($this->sendBuffer) > 0){
Timings::$playerNetworkSend->startTiming();
try{
$syncMode = null; //automatic
if($immediate){
$syncMode = true;
}elseif($this->forceAsyncCompression){
if($this->forceAsyncCompression){
$syncMode = false;
}
@ -599,7 +597,9 @@ class NetworkSession{
$this->sendBuffer = [];
$ackPromises = $this->sendBufferAckPromises;
$this->sendBufferAckPromises = [];
$this->queueCompressedNoBufferFlush($batch, $immediate, $ackPromises);
//these packets were already potentially buffered for up to 50ms - make sure the transport layer doesn't
//delay them any longer
$this->queueCompressedNoGamePacketFlush($batch, networkFlush: true, ackPromises: $ackPromises);
}finally{
Timings::$playerNetworkSend->stopTiming();
}
@ -619,8 +619,10 @@ class NetworkSession{
public function queueCompressed(CompressBatchPromise|string $payload, bool $immediate = false) : void{
Timings::$playerNetworkSend->startTiming();
try{
$this->flushSendBuffer($immediate); //Maintain ordering if possible
$this->queueCompressedNoBufferFlush($payload, $immediate);
//if the next packet causes a flush, avoid unnecessarily flushing twice
//however, if the next packet does *not* cause a flush, game packets should be flushed to avoid delays
$this->flushGamePacketQueue();
$this->queueCompressedNoGamePacketFlush($payload, $immediate);
}finally{
Timings::$playerNetworkSend->stopTiming();
}
@ -631,22 +633,13 @@ class NetworkSession{
*
* @phpstan-param list<PromiseResolver<true>> $ackPromises
*/
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false, array $ackPromises = []) : void{
private function queueCompressedNoGamePacketFlush(CompressBatchPromise|string $batch, bool $networkFlush = false, array $ackPromises = []) : void{
Timings::$playerNetworkSend->startTiming();
try{
$this->compressedQueue->enqueue([$batch, $ackPromises, $networkFlush]);
if(is_string($batch)){
if($immediate){
//Skips all queues
$this->sendEncoded($batch, true, $ackPromises);
}else{
$this->compressedQueue->enqueue([$batch, $ackPromises]);
$this->flushCompressedQueue();
}
}elseif($immediate){
//Skips all queues
$this->sendEncoded($batch->getResult(), true, $ackPromises);
$this->flushCompressedQueue();
}else{
$this->compressedQueue->enqueue([$batch, $ackPromises]);
$batch->onResolve(function() : void{
if($this->connected){
$this->flushCompressedQueue();
@ -663,14 +656,14 @@ class NetworkSession{
try{
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise|string $current */
[$current, $ackPromises] = $this->compressedQueue->bottom();
[$current, $ackPromises, $networkFlush] = $this->compressedQueue->bottom();
if(is_string($current)){
$this->compressedQueue->dequeue();
$this->sendEncoded($current, false, $ackPromises);
$this->sendEncoded($current, $networkFlush, $ackPromises);
}elseif($current->hasResult()){
$this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult(), false, $ackPromises);
$this->sendEncoded($current->getResult(), $networkFlush, $ackPromises);
}else{
//can't send any more queued until this one is ready
@ -710,7 +703,7 @@ class NetworkSession{
$this->disconnectGuard = true;
$func();
$this->disconnectGuard = false;
$this->flushSendBuffer(true);
$this->flushGamePacketQueue();
$this->sender->close("");
foreach($this->disposeHooks as $callback){
$callback();
@ -1345,6 +1338,6 @@ class NetworkSession{
Timings::$playerNetworkSendInventorySync->stopTiming();
}
$this->flushSendBuffer();
$this->flushGamePacketQueue();
}
}