Avoid unnecessary CompressBatchPromise allocations for sync-prepared batches

Sync-prepared batches account for the vast majority of outbound packets. Avoiding these useless objects further reduces the overhead of zero-compressed packets, as the creation of these objects is a significant part of the overhead for these cases.

closes #6157
This commit is contained in:
Dylan K. Taylor 2023-11-17 12:35:42 +00:00
parent 519784460f
commit bc07778434
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
3 changed files with 52 additions and 40 deletions

View File

@ -1368,7 +1368,7 @@ class Server{
*
* @param bool|null $sync Compression on the main thread (true) or workers (false). Default is automatic (null).
*/
public function prepareBatch(string $buffer, Compressor $compressor, ?bool $sync = null, ?TimingsHandler $timings = null) : CompressBatchPromise{
public function prepareBatch(string $buffer, Compressor $compressor, ?bool $sync = null, ?TimingsHandler $timings = null) : CompressBatchPromise|string{
$timings ??= Timings::$playerNetworkSendCompress;
try{
$timings->startTiming();
@ -1378,15 +1378,14 @@ class Server{
$sync = !$this->networkCompressionAsync || $threshold === null || strlen($buffer) < $threshold;
}
$promise = new CompressBatchPromise();
if(!$sync && strlen($buffer) >= $this->networkCompressionAsyncThreshold){
$promise = new CompressBatchPromise();
$task = new CompressBatchTask($buffer, $promise, $compressor);
$this->asyncPool->submitTask($task);
}else{
$promise->resolve($compressor->compress($buffer));
return $promise;
}
return $promise;
return $compressor->compress($buffer);
}finally{
$timings->stopTiming();
}

View File

@ -117,6 +117,7 @@ use function count;
use function get_class;
use function implode;
use function in_array;
use function is_string;
use function json_encode;
use function random_bytes;
use function str_split;
@ -158,8 +159,8 @@ class NetworkSession{
private array $sendBuffer = [];
/**
* @var \SplQueue|CompressBatchPromise[]
* @phpstan-var \SplQueue<CompressBatchPromise>
* @var \SplQueue|CompressBatchPromise[]|string[]
* @phpstan-var \SplQueue<CompressBatchPromise|string>
*/
private \SplQueue $compressedQueue;
private bool $forceAsyncCompression = true;
@ -525,13 +526,12 @@ class NetworkSession{
PacketBatch::encodeRaw($stream, $this->sendBuffer);
if($this->enableCompression){
$promise = $this->server->prepareBatch($stream->getBuffer(), $this->compressor, $syncMode, Timings::$playerNetworkSendCompressSessionBuffer);
$batch = $this->server->prepareBatch($stream->getBuffer(), $this->compressor, $syncMode, Timings::$playerNetworkSendCompressSessionBuffer);
}else{
$promise = new CompressBatchPromise();
$promise->resolve($stream->getBuffer());
$batch = $stream->getBuffer();
}
$this->sendBuffer = [];
$this->queueCompressedNoBufferFlush($promise, $immediate);
$this->queueCompressedNoBufferFlush($batch, $immediate);
}finally{
Timings::$playerNetworkSend->stopTiming();
}
@ -550,7 +550,7 @@ class NetworkSession{
public function getTypeConverter() : TypeConverter{ return $this->typeConverter; }
public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{
public function queueCompressed(CompressBatchPromise|string $payload, bool $immediate = false) : void{
Timings::$playerNetworkSend->startTiming();
try{
$this->flushSendBuffer($immediate); //Maintain ordering if possible
@ -560,36 +560,25 @@ class NetworkSession{
}
}
private function queueCompressedNoBufferFlush(CompressBatchPromise $payload, bool $immediate = false) : void{
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false) : void{
Timings::$playerNetworkSend->startTiming();
try{
if($immediate){
if(is_string($batch)){
if($immediate){
//Skips all queues
$this->sendEncoded($batch, true);
}else{
$this->compressedQueue->enqueue($batch);
$this->flushCompressedQueue();
}
}elseif($immediate){
//Skips all queues
$this->sendEncoded($payload->getResult(), true);
$this->sendEncoded($batch->getResult(), true);
}else{
$this->compressedQueue->enqueue($payload);
$payload->onResolve(function(CompressBatchPromise $payload) : void{
if($this->connected && $this->compressedQueue->bottom() === $payload){
Timings::$playerNetworkSend->startTiming();
try{
$this->compressedQueue->dequeue(); //result unused
$this->sendEncoded($payload->getResult());
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise $current */
$current = $this->compressedQueue->bottom();
if($current->hasResult()){
$this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult());
}else{
//can't send any more queued until this one is ready
break;
}
}
}finally{
Timings::$playerNetworkSend->stopTiming();
}
$this->compressedQueue->enqueue($batch);
$batch->onResolve(function() : void{
if($this->connected){
$this->flushCompressedQueue();
}
});
}
@ -598,6 +587,30 @@ class NetworkSession{
}
}
private function flushCompressedQueue() : void{
Timings::$playerNetworkSend->startTiming();
try{
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise|string $current */
$current = $this->compressedQueue->bottom();
if(is_string($current)){
$this->compressedQueue->dequeue();
$this->sendEncoded($current);
}elseif($current->hasResult()){
$this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult());
}else{
//can't send any more queued until this one is ready
break;
}
}
}finally{
Timings::$playerNetworkSend->stopTiming();
}
}
private function sendEncoded(string $payload, bool $immediate = false) : void{
if($this->cipher !== null){
Timings::$playerNetworkSendEncrypt->startTiming();

View File

@ -88,9 +88,9 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{
PacketBatch::encodeRaw($stream, $packetBuffers);
$batchBuffer = $stream->getBuffer();
$promise = $this->server->prepareBatch($batchBuffer, $compressor, timings: Timings::$playerNetworkSendCompressBroadcast);
$batch = $this->server->prepareBatch($batchBuffer, $compressor, timings: Timings::$playerNetworkSendCompressBroadcast);
foreach($compressorTargets as $target){
$target->queueCompressed($promise);
$target->queueCompressed($batch);
}
}else{
foreach($compressorTargets as $target){