mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-04-21 00:07:30 +00:00
Fixing gigantic clusterfuck with protocol contexts and broadcasting
fixes #5623
This commit is contained in:
parent
acaa1a9ce1
commit
fa7c38276c
@ -43,7 +43,6 @@ use pocketmine\network\mcpe\cache\ChunkCache;
|
||||
use pocketmine\network\mcpe\compression\CompressBatchPromise;
|
||||
use pocketmine\network\mcpe\compression\Compressor;
|
||||
use pocketmine\network\mcpe\compression\DecompressionException;
|
||||
use pocketmine\network\mcpe\convert\GlobalItemTypeDictionary;
|
||||
use pocketmine\network\mcpe\convert\SkinAdapterSingleton;
|
||||
use pocketmine\network\mcpe\convert\TypeConverter;
|
||||
use pocketmine\network\mcpe\encryption\DecryptionException;
|
||||
@ -188,8 +187,6 @@ class NetworkSession{
|
||||
private bool $forceAsyncCompression = true;
|
||||
private bool $enableCompression = false; //disabled until handshake completed
|
||||
|
||||
private PacketSerializerContext $packetSerializerContext;
|
||||
|
||||
private ?InventoryManager $invManager = null;
|
||||
|
||||
/**
|
||||
@ -202,6 +199,7 @@ class NetworkSession{
|
||||
private Server $server,
|
||||
private NetworkSessionManager $manager,
|
||||
private PacketPool $packetPool,
|
||||
private PacketSerializerContext $packetSerializerContext,
|
||||
private PacketSender $sender,
|
||||
private PacketBroadcaster $broadcaster,
|
||||
private Compressor $compressor,
|
||||
@ -212,9 +210,6 @@ class NetworkSession{
|
||||
|
||||
$this->compressedQueue = new \SplQueue();
|
||||
|
||||
//TODO: allow this to be injected
|
||||
$this->packetSerializerContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary());
|
||||
|
||||
$this->disposeHooks = new ObjectSet();
|
||||
|
||||
$this->connectTime = time();
|
||||
|
@ -25,6 +25,7 @@ namespace pocketmine\network\mcpe;
|
||||
|
||||
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
|
||||
use pocketmine\network\mcpe\protocol\serializer\PacketSerializer;
|
||||
use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext;
|
||||
use pocketmine\Server;
|
||||
use pocketmine\timings\Timings;
|
||||
use pocketmine\utils\BinaryStream;
|
||||
@ -33,54 +34,54 @@ use function spl_object_id;
|
||||
use function strlen;
|
||||
|
||||
final class StandardPacketBroadcaster implements PacketBroadcaster{
|
||||
public function __construct(private Server $server){}
|
||||
public function __construct(
|
||||
private Server $server,
|
||||
private PacketSerializerContext $protocolContext
|
||||
){}
|
||||
|
||||
public function broadcastPackets(array $recipients, array $packets) : void{
|
||||
$packetBufferTotalLengths = [];
|
||||
$packetBuffers = [];
|
||||
$compressors = [];
|
||||
/** @var NetworkSession[][][] $targetMap */
|
||||
$targetMap = [];
|
||||
|
||||
/** @var NetworkSession[][] $targetsByCompressor */
|
||||
$targetsByCompressor = [];
|
||||
foreach($recipients as $recipient){
|
||||
$serializerContext = $recipient->getPacketSerializerContext();
|
||||
$bufferId = spl_object_id($serializerContext);
|
||||
if(!isset($packetBuffers[$bufferId])){
|
||||
$packetBufferTotalLengths[$bufferId] = 0;
|
||||
$packetBuffers[$bufferId] = [];
|
||||
foreach($packets as $packet){
|
||||
$buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($serializerContext), $packet);
|
||||
$packetBufferTotalLengths[$bufferId] += strlen($buffer);
|
||||
$packetBuffers[$bufferId][] = $buffer;
|
||||
}
|
||||
if($recipient->getPacketSerializerContext() !== $this->protocolContext){
|
||||
throw new \InvalidArgumentException("Only recipients with the same protocol context as the broadcaster can be broadcast to by this broadcaster");
|
||||
}
|
||||
|
||||
//TODO: different compressors might be compatible, it might not be necessary to split them up by object
|
||||
$compressor = $recipient->getCompressor();
|
||||
$compressors[spl_object_id($compressor)] = $compressor;
|
||||
|
||||
$targetMap[$bufferId][spl_object_id($compressor)][] = $recipient;
|
||||
$targetsByCompressor[spl_object_id($compressor)][] = $recipient;
|
||||
}
|
||||
|
||||
foreach($targetMap as $bufferId => $compressorMap){
|
||||
foreach($compressorMap as $compressorId => $compressorTargets){
|
||||
$compressor = $compressors[$compressorId];
|
||||
$totalLength = 0;
|
||||
$packetBuffers = [];
|
||||
foreach($packets as $packet){
|
||||
$buffer = NetworkSession::encodePacketTimed(PacketSerializer::encoder($this->protocolContext), $packet);
|
||||
$totalLength += strlen($buffer);
|
||||
$packetBuffers[] = $buffer;
|
||||
}
|
||||
|
||||
$threshold = $compressor->getCompressionThreshold();
|
||||
if(count($compressorTargets) > 1 && $threshold !== null && $packetBufferTotalLengths[$bufferId] >= $threshold){
|
||||
//do not prepare shared batch unless we're sure it will be compressed
|
||||
$stream = new BinaryStream();
|
||||
PacketBatch::encodeRaw($stream, $packetBuffers[$bufferId]);
|
||||
$batchBuffer = $stream->getBuffer();
|
||||
foreach($targetsByCompressor as $compressorId => $compressorTargets){
|
||||
$compressor = $compressors[$compressorId];
|
||||
|
||||
$promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor, timings: Timings::$playerNetworkSendCompressBroadcast);
|
||||
foreach($compressorTargets as $target){
|
||||
$target->queueCompressed($promise);
|
||||
}
|
||||
}else{
|
||||
foreach($compressorTargets as $target){
|
||||
foreach($packetBuffers[$bufferId] as $packetBuffer){
|
||||
$target->addToSendBuffer($packetBuffer);
|
||||
}
|
||||
$threshold = $compressor->getCompressionThreshold();
|
||||
if(count($compressorTargets) > 1 && $threshold !== null && $totalLength >= $threshold){
|
||||
//do not prepare shared batch unless we're sure it will be compressed
|
||||
$stream = new BinaryStream();
|
||||
PacketBatch::encodeRaw($stream, $packetBuffers);
|
||||
$batchBuffer = $stream->getBuffer();
|
||||
|
||||
$promise = $this->server->prepareBatch(new PacketBatch($batchBuffer), $compressor, timings: Timings::$playerNetworkSendCompressBroadcast);
|
||||
foreach($compressorTargets as $target){
|
||||
$target->queueCompressed($promise);
|
||||
}
|
||||
}else{
|
||||
foreach($compressorTargets as $target){
|
||||
foreach($packetBuffers as $packetBuffer){
|
||||
$target->addToSendBuffer($packetBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,11 +25,13 @@ namespace pocketmine\network\mcpe\raklib;
|
||||
|
||||
use pocketmine\network\AdvancedNetworkInterface;
|
||||
use pocketmine\network\mcpe\compression\ZlibCompressor;
|
||||
use pocketmine\network\mcpe\convert\GlobalItemTypeDictionary;
|
||||
use pocketmine\network\mcpe\convert\TypeConverter;
|
||||
use pocketmine\network\mcpe\NetworkSession;
|
||||
use pocketmine\network\mcpe\PacketBroadcaster;
|
||||
use pocketmine\network\mcpe\protocol\PacketPool;
|
||||
use pocketmine\network\mcpe\protocol\ProtocolInfo;
|
||||
use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext;
|
||||
use pocketmine\network\mcpe\StandardPacketBroadcaster;
|
||||
use pocketmine\network\Network;
|
||||
use pocketmine\network\NetworkInterfaceStartException;
|
||||
@ -79,6 +81,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
|
||||
private SleeperNotifier $sleeper;
|
||||
|
||||
private PacketBroadcaster $broadcaster;
|
||||
private PacketSerializerContext $packetSerializerContext;
|
||||
|
||||
public function __construct(Server $server, string $ip, int $port, bool $ipV6){
|
||||
$this->server = $server;
|
||||
@ -106,7 +109,8 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
|
||||
new PthreadsChannelWriter($mainToThreadBuffer)
|
||||
);
|
||||
|
||||
$this->broadcaster = new StandardPacketBroadcaster($this->server);
|
||||
$this->packetSerializerContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary());
|
||||
$this->broadcaster = new StandardPacketBroadcaster($this->server, $this->packetSerializerContext);
|
||||
}
|
||||
|
||||
public function start() : void{
|
||||
@ -166,6 +170,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
|
||||
$this->server,
|
||||
$this->network->getSessionManager(),
|
||||
PacketPool::getInstance(),
|
||||
$this->packetSerializerContext,
|
||||
new RakLibPacketSender($sessionId, $this),
|
||||
$this->broadcaster,
|
||||
ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it
|
||||
|
Loading…
x
Reference in New Issue
Block a user