diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index 63fab278e..edb7f2549 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -100,6 +100,8 @@ use pocketmine\player\Player; use pocketmine\player\PlayerInfo; use pocketmine\player\UsedChunkStatus; use pocketmine\player\XboxLivePlayerInfo; +use pocketmine\promise\Promise; +use pocketmine\promise\PromiseResolver; use pocketmine\Server; use pocketmine\timings\Timings; use pocketmine\utils\AssumptionFailedError; @@ -158,15 +160,24 @@ class NetworkSession{ /** @var string[] */ private array $sendBuffer = []; - /** - * @var \SplQueue|CompressBatchPromise[]|string[] - * @phpstan-var \SplQueue + * @var PromiseResolver[] + * @phpstan-var list> */ + private array $sendBufferAckPromises = []; + + /** @phpstan-var \SplQueue>}> */ private \SplQueue $compressedQueue; private bool $forceAsyncCompression = true; private bool $enableCompression = false; //disabled until handshake completed + private int $nextAckReceiptId = 0; + /** + * @var PromiseResolver[][] + * @phpstan-var array>> + */ + private array $ackPromisesByReceiptId = []; + private ?InventoryManager $invManager = null; /** @@ -468,7 +479,23 @@ class NetworkSession{ } } - public function sendDataPacket(ClientboundPacket $packet, bool $immediate = false) : bool{ + public function handleAckReceipt(int $receiptId) : void{ + if(!$this->connected){ + return; + } + if(isset($this->ackPromisesByReceiptId[$receiptId])){ + $promises = $this->ackPromisesByReceiptId[$receiptId]; + unset($this->ackPromisesByReceiptId[$receiptId]); + foreach($promises as $promise){ + $promise->resolve(true); + } + } + } + + /** + * @phpstan-param PromiseResolver|null $ackReceiptResolver + */ + private function sendDataPacketInternal(ClientboundPacket $packet, bool $immediate, ?PromiseResolver $ackReceiptResolver) : bool{ if(!$this->connected){ return false; } @@ -491,6 +518,9 @@ class NetworkSession{ $packets = [$packet]; } + if($ackReceiptResolver !== null){ + $this->sendBufferAckPromises[] = $ackReceiptResolver; + } foreach($packets as $evPacket){ $this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket)); } @@ -504,6 +534,23 @@ class NetworkSession{ } } + public function sendDataPacket(ClientboundPacket $packet, bool $immediate = false) : bool{ + return $this->sendDataPacketInternal($packet, $immediate, null); + } + + /** + * @phpstan-return Promise + */ + public function sendDataPacketWithReceipt(ClientboundPacket $packet, bool $immediate = false) : Promise{ + $resolver = new PromiseResolver(); + + if(!$this->sendDataPacketInternal($packet, $immediate, $resolver)){ + $resolver->reject(); + } + + return $resolver->getPromise(); + } + /** * @internal */ @@ -545,7 +592,9 @@ class NetworkSession{ $batch = $stream->getBuffer(); } $this->sendBuffer = []; - $this->queueCompressedNoBufferFlush($batch, $immediate); + $ackPromises = $this->sendBufferAckPromises; + $this->sendBufferAckPromises = []; + $this->queueCompressedNoBufferFlush($batch, $immediate, $ackPromises); }finally{ Timings::$playerNetworkSend->stopTiming(); } @@ -572,22 +621,27 @@ class NetworkSession{ } } - private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false) : void{ + /** + * @param PromiseResolver[] $ackPromises + * + * @phpstan-param list> $ackPromises + */ + private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false, array $ackPromises = []) : void{ Timings::$playerNetworkSend->startTiming(); try{ if(is_string($batch)){ if($immediate){ //Skips all queues - $this->sendEncoded($batch, true); + $this->sendEncoded($batch, true, $ackPromises); }else{ - $this->compressedQueue->enqueue($batch); + $this->compressedQueue->enqueue([$batch, $ackPromises]); $this->flushCompressedQueue(); } }elseif($immediate){ //Skips all queues - $this->sendEncoded($batch->getResult(), true); + $this->sendEncoded($batch->getResult(), true, $ackPromises); }else{ - $this->compressedQueue->enqueue($batch); + $this->compressedQueue->enqueue([$batch, $ackPromises]); $batch->onResolve(function() : void{ if($this->connected){ $this->flushCompressedQueue(); @@ -604,14 +658,14 @@ class NetworkSession{ try{ while(!$this->compressedQueue->isEmpty()){ /** @var CompressBatchPromise|string $current */ - $current = $this->compressedQueue->bottom(); + [$current, $ackPromises] = $this->compressedQueue->bottom(); if(is_string($current)){ $this->compressedQueue->dequeue(); - $this->sendEncoded($current); + $this->sendEncoded($current, false, $ackPromises); }elseif($current->hasResult()){ $this->compressedQueue->dequeue(); - $this->sendEncoded($current->getResult()); + $this->sendEncoded($current->getResult(), false, $ackPromises); }else{ //can't send any more queued until this one is ready @@ -623,13 +677,24 @@ class NetworkSession{ } } - private function sendEncoded(string $payload, bool $immediate = false) : void{ + /** + * @param PromiseResolver[] $ackPromises + * @phpstan-param list> $ackPromises + */ + private function sendEncoded(string $payload, bool $immediate, array $ackPromises) : void{ if($this->cipher !== null){ Timings::$playerNetworkSendEncrypt->startTiming(); $payload = $this->cipher->encrypt($payload); Timings::$playerNetworkSendEncrypt->stopTiming(); } - $this->sender->send($payload, $immediate); + + if(count($ackPromises) > 0){ + $ackReceiptId = $this->nextAckReceiptId++; + $this->ackPromisesByReceiptId[$ackReceiptId] = $ackPromises; + }else{ + $ackReceiptId = null; + } + $this->sender->send($payload, $immediate, $ackReceiptId); } /** @@ -642,6 +707,18 @@ class NetworkSession{ $this->disconnectGuard = false; $this->flushSendBuffer(true); $this->sender->close(""); + + foreach($this->ackPromisesByReceiptId as $resolvers){ + foreach($resolvers as $resolver){ + $resolver->reject(); + } + } + $this->ackPromisesByReceiptId = []; + foreach($this->sendBufferAckPromises as $resolver){ + $resolver->reject(); + } + $this->sendBufferAckPromises = []; + foreach($this->disposeHooks as $callback){ $callback(); } diff --git a/src/network/mcpe/PacketSender.php b/src/network/mcpe/PacketSender.php index 4842ea93e..36e556fe4 100644 --- a/src/network/mcpe/PacketSender.php +++ b/src/network/mcpe/PacketSender.php @@ -28,7 +28,7 @@ interface PacketSender{ /** * Pushes a packet into the channel to be processed. */ - public function send(string $payload, bool $immediate) : void; + public function send(string $payload, bool $immediate, ?int $receiptId) : void; /** * Closes the channel, terminating the connection. diff --git a/src/network/mcpe/raklib/RakLibInterface.php b/src/network/mcpe/raklib/RakLibInterface.php index b2325f569..44328f8f3 100644 --- a/src/network/mcpe/raklib/RakLibInterface.php +++ b/src/network/mcpe/raklib/RakLibInterface.php @@ -252,7 +252,9 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ } public function onPacketAck(int $sessionId, int $identifierACK) : void{ - + if(isset($this->sessions[$sessionId])){ + $this->sessions[$sessionId]->handleAckReceipt($identifierACK); + } } public function setName(string $name) : void{ @@ -289,12 +291,13 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{ $this->network->getBandwidthTracker()->add($bytesSentDiff, $bytesReceivedDiff); } - public function putPacket(int $sessionId, string $payload, bool $immediate = true) : void{ + public function putPacket(int $sessionId, string $payload, bool $immediate = true, ?int $receiptId = null) : void{ if(isset($this->sessions[$sessionId])){ $pk = new EncapsulatedPacket(); $pk->buffer = self::MCPE_RAKNET_PACKET_ID . $payload; $pk->reliability = PacketReliability::RELIABLE_ORDERED; $pk->orderChannel = 0; + $pk->identifierACK = $receiptId; $this->interface->sendEncapsulated($sessionId, $pk, $immediate); } diff --git a/src/network/mcpe/raklib/RakLibPacketSender.php b/src/network/mcpe/raklib/RakLibPacketSender.php index d940c282b..df8cf9a00 100644 --- a/src/network/mcpe/raklib/RakLibPacketSender.php +++ b/src/network/mcpe/raklib/RakLibPacketSender.php @@ -33,9 +33,9 @@ class RakLibPacketSender implements PacketSender{ private RakLibInterface $handler ){} - public function send(string $payload, bool $immediate) : void{ + public function send(string $payload, bool $immediate, ?int $receiptId) : void{ if(!$this->closed){ - $this->handler->putPacket($this->sessionId, $payload, $immediate); + $this->handler->putPacket($this->sessionId, $payload, $immediate, $receiptId); } }