Merge branch 'stable' into next-minor

This commit is contained in:
Dylan K. Taylor
2022-09-20 20:18:06 +01:00
13 changed files with 182 additions and 57 deletions

View File

@ -56,6 +56,7 @@ use pocketmine\network\mcpe\handler\LoginPacketHandler;
use pocketmine\network\mcpe\handler\PacketHandler;
use pocketmine\network\mcpe\handler\PreSpawnPacketHandler;
use pocketmine\network\mcpe\handler\ResourcePacksPacketHandler;
use pocketmine\network\mcpe\handler\SessionStartPacketHandler;
use pocketmine\network\mcpe\handler\SpawnResponsePacketHandler;
use pocketmine\network\mcpe\protocol\AvailableCommandsPacket;
use pocketmine\network\mcpe\protocol\ChunkRadiusUpdatedPacket;
@ -164,6 +165,7 @@ class NetworkSession{
*/
private \SplQueue $compressedQueue;
private bool $forceAsyncCompression = true;
private bool $enableCompression = false; //disabled until handshake completed
private PacketSerializerContext $packetSerializerContext;
@ -196,17 +198,10 @@ class NetworkSession{
$this->connectTime = time();
$this->setHandler(new LoginPacketHandler(
$this->setHandler(new SessionStartPacketHandler(
$this->server,
$this,
function(PlayerInfo $info) : void{
$this->info = $info;
$this->logger->info("Player: " . TextFormat::AQUA . $info->getUsername() . TextFormat::RESET);
$this->logger->setPrefix($this->getLogPrefix());
},
function(bool $isAuthenticated, bool $authRequired, ?string $error, ?string $clientPubKey) : void{
$this->setAuthenticationStatus($isAuthenticated, $authRequired, $error, $clientPubKey);
}
fn() => $this->onSessionStartSuccess()
));
$this->manager->add($this);
@ -221,6 +216,24 @@ class NetworkSession{
return $this->logger;
}
private function onSessionStartSuccess() : void{
$this->logger->debug("Session start handshake completed, awaiting login packet");
$this->flushSendBuffer(true);
$this->enableCompression = true;
$this->setHandler(new LoginPacketHandler(
$this->server,
$this,
function(PlayerInfo $info) : void{
$this->info = $info;
$this->logger->info("Player: " . TextFormat::AQUA . $info->getUsername() . TextFormat::RESET);
$this->logger->setPrefix($this->getLogPrefix());
},
function(bool $isAuthenticated, bool $authRequired, ?string $error, ?string $clientPubKey) : void{
$this->setAuthenticationStatus($isAuthenticated, $authRequired, $error, $clientPubKey);
}
));
}
protected function createPlayer() : void{
$this->server->createPlayer($this, $this->info, $this->authenticated, $this->cachedOfflinePlayerData)->onCompletion(
\Closure::fromCallable([$this, 'onPlayerCreated']),
@ -335,18 +348,22 @@ class NetworkSession{
}
}
Timings::$playerNetworkReceiveDecompress->startTiming();
try{
$stream = new PacketBatch($this->compressor->decompress($payload));
}catch(DecompressionException $e){
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
}finally{
Timings::$playerNetworkReceiveDecompress->stopTiming();
if($this->enableCompression){
Timings::$playerNetworkReceiveDecompress->startTiming();
try{
$decompressed = $this->compressor->decompress($payload);
}catch(DecompressionException $e){
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
}finally{
Timings::$playerNetworkReceiveDecompress->stopTiming();
}
}else{
$decompressed = $payload;
}
try{
foreach($stream->getPackets($this->packetPool, $this->packetSerializerContext, 500) as [$packet, $buffer]){
foreach((new PacketBatch($decompressed))->getPackets($this->packetPool, $this->packetSerializerContext, 500) as [$packet, $buffer]){
if($packet === null){
$this->logger->debug("Unknown packet: " . base64_encode($buffer));
throw new PacketHandlingException("Unknown packet received");
@ -451,7 +468,14 @@ class NetworkSession{
}elseif($this->forceAsyncCompression){
$syncMode = false;
}
$promise = $this->server->prepareBatch(PacketBatch::fromPackets($this->packetSerializerContext, ...$this->sendBuffer), $this->compressor, $syncMode);
$batch = PacketBatch::fromPackets($this->packetSerializerContext, ...$this->sendBuffer);
if($this->enableCompression){
$promise = $this->server->prepareBatch($batch, $this->compressor, $syncMode);
}else{
$promise = new CompressBatchPromise();
$promise->resolve($batch->getBuffer());
}
$this->sendBuffer = [];
$this->queueCompressedNoBufferFlush($promise, $immediate);
}