Merge branch 'next-minor' into next-major

This commit is contained in:
Dylan K. Taylor 2023-02-22 22:51:51 +00:00
commit 5854b1c8c2
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
5 changed files with 93 additions and 36 deletions

View File

@ -887,6 +887,9 @@ class Server{
if($this->configGroup->getPropertyInt("network.batch-threshold", 256) >= 0){
$netCompressionThreshold = $this->configGroup->getPropertyInt("network.batch-threshold", 256);
}
if($netCompressionThreshold < 0){
$netCompressionThreshold = null;
}
$netCompressionLevel = $this->configGroup->getPropertyInt("network.compression-level", 6);
if($netCompressionLevel < 1 || $netCompressionLevel > 9){

View File

@ -178,7 +178,7 @@ class NetworkSession{
private ?EncryptionContext $cipher = null;
/** @var ClientboundPacket[] */
/** @var string[] */
private array $sendBuffer = [];
/**
@ -266,23 +266,6 @@ class NetworkSession{
);
}
/**
* @param ClientboundPacket[] $packets
*/
public static function encodePacketBatchTimed(BinaryStream $stream, PacketSerializerContext $context, array $packets) : void{
PacketBatch::encodeRaw($stream, array_map(function(ClientboundPacket $packet) use ($context) : string{
$timings = Timings::getEncodeDataPacketTimings($packet);
$timings->startTiming();
try{
$stream = PacketSerializer::encoder($context);
$packet->encode($stream);
return $stream->getBuffer();
}finally{
$timings->stopTiming();
}
}, $packets));
}
private function onPlayerCreated(Player $player) : void{
if(!$this->isConnected()){
//the remote player might have disconnected before spawn terrain generation was finished
@ -502,7 +485,7 @@ class NetworkSession{
$packets = $ev->getPackets();
foreach($packets as $evPacket){
$this->addToSendBuffer($evPacket);
$this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder($this->packetSerializerContext), $evPacket));
}
if($immediate){
$this->flushSendBuffer(true);
@ -517,16 +500,24 @@ class NetworkSession{
/**
* @internal
*/
public function addToSendBuffer(ClientboundPacket $packet) : void{
$timings = Timings::getSendDataPacketTimings($packet);
public static function encodePacketTimed(PacketSerializer $serializer, ClientboundPacket $packet) : string{
$timings = Timings::getEncodeDataPacketTimings($packet);
$timings->startTiming();
try{
$this->sendBuffer[] = $packet;
$packet->encode($serializer);
return $serializer->getBuffer();
}finally{
$timings->stopTiming();
}
}
/**
* @internal
*/
public function addToSendBuffer(string $buffer) : void{
$this->sendBuffer[] = $buffer;
}
private function flushSendBuffer(bool $immediate = false) : void{
if(count($this->sendBuffer) > 0){
Timings::$playerNetworkSend->startTiming();
@ -539,7 +530,7 @@ class NetworkSession{
}
$stream = new BinaryStream();
self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer);
PacketBatch::encodeRaw($stream, $this->sendBuffer);
if($this->enableCompression){
$promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode);

View File

@ -23,25 +23,34 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\compression\ThresholdCompressor;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\serializer\PacketSerializer;
use pocketmine\Server;
use pocketmine\utils\BinaryStream;
use function spl_object_id;
use function strlen;
final class StandardPacketBroadcaster implements PacketBroadcaster{
public function __construct(private Server $server){}
public function broadcastPackets(array $recipients, array $packets) : void{
$buffers = [];
$packetBufferTotalLengths = [];
$packetBuffers = [];
$compressors = [];
/** @var NetworkSession[][][] $targetMap */
$targetMap = [];
foreach($recipients as $recipient){
$serializerContext = $recipient->getPacketSerializerContext();
$bufferId = spl_object_id($serializerContext);
if(!isset($buffers[$bufferId])){
$stream = new BinaryStream();
NetworkSession::encodePacketBatchTimed($stream, $serializerContext, $packets);
$buffers[$bufferId] = $stream->getBuffer();
if(!isset($packetBuffers[$bufferId])){
$packetBufferTotalLengths[$bufferId] = 0;
$packetBuffers[$bufferId] = [];
foreach($packets as $packet){
$buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet);
$packetBufferTotalLengths[$bufferId] += strlen($buffer);
$packetBuffers[$bufferId][] = $buffer;
}
}
//TODO: different compressors might be compatible, it might not be necessary to split them up by object
@ -52,17 +61,36 @@ final class StandardPacketBroadcaster implements PacketBroadcaster{
}
foreach($targetMap as $bufferId => $compressorMap){
$buffer = $buffers[$bufferId];
foreach($compressorMap as $compressorId => $compressorTargets){
$compressor = $compressors[$compressorId];
if(!$compressor->willCompress($buffer)){
$batchBuffer = null;
if($compressor instanceof ThresholdCompressor){
$threshold = $compressor->getCompressionThreshold();
if($threshold !== null && $packetBufferTotalLengths[$bufferId] >= $threshold){
//do not prepare shared batch unless we're sure it will be compressed
$stream = new BinaryStream();
PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]);
$batchBuffer = $stream->getBuffer();
}
}else{
//this is a legacy compressor, so we have to encode the batch and check if it will compress
$stream = new BinaryStream();
PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]);
$tempBatchBuffer = $stream->getBuffer();
if($compressor->willCompress($tempBatchBuffer)){
$batchBuffer = $tempBatchBuffer;
}
}
if($batchBuffer === null){
foreach($compressorTargets as $target){
foreach($packets as $pk){
$target->addToSendBuffer($pk);
foreach($packetBuffers[$bufferId] as $packetBuffer){
$target->addToSendBuffer($packetBuffer);
}
}
}else{
$promise = $this->server->prepareBatch(new PacketBatch($buffer), $compressor);
$promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor);
foreach($compressorTargets as $target){
$target->queueCompressed($promise);
}

View File

@ -0,0 +1,31 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
interface ThresholdCompressor extends Compressor{
/**
* Returns the minimum size of packet batch that will be compressed. Null means that no packets will be compressed.
*/
public function getCompressionThreshold() : ?int;
}

View File

@ -32,7 +32,7 @@ use function zlib_decode;
use function zlib_encode;
use const ZLIB_ENCODING_RAW;
final class ZlibCompressor implements Compressor{
final class ZlibCompressor implements ThresholdCompressor{
use SingletonTrait;
public const DEFAULT_LEVEL = 7;
@ -48,12 +48,16 @@ final class ZlibCompressor implements Compressor{
public function __construct(
private int $level,
private int $minCompressionSize,
private ?int $minCompressionSize,
private int $maxDecompressionSize
){}
public function getCompressionThreshold() : ?int{
return $this->minCompressionSize;
}
public function willCompress(string $data) : bool{
return $this->minCompressionSize > -1 && strlen($data) >= $this->minCompressionSize;
return $this->minCompressionSize !== null && strlen($data) >= $this->minCompressionSize;
}
/**