First shot at packet ack receipt support

this will be useful for preventing resource pack sending from overloading the network.
it's not the best solution for that (since it means the RTT will limit the pack download speed), but it's easier than implementing congestion control and will work fine in most cases.
This commit is contained in:
Dylan K. Taylor 2024-03-01 14:41:53 +00:00
parent b2c97cf2f1
commit bc2abf4b15
No known key found for this signature in database
GPG Key ID: 8927471A91CAFD3D
4 changed files with 100 additions and 20 deletions

View File

@ -100,6 +100,8 @@ use pocketmine\player\Player;
use pocketmine\player\PlayerInfo;
use pocketmine\player\UsedChunkStatus;
use pocketmine\player\XboxLivePlayerInfo;
use pocketmine\promise\Promise;
use pocketmine\promise\PromiseResolver;
use pocketmine\Server;
use pocketmine\timings\Timings;
use pocketmine\utils\AssumptionFailedError;
@ -158,15 +160,24 @@ class NetworkSession{
/** @var string[] */
private array $sendBuffer = [];
/**
* @var \SplQueue|CompressBatchPromise[]|string[]
* @phpstan-var \SplQueue<CompressBatchPromise|string>
* @var PromiseResolver[]
* @phpstan-var list<PromiseResolver<true>>
*/
private array $sendBufferAckPromises = [];
/** @phpstan-var \SplQueue<array{CompressBatchPromise|string, list<PromiseResolver<true>>}> */
private \SplQueue $compressedQueue;
private bool $forceAsyncCompression = true;
private bool $enableCompression = false; //disabled until handshake completed
private int $nextAckReceiptId = 0;
/**
* @var PromiseResolver[][]
* @phpstan-var array<int, list<PromiseResolver<true>>>
*/
private array $ackPromisesByReceiptId = [];
private ?InventoryManager $invManager = null;
/**
@ -468,7 +479,23 @@ class NetworkSession{
}
}
public function sendDataPacket(ClientboundPacket $packet, bool $immediate = false) : bool{
public function handleAckReceipt(int $receiptId) : void{
if(!$this->connected){
return;
}
if(isset($this->ackPromisesByReceiptId[$receiptId])){
$promises = $this->ackPromisesByReceiptId[$receiptId];
unset($this->ackPromisesByReceiptId[$receiptId]);
foreach($promises as $promise){
$promise->resolve(true);
}
}
}
/**
* @phpstan-param PromiseResolver<true>|null $ackReceiptResolver
*/
private function sendDataPacketInternal(ClientboundPacket $packet, bool $immediate, ?PromiseResolver $ackReceiptResolver) : bool{
if(!$this->connected){
return false;
}
@ -491,6 +518,9 @@ class NetworkSession{
$packets = [$packet];
}
if($ackReceiptResolver !== null){
$this->sendBufferAckPromises[] = $ackReceiptResolver;
}
foreach($packets as $evPacket){
$this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket));
}
@ -504,6 +534,23 @@ class NetworkSession{
}
}
public function sendDataPacket(ClientboundPacket $packet, bool $immediate = false) : bool{
return $this->sendDataPacketInternal($packet, $immediate, null);
}
/**
* @phpstan-return Promise<true>
*/
public function sendDataPacketWithReceipt(ClientboundPacket $packet, bool $immediate = false) : Promise{
$resolver = new PromiseResolver();
if(!$this->sendDataPacketInternal($packet, $immediate, $resolver)){
$resolver->reject();
}
return $resolver->getPromise();
}
/**
* @internal
*/
@ -545,7 +592,9 @@ class NetworkSession{
$batch = $stream->getBuffer();
}
$this->sendBuffer = [];
$this->queueCompressedNoBufferFlush($batch, $immediate);
$ackPromises = $this->sendBufferAckPromises;
$this->sendBufferAckPromises = [];
$this->queueCompressedNoBufferFlush($batch, $immediate, $ackPromises);
}finally{
Timings::$playerNetworkSend->stopTiming();
}
@ -572,22 +621,27 @@ class NetworkSession{
}
}
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false) : void{
/**
* @param PromiseResolver[] $ackPromises
*
* @phpstan-param list<PromiseResolver<true>> $ackPromises
*/
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false, array $ackPromises = []) : void{
Timings::$playerNetworkSend->startTiming();
try{
if(is_string($batch)){
if($immediate){
//Skips all queues
$this->sendEncoded($batch, true);
$this->sendEncoded($batch, true, $ackPromises);
}else{
$this->compressedQueue->enqueue($batch);
$this->compressedQueue->enqueue([$batch, $ackPromises]);
$this->flushCompressedQueue();
}
}elseif($immediate){
//Skips all queues
$this->sendEncoded($batch->getResult(), true);
$this->sendEncoded($batch->getResult(), true, $ackPromises);
}else{
$this->compressedQueue->enqueue($batch);
$this->compressedQueue->enqueue([$batch, $ackPromises]);
$batch->onResolve(function() : void{
if($this->connected){
$this->flushCompressedQueue();
@ -604,14 +658,14 @@ class NetworkSession{
try{
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise|string $current */
$current = $this->compressedQueue->bottom();
[$current, $ackPromises] = $this->compressedQueue->bottom();
if(is_string($current)){
$this->compressedQueue->dequeue();
$this->sendEncoded($current);
$this->sendEncoded($current, false, $ackPromises);
}elseif($current->hasResult()){
$this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult());
$this->sendEncoded($current->getResult(), false, $ackPromises);
}else{
//can't send any more queued until this one is ready
@ -623,13 +677,24 @@ class NetworkSession{
}
}
private function sendEncoded(string $payload, bool $immediate = false) : void{
/**
* @param PromiseResolver[] $ackPromises
* @phpstan-param list<PromiseResolver<true>> $ackPromises
*/
private function sendEncoded(string $payload, bool $immediate, array $ackPromises) : void{
if($this->cipher !== null){
Timings::$playerNetworkSendEncrypt->startTiming();
$payload = $this->cipher->encrypt($payload);
Timings::$playerNetworkSendEncrypt->stopTiming();
}
$this->sender->send($payload, $immediate);
if(count($ackPromises) > 0){
$ackReceiptId = $this->nextAckReceiptId++;
$this->ackPromisesByReceiptId[$ackReceiptId] = $ackPromises;
}else{
$ackReceiptId = null;
}
$this->sender->send($payload, $immediate, $ackReceiptId);
}
/**
@ -642,6 +707,18 @@ class NetworkSession{
$this->disconnectGuard = false;
$this->flushSendBuffer(true);
$this->sender->close("");
foreach($this->ackPromisesByReceiptId as $resolvers){
foreach($resolvers as $resolver){
$resolver->reject();
}
}
$this->ackPromisesByReceiptId = [];
foreach($this->sendBufferAckPromises as $resolver){
$resolver->reject();
}
$this->sendBufferAckPromises = [];
foreach($this->disposeHooks as $callback){
$callback();
}

View File

@ -28,7 +28,7 @@ interface PacketSender{
/**
* Pushes a packet into the channel to be processed.
*/
public function send(string $payload, bool $immediate) : void;
public function send(string $payload, bool $immediate, ?int $receiptId) : void;
/**
* Closes the channel, terminating the connection.

View File

@ -252,7 +252,9 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
}
public function onPacketAck(int $sessionId, int $identifierACK) : void{
if(isset($this->sessions[$sessionId])){
$this->sessions[$sessionId]->handleAckReceipt($identifierACK);
}
}
public function setName(string $name) : void{
@ -289,12 +291,13 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->network->getBandwidthTracker()->add($bytesSentDiff, $bytesReceivedDiff);
}
public function putPacket(int $sessionId, string $payload, bool $immediate = true) : void{
public function putPacket(int $sessionId, string $payload, bool $immediate = true, ?int $receiptId = null) : void{
if(isset($this->sessions[$sessionId])){
$pk = new EncapsulatedPacket();
$pk->buffer = self::MCPE_RAKNET_PACKET_ID . $payload;
$pk->reliability = PacketReliability::RELIABLE_ORDERED;
$pk->orderChannel = 0;
$pk->identifierACK = $receiptId;
$this->interface->sendEncapsulated($sessionId, $pk, $immediate);
}

View File

@ -33,9 +33,9 @@ class RakLibPacketSender implements PacketSender{
private RakLibInterface $handler
){}
public function send(string $payload, bool $immediate) : void{
public function send(string $payload, bool $immediate, ?int $receiptId) : void{
if(!$this->closed){
$this->handler->putPacket($this->sessionId, $payload, $immediate);
$this->handler->putPacket($this->sessionId, $payload, $immediate, $receiptId);
}
}