Merge branch 'minor-next' into major-next

This commit is contained in:
Dylan K. Taylor 2023-03-11 22:13:31 +00:00
commit f03afba10e
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
3 changed files with 42 additions and 41 deletions

View File

@ -42,7 +42,6 @@ use pocketmine\network\mcpe\cache\ChunkCache;
use pocketmine\network\mcpe\compression\CompressBatchPromise; use pocketmine\network\mcpe\compression\CompressBatchPromise;
use pocketmine\network\mcpe\compression\Compressor; use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\compression\DecompressionException; use pocketmine\network\mcpe\compression\DecompressionException;
use pocketmine\network\mcpe\convert\GlobalItemTypeDictionary;
use pocketmine\network\mcpe\convert\SkinAdapterSingleton; use pocketmine\network\mcpe\convert\SkinAdapterSingleton;
use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\encryption\DecryptionException; use pocketmine\network\mcpe\encryption\DecryptionException;
@ -189,8 +188,6 @@ class NetworkSession{
private bool $forceAsyncCompression = true; private bool $forceAsyncCompression = true;
private bool $enableCompression = false; //disabled until handshake completed private bool $enableCompression = false; //disabled until handshake completed
private PacketSerializerContext $packetSerializerContext;
private ?InventoryManager $invManager = null; private ?InventoryManager $invManager = null;
/** /**
@ -203,6 +200,7 @@ class NetworkSession{
private Server $server, private Server $server,
private NetworkSessionManager $manager, private NetworkSessionManager $manager,
private PacketPool $packetPool, private PacketPool $packetPool,
private PacketSerializerContext $packetSerializerContext,
private PacketSender $sender, private PacketSender $sender,
private PacketBroadcaster $broadcaster, private PacketBroadcaster $broadcaster,
private Compressor $compressor, private Compressor $compressor,
@ -213,9 +211,6 @@ class NetworkSession{
$this->compressedQueue = new \SplQueue(); $this->compressedQueue = new \SplQueue();
//TODO: allow this to be injected
$this->packetSerializerContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary());
$this->disposeHooks = new ObjectSet(); $this->disposeHooks = new ObjectSet();
$this->connectTime = time(); $this->connectTime = time();

View File

@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\serializer\PacketSerializer; use pocketmine\network\mcpe\protocol\serializer\PacketSerializer;
use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext;
use pocketmine\Server; use pocketmine\Server;
use pocketmine\timings\Timings; use pocketmine\timings\Timings;
use pocketmine\utils\BinaryStream; use pocketmine\utils\BinaryStream;
@ -33,54 +34,54 @@ use function spl_object_id;
use function strlen; use function strlen;
final class StandardPacketBroadcaster implements PacketBroadcaster{ final class StandardPacketBroadcaster implements PacketBroadcaster{
public function __construct(private Server $server){} public function __construct(
private Server $server,
private PacketSerializerContext $protocolContext
){}
public function broadcastPackets(array $recipients, array $packets) : void{ public function broadcastPackets(array $recipients, array $packets) : void{
$packetBufferTotalLengths = [];
$packetBuffers = [];
$compressors = []; $compressors = [];
/** @var NetworkSession[][][] $targetMap */
$targetMap = []; /** @var NetworkSession[][] $targetsByCompressor */
$targetsByCompressor = [];
foreach($recipients as $recipient){ foreach($recipients as $recipient){
$serializerContext = $recipient->getPacketSerializerContext(); if($recipient->getPacketSerializerContext() !== $this->protocolContext){
$bufferId = spl_object_id($serializerContext); throw new \InvalidArgumentException("Only recipients with the same protocol context as the broadcaster can be broadcast to by this broadcaster");
if(!isset($packetBuffers[$bufferId])){
$packetBufferTotalLengths[$bufferId] = 0;
$packetBuffers[$bufferId] = [];
foreach($packets as $packet){
$buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet);
$packetBufferTotalLengths[$bufferId] += strlen($buffer);
$packetBuffers[$bufferId][] = $buffer;
}
} }
//TODO: different compressors might be compatible, it might not be necessary to split them up by object //TODO: different compressors might be compatible, it might not be necessary to split them up by object
$compressor = $recipient->getCompressor(); $compressor = $recipient->getCompressor();
$compressors[spl_object_id($compressor)] = $compressor; $compressors[spl_object_id($compressor)] = $compressor;
$targetMap[$bufferId][spl_object_id($compressor)][] = $recipient; $targetsByCompressor[spl_object_id($compressor)][] = $recipient;
} }
foreach($targetMap as $bufferId => $compressorMap){ $totalLength = 0;
foreach($compressorMap as $compressorId => $compressorTargets){ $packetBuffers = [];
$compressor = $compressors[$compressorId]; foreach($packets as $packet){
$buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($this->protocolContext), $packet);
$totalLength += strlen($buffer);
$packetBuffers[] = $buffer;
}
$threshold = $compressor->getCompressionThreshold(); foreach($targetsByCompressor as $compressorId => $compressorTargets){
if(count($compressorTargets) > 1 && $threshold !== null && $packetBufferTotalLengths[$bufferId] >= $threshold){ $compressor = $compressors[$compressorId];
//do not prepare shared batch unless we're sure it will be compressed
$stream = new BinaryStream();
PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]);
$batchBuffer = $stream->getBuffer();
$promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor, timings: Timings::$playerNetworkSendCompressBroadcast); $threshold = $compressor->getCompressionThreshold();
foreach($compressorTargets as $target){ if(count($compressorTargets) > 1 && $threshold !== null && $totalLength >= $threshold){
$target->queueCompressed($promise); //do not prepare shared batch unless we're sure it will be compressed
} $stream = new BinaryStream();
}else{ PacketBatch::encodeRaw($stream, $packetBuffers);
foreach($compressorTargets as $target){ $batchBuffer = $stream->getBuffer();
foreach($packetBuffers[$bufferId] as $packetBuffer){
$target->addToSendBuffer($packetBuffer); $promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor, timings: Timings::$playerNetworkSendCompressBroadcast);
} foreach($compressorTargets as $target){
$target->queueCompressed($promise);
}
}else{
foreach($compressorTargets as $target){
foreach($packetBuffers as $packetBuffer){
$target->addToSendBuffer($packetBuffer);
} }
} }
} }

View File

@ -26,11 +26,13 @@ namespace pocketmine\network\mcpe\raklib;
use pocketmine\lang\KnownTranslationFactory; use pocketmine\lang\KnownTranslationFactory;
use pocketmine\network\AdvancedNetworkInterface; use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\GlobalItemTypeDictionary;
use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketBroadcaster; use pocketmine\network\mcpe\PacketBroadcaster;
use pocketmine\network\mcpe\protocol\PacketPool; use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\ProtocolInfo; use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext;
use pocketmine\network\mcpe\StandardPacketBroadcaster; use pocketmine\network\mcpe\StandardPacketBroadcaster;
use pocketmine\network\Network; use pocketmine\network\Network;
use pocketmine\network\NetworkInterfaceStartException; use pocketmine\network\NetworkInterfaceStartException;
@ -79,6 +81,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
private SleeperNotifier $sleeper; private SleeperNotifier $sleeper;
private PacketBroadcaster $broadcaster; private PacketBroadcaster $broadcaster;
private PacketSerializerContext $packetSerializerContext;
public function __construct(Server $server, string $ip, int $port, bool $ipV6){ public function __construct(Server $server, string $ip, int $port, bool $ipV6){
$this->server = $server; $this->server = $server;
@ -108,7 +111,8 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
new PthreadsChannelWriter($mainToThreadBuffer) new PthreadsChannelWriter($mainToThreadBuffer)
); );
$this->broadcaster = new StandardPacketBroadcaster($this->server); $this->packetSerializerContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary());
$this->broadcaster = new StandardPacketBroadcaster($this->server, $this->packetSerializerContext);
} }
public function start() : void{ public function start() : void{
@ -173,6 +177,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->server, $this->server,
$this->network->getSessionManager(), $this->network->getSessionManager(),
PacketPool::getInstance(), PacketPool::getInstance(),
$this->packetSerializerContext,
new RakLibPacketSender($sessionId, $this), new RakLibPacketSender($sessionId, $this),
$this->broadcaster, $this->broadcaster,
ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it