From bc07778434d34c4cf01a5159794ae0a2c33a8b00 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Fri, 17 Nov 2023 12:35:42 +0000 Subject: [PATCH] Avoid unnecessary CompressBatchPromise allocations for sync-prepared batches Sync-prepared batches account for the vast majority of outbound packets. Avoiding these useless objects further reduces the overhead of zero-compressed packets, as the creation of these objects is a significant part of the overhead for these cases. closes #6157 --- src/Server.php | 9 +-- src/network/mcpe/NetworkSession.php | 79 +++++++++++-------- .../mcpe/StandardPacketBroadcaster.php | 4 +- 3 files changed, 52 insertions(+), 40 deletions(-) diff --git a/src/Server.php b/src/Server.php index c490d7837..48f641947 100644 --- a/src/Server.php +++ b/src/Server.php @@ -1368,7 +1368,7 @@ class Server{ * * @param bool|null $sync Compression on the main thread (true) or workers (false). Default is automatic (null). */ - public function prepareBatch(string $buffer, Compressor $compressor, ?bool $sync = null, ?TimingsHandler $timings = null) : CompressBatchPromise{ + public function prepareBatch(string $buffer, Compressor $compressor, ?bool $sync = null, ?TimingsHandler $timings = null) : CompressBatchPromise|string{ $timings ??= Timings::$playerNetworkSendCompress; try{ $timings->startTiming(); @@ -1378,15 +1378,14 @@ class Server{ $sync = !$this->networkCompressionAsync || $threshold === null || strlen($buffer) < $threshold; } - $promise = new CompressBatchPromise(); if(!$sync && strlen($buffer) >= $this->networkCompressionAsyncThreshold){ + $promise = new CompressBatchPromise(); $task = new CompressBatchTask($buffer, $promise, $compressor); $this->asyncPool->submitTask($task); - }else{ - $promise->resolve($compressor->compress($buffer)); + return $promise; } - return $promise; + return $compressor->compress($buffer); }finally{ $timings->stopTiming(); } diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index 4f0152636..a930cfa55 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -117,6 +117,7 @@ use function count; use function get_class; use function implode; use function in_array; +use function is_string; use function json_encode; use function random_bytes; use function str_split; @@ -158,8 +159,8 @@ class NetworkSession{ private array $sendBuffer = []; /** - * @var \SplQueue|CompressBatchPromise[] - * @phpstan-var \SplQueue + * @var \SplQueue|CompressBatchPromise[]|string[] + * @phpstan-var \SplQueue */ private \SplQueue $compressedQueue; private bool $forceAsyncCompression = true; @@ -525,13 +526,12 @@ class NetworkSession{ PacketBatch::encodeRaw($stream, $this->sendBuffer); if($this->enableCompression){ - $promise = $this->server->prepareBatch($stream->getBuffer(), $this->compressor, $syncMode, Timings::$playerNetworkSendCompressSessionBuffer); + $batch = $this->server->prepareBatch($stream->getBuffer(), $this->compressor, $syncMode, Timings::$playerNetworkSendCompressSessionBuffer); }else{ - $promise = new CompressBatchPromise(); - $promise->resolve($stream->getBuffer()); + $batch = $stream->getBuffer(); } $this->sendBuffer = []; - $this->queueCompressedNoBufferFlush($promise, $immediate); + $this->queueCompressedNoBufferFlush($batch, $immediate); }finally{ Timings::$playerNetworkSend->stopTiming(); } @@ -550,7 +550,7 @@ class NetworkSession{ public function getTypeConverter() : TypeConverter{ return $this->typeConverter; } - public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{ + public function queueCompressed(CompressBatchPromise|string $payload, bool $immediate = false) : void{ Timings::$playerNetworkSend->startTiming(); try{ $this->flushSendBuffer($immediate); //Maintain ordering if possible @@ -560,36 +560,25 @@ class NetworkSession{ } } - private function queueCompressedNoBufferFlush(CompressBatchPromise $payload, bool $immediate = false) : void{ + private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false) : void{ Timings::$playerNetworkSend->startTiming(); try{ - if($immediate){ + if(is_string($batch)){ + if($immediate){ + //Skips all queues + $this->sendEncoded($batch, true); + }else{ + $this->compressedQueue->enqueue($batch); + $this->flushCompressedQueue(); + } + }elseif($immediate){ //Skips all queues - $this->sendEncoded($payload->getResult(), true); + $this->sendEncoded($batch->getResult(), true); }else{ - $this->compressedQueue->enqueue($payload); - $payload->onResolve(function(CompressBatchPromise $payload) : void{ - if($this->connected && $this->compressedQueue->bottom() === $payload){ - Timings::$playerNetworkSend->startTiming(); - try{ - $this->compressedQueue->dequeue(); //result unused - $this->sendEncoded($payload->getResult()); - - while(!$this->compressedQueue->isEmpty()){ - /** @var CompressBatchPromise $current */ - $current = $this->compressedQueue->bottom(); - if($current->hasResult()){ - $this->compressedQueue->dequeue(); - - $this->sendEncoded($current->getResult()); - }else{ - //can't send any more queued until this one is ready - break; - } - } - }finally{ - Timings::$playerNetworkSend->stopTiming(); - } + $this->compressedQueue->enqueue($batch); + $batch->onResolve(function() : void{ + if($this->connected){ + $this->flushCompressedQueue(); } }); } @@ -598,6 +587,30 @@ class NetworkSession{ } } + private function flushCompressedQueue() : void{ + Timings::$playerNetworkSend->startTiming(); + try{ + while(!$this->compressedQueue->isEmpty()){ + /** @var CompressBatchPromise|string $current */ + $current = $this->compressedQueue->bottom(); + if(is_string($current)){ + $this->compressedQueue->dequeue(); + $this->sendEncoded($current); + + }elseif($current->hasResult()){ + $this->compressedQueue->dequeue(); + $this->sendEncoded($current->getResult()); + + }else{ + //can't send any more queued until this one is ready + break; + } + } + }finally{ + Timings::$playerNetworkSend->stopTiming(); + } + } + private function sendEncoded(string $payload, bool $immediate = false) : void{ if($this->cipher !== null){ Timings::$playerNetworkSendEncrypt->startTiming(); diff --git a/src/network/mcpe/StandardPacketBroadcaster.php b/src/network/mcpe/StandardPacketBroadcaster.php index c200859fd..1de6f80fe 100644 --- a/src/network/mcpe/StandardPacketBroadcaster.php +++ b/src/network/mcpe/StandardPacketBroadcaster.php @@ -88,9 +88,9 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{ PacketBatch::encodeRaw($stream, $packetBuffers); $batchBuffer = $stream->getBuffer(); - $promise = $this->server->prepareBatch($batchBuffer, $compressor, timings: Timings::$playerNetworkSendCompressBroadcast); + $batch = $this->server->prepareBatch($batchBuffer, $compressor, timings: Timings::$playerNetworkSendCompressBroadcast); foreach($compressorTargets as $target){ - $target->queueCompressed($promise); + $target->queueCompressed($batch); } }else{ foreach($compressorTargets as $target){