mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-09-04 08:56:15 +00:00
Merge branch 'resource-pack-ack-receipts' into minor-next
This commit is contained in:
@ -101,6 +101,8 @@ use pocketmine\player\Player;
|
||||
use pocketmine\player\PlayerInfo;
|
||||
use pocketmine\player\UsedChunkStatus;
|
||||
use pocketmine\player\XboxLivePlayerInfo;
|
||||
use pocketmine\promise\Promise;
|
||||
use pocketmine\promise\PromiseResolver;
|
||||
use pocketmine\Server;
|
||||
use pocketmine\timings\Timings;
|
||||
use pocketmine\utils\AssumptionFailedError;
|
||||
@ -159,15 +161,24 @@ class NetworkSession{
|
||||
|
||||
/** @var string[] */
|
||||
private array $sendBuffer = [];
|
||||
|
||||
/**
|
||||
* @var \SplQueue|CompressBatchPromise[]|string[]
|
||||
* @phpstan-var \SplQueue<CompressBatchPromise|string>
|
||||
* @var PromiseResolver[]
|
||||
* @phpstan-var list<PromiseResolver<true>>
|
||||
*/
|
||||
private array $sendBufferAckPromises = [];
|
||||
|
||||
/** @phpstan-var \SplQueue<array{CompressBatchPromise|string, list<PromiseResolver<true>>}> */
|
||||
private \SplQueue $compressedQueue;
|
||||
private bool $forceAsyncCompression = true;
|
||||
private bool $enableCompression = false; //disabled until handshake completed
|
||||
|
||||
private int $nextAckReceiptId = 0;
|
||||
/**
|
||||
* @var PromiseResolver[][]
|
||||
* @phpstan-var array<int, list<PromiseResolver<true>>>
|
||||
*/
|
||||
private array $ackPromisesByReceiptId = [];
|
||||
|
||||
private ?InventoryManager $invManager = null;
|
||||
|
||||
/**
|
||||
@ -465,7 +476,23 @@ class NetworkSession{
|
||||
}
|
||||
}
|
||||
|
||||
public function sendDataPacket(ClientboundPacket $packet, bool $immediate = false) : bool{
|
||||
public function handleAckReceipt(int $receiptId) : void{
|
||||
if(!$this->connected){
|
||||
return;
|
||||
}
|
||||
if(isset($this->ackPromisesByReceiptId[$receiptId])){
|
||||
$promises = $this->ackPromisesByReceiptId[$receiptId];
|
||||
unset($this->ackPromisesByReceiptId[$receiptId]);
|
||||
foreach($promises as $promise){
|
||||
$promise->resolve(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-param PromiseResolver<true>|null $ackReceiptResolver
|
||||
*/
|
||||
private function sendDataPacketInternal(ClientboundPacket $packet, bool $immediate, ?PromiseResolver $ackReceiptResolver) : bool{
|
||||
if(!$this->connected){
|
||||
return false;
|
||||
}
|
||||
@ -488,6 +515,9 @@ class NetworkSession{
|
||||
$packets = [$packet];
|
||||
}
|
||||
|
||||
if($ackReceiptResolver !== null){
|
||||
$this->sendBufferAckPromises[] = $ackReceiptResolver;
|
||||
}
|
||||
foreach($packets as $evPacket){
|
||||
$this->addToSendBuffer(self::encodePacketTimed(PacketSerializer::encoder(), $evPacket));
|
||||
}
|
||||
@ -501,6 +531,23 @@ class NetworkSession{
|
||||
}
|
||||
}
|
||||
|
||||
public function sendDataPacket(ClientboundPacket $packet, bool $immediate = false) : bool{
|
||||
return $this->sendDataPacketInternal($packet, $immediate, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-return Promise<true>
|
||||
*/
|
||||
public function sendDataPacketWithReceipt(ClientboundPacket $packet, bool $immediate = false) : Promise{
|
||||
$resolver = new PromiseResolver();
|
||||
|
||||
if(!$this->sendDataPacketInternal($packet, $immediate, $resolver)){
|
||||
$resolver->reject();
|
||||
}
|
||||
|
||||
return $resolver->getPromise();
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
@ -542,7 +589,9 @@ class NetworkSession{
|
||||
$batch = $stream->getBuffer();
|
||||
}
|
||||
$this->sendBuffer = [];
|
||||
$this->queueCompressedNoBufferFlush($batch, $immediate);
|
||||
$ackPromises = $this->sendBufferAckPromises;
|
||||
$this->sendBufferAckPromises = [];
|
||||
$this->queueCompressedNoBufferFlush($batch, $immediate, $ackPromises);
|
||||
}finally{
|
||||
Timings::$playerNetworkSend->stopTiming();
|
||||
}
|
||||
@ -569,22 +618,27 @@ class NetworkSession{
|
||||
}
|
||||
}
|
||||
|
||||
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false) : void{
|
||||
/**
|
||||
* @param PromiseResolver[] $ackPromises
|
||||
*
|
||||
* @phpstan-param list<PromiseResolver<true>> $ackPromises
|
||||
*/
|
||||
private function queueCompressedNoBufferFlush(CompressBatchPromise|string $batch, bool $immediate = false, array $ackPromises = []) : void{
|
||||
Timings::$playerNetworkSend->startTiming();
|
||||
try{
|
||||
if(is_string($batch)){
|
||||
if($immediate){
|
||||
//Skips all queues
|
||||
$this->sendEncoded($batch, true);
|
||||
$this->sendEncoded($batch, true, $ackPromises);
|
||||
}else{
|
||||
$this->compressedQueue->enqueue($batch);
|
||||
$this->compressedQueue->enqueue([$batch, $ackPromises]);
|
||||
$this->flushCompressedQueue();
|
||||
}
|
||||
}elseif($immediate){
|
||||
//Skips all queues
|
||||
$this->sendEncoded($batch->getResult(), true);
|
||||
$this->sendEncoded($batch->getResult(), true, $ackPromises);
|
||||
}else{
|
||||
$this->compressedQueue->enqueue($batch);
|
||||
$this->compressedQueue->enqueue([$batch, $ackPromises]);
|
||||
$batch->onResolve(function() : void{
|
||||
if($this->connected){
|
||||
$this->flushCompressedQueue();
|
||||
@ -601,14 +655,14 @@ class NetworkSession{
|
||||
try{
|
||||
while(!$this->compressedQueue->isEmpty()){
|
||||
/** @var CompressBatchPromise|string $current */
|
||||
$current = $this->compressedQueue->bottom();
|
||||
[$current, $ackPromises] = $this->compressedQueue->bottom();
|
||||
if(is_string($current)){
|
||||
$this->compressedQueue->dequeue();
|
||||
$this->sendEncoded($current);
|
||||
$this->sendEncoded($current, false, $ackPromises);
|
||||
|
||||
}elseif($current->hasResult()){
|
||||
$this->compressedQueue->dequeue();
|
||||
$this->sendEncoded($current->getResult());
|
||||
$this->sendEncoded($current->getResult(), false, $ackPromises);
|
||||
|
||||
}else{
|
||||
//can't send any more queued until this one is ready
|
||||
@ -620,13 +674,24 @@ class NetworkSession{
|
||||
}
|
||||
}
|
||||
|
||||
private function sendEncoded(string $payload, bool $immediate = false) : void{
|
||||
/**
|
||||
* @param PromiseResolver[] $ackPromises
|
||||
* @phpstan-param list<PromiseResolver<true>> $ackPromises
|
||||
*/
|
||||
private function sendEncoded(string $payload, bool $immediate, array $ackPromises) : void{
|
||||
if($this->cipher !== null){
|
||||
Timings::$playerNetworkSendEncrypt->startTiming();
|
||||
$payload = $this->cipher->encrypt($payload);
|
||||
Timings::$playerNetworkSendEncrypt->stopTiming();
|
||||
}
|
||||
$this->sender->send($payload, $immediate);
|
||||
|
||||
if(count($ackPromises) > 0){
|
||||
$ackReceiptId = $this->nextAckReceiptId++;
|
||||
$this->ackPromisesByReceiptId[$ackReceiptId] = $ackPromises;
|
||||
}else{
|
||||
$ackReceiptId = null;
|
||||
}
|
||||
$this->sender->send($payload, $immediate, $ackReceiptId);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -646,6 +711,19 @@ class NetworkSession{
|
||||
$this->setHandler(null);
|
||||
$this->connected = false;
|
||||
|
||||
$ackPromisesByReceiptId = $this->ackPromisesByReceiptId;
|
||||
$this->ackPromisesByReceiptId = [];
|
||||
foreach($ackPromisesByReceiptId as $resolvers){
|
||||
foreach($resolvers as $resolver){
|
||||
$resolver->reject();
|
||||
}
|
||||
}
|
||||
$sendBufferAckPromises = $this->sendBufferAckPromises;
|
||||
$this->sendBufferAckPromises = [];
|
||||
foreach($sendBufferAckPromises as $resolver){
|
||||
$resolver->reject();
|
||||
}
|
||||
|
||||
$this->logger->info($this->server->getLanguage()->translate(KnownTranslationFactory::pocketmine_network_session_close($reason)));
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user