diff --git a/src/pocketmine/Player.php b/src/pocketmine/Player.php index 3cf8aff79..7aa78cb65 100644 --- a/src/pocketmine/Player.php +++ b/src/pocketmine/Player.php @@ -90,6 +90,7 @@ use pocketmine\nbt\tag\ByteTag; use pocketmine\nbt\tag\CompoundTag; use pocketmine\nbt\tag\DoubleTag; use pocketmine\nbt\tag\ListTag; +use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\NetworkCipher; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\AdventureSettingsPacket; @@ -894,7 +895,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{ unset($this->loadQueue[$index]); } - public function sendChunk(int $x, int $z, string $payload){ + public function sendChunk(int $x, int $z, CompressBatchPromise $promise){ if(!$this->isConnected()){ return; } @@ -902,7 +903,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{ $this->usedChunks[Level::chunkHash($x, $z)] = true; $this->chunkLoadCount++; - $this->networkSession->sendEncoded($payload); + $this->networkSession->queueCompressed($promise); if($this->spawned){ foreach($this->level->getChunkEntities($x, $z) as $entity){ diff --git a/src/pocketmine/Server.php b/src/pocketmine/Server.php index 2c9e5d0f6..528e1743b 100644 --- a/src/pocketmine/Server.php +++ b/src/pocketmine/Server.php @@ -71,7 +71,8 @@ use pocketmine\nbt\tag\LongTag; use pocketmine\nbt\tag\ShortTag; use pocketmine\nbt\tag\StringTag; use pocketmine\network\AdvancedNetworkInterface; -use pocketmine\network\mcpe\CompressBatchedTask; +use pocketmine\network\mcpe\CompressBatchPromise; +use pocketmine\network\mcpe\CompressBatchTask; use pocketmine\network\mcpe\NetworkCipher; use pocketmine\network\mcpe\NetworkCompression; use pocketmine\network\mcpe\NetworkSession; @@ -1899,55 +1900,54 @@ class Server{ $stream->putPacket($packet); } - //TODO: if under the compression threshold, add to session buffers instead of batching (first we need to implement buffering!) - $this->batchPackets($targets, $stream); + if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){ + foreach($targets as $target){ + foreach($ev->getPackets() as $pk){ + $target->addToSendBuffer($pk); + } + } + }else{ + $promise = $this->prepareBatch($stream); + foreach($targets as $target){ + $target->queueCompressed($promise); + } + } + return true; } /** * Broadcasts a list of packets in a batch to a list of players * - * @param NetworkSession[] $targets - * @param PacketStream $stream - * @param bool $forceSync - * @param bool $immediate + * @param PacketStream $stream + * @param bool $forceSync + * + * @return CompressBatchPromise */ - public function batchPackets(array $targets, PacketStream $stream, bool $forceSync = false, bool $immediate = false){ - Timings::$playerNetworkSendCompressTimer->startTiming(); + public function prepareBatch(PacketStream $stream, bool $forceSync = false) : CompressBatchPromise{ + try{ + Timings::$playerNetworkSendCompressTimer->startTiming(); - if(!empty($targets)){ $compressionLevel = NetworkCompression::$LEVEL; if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){ $compressionLevel = 0; //Do not compress packets under the threshold $forceSync = true; } - if(!$forceSync and !$immediate and $this->networkCompressionAsync){ - $task = new CompressBatchedTask($stream, $targets, $compressionLevel); + $promise = new CompressBatchPromise(); + if(!$forceSync and $this->networkCompressionAsync){ + $task = new CompressBatchTask($stream, $compressionLevel, $promise); $this->asyncPool->submitTask($task); }else{ - $this->broadcastPacketsCallback(NetworkCompression::compress($stream->buffer), $targets, $immediate); + $promise->resolve(NetworkCompression::compress($stream->buffer)); } - } - Timings::$playerNetworkSendCompressTimer->stopTiming(); - } - - /** - * @param string $payload - * @param NetworkSession[] $sessions - * @param bool $immediate - */ - public function broadcastPacketsCallback(string $payload, array $sessions, bool $immediate = false){ - /** @var NetworkSession $session */ - foreach($sessions as $session){ - if($session->isConnected()){ - $session->sendEncoded($payload, $immediate); - } + return $promise; + }finally{ + Timings::$playerNetworkSendCompressTimer->stopTiming(); } } - /** * @param int $type */ @@ -2542,10 +2542,6 @@ class Server{ ++$this->tickCounter; - Timings::$connectionTimer->startTiming(); - $this->network->tick(); - Timings::$connectionTimer->stopTiming(); - Timings::$schedulerTimer->startTiming(); $this->pluginManager->tickSchedulers($this->tickCounter); Timings::$schedulerTimer->stopTiming(); @@ -2556,6 +2552,10 @@ class Server{ $this->checkTickUpdates($this->tickCounter); + Timings::$connectionTimer->startTiming(); + $this->network->tick(); + Timings::$connectionTimer->stopTiming(); + if(($this->tickCounter % 20) === 0){ if($this->doTitleTick){ $this->titleTick(); diff --git a/src/pocketmine/inventory/CraftingManager.php b/src/pocketmine/inventory/CraftingManager.php index 414294d91..0b51e998a 100644 --- a/src/pocketmine/inventory/CraftingManager.php +++ b/src/pocketmine/inventory/CraftingManager.php @@ -25,6 +25,7 @@ namespace pocketmine\inventory; use pocketmine\item\Item; use pocketmine\item\ItemFactory; +use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\NetworkCompression; use pocketmine\network\mcpe\PacketStream; use pocketmine\network\mcpe\protocol\CraftingDataPacket; @@ -38,7 +39,7 @@ class CraftingManager{ /** @var FurnaceRecipe[] */ protected $furnaceRecipes = []; - /** @var string */ + /** @var CompressBatchPromise */ private $craftingDataCache; public function __construct(){ @@ -105,16 +106,18 @@ class CraftingManager{ $batch = new PacketStream(); $batch->putPacket($pk); - $this->craftingDataCache = NetworkCompression::compress($batch->buffer); + $this->craftingDataCache = new CompressBatchPromise(); + $this->craftingDataCache->resolve(NetworkCompression::compress($batch->buffer)); + Timings::$craftingDataCacheRebuildTimer->stopTiming(); } /** * Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found. * - * @return string + * @return CompressBatchPromise */ - public function getCraftingDataPacket() : string{ + public function getCraftingDataPacket() : CompressBatchPromise{ if($this->craftingDataCache === null){ $this->buildCraftingDataCache(); } diff --git a/src/pocketmine/level/Level.php b/src/pocketmine/level/Level.php index b8231d979..a7c0613ff 100644 --- a/src/pocketmine/level/Level.php +++ b/src/pocketmine/level/Level.php @@ -69,6 +69,7 @@ use pocketmine\metadata\MetadataValue; use pocketmine\nbt\tag\ListTag; use pocketmine\nbt\tag\StringTag; use pocketmine\network\mcpe\ChunkRequestTask; +use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\protocol\DataPacket; use pocketmine\network\mcpe\protocol\LevelEventPacket; use pocketmine\network\mcpe\protocol\LevelSoundEventPacket; @@ -122,7 +123,7 @@ class Level implements ChunkManager, Metadatable{ /** @var Block[][] */ private $blockCache = []; - /** @var string[] */ + /** @var CompressBatchPromise[] */ private $chunkCache = []; /** @var int */ @@ -2451,7 +2452,7 @@ class Level implements ChunkManager, Metadatable{ $this->chunkSendQueue[$index][$player->getLoaderId()] = $player; } - private function sendChunkFromCache(int $x, int $z){ + private function sendCachedChunk(int $x, int $z){ if(isset($this->chunkSendQueue[$index = Level::chunkHash($x, $z)])){ foreach($this->chunkSendQueue[$index] as $player){ /** @var Player $player */ @@ -2479,10 +2480,12 @@ class Level implements ChunkManager, Metadatable{ continue; } } + if(isset($this->chunkCache[$index])){ - $this->sendChunkFromCache($x, $z); + $this->sendCachedChunk($x, $z); continue; } + $this->timings->syncChunkSendPrepareTimer->startTiming(); $chunk = $this->chunks[$index] ?? null; @@ -2491,7 +2494,30 @@ class Level implements ChunkManager, Metadatable{ } assert($chunk->getX() === $x and $chunk->getZ() === $z, "Chunk coordinate mismatch: expected $x $z, but chunk has coordinates " . $chunk->getX() . " " . $chunk->getZ() . ", did you forget to clone a chunk before setting?"); - $this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($this, $x, $z, $chunk)); + /* + * we don't send promises directly to the players here because unresolved promises of chunk sending + * would slow down the sending of other packets, especially if a chunk takes a long time to prepare. + */ + + $promise = new CompressBatchPromise(); + $promise->onResolve(function(CompressBatchPromise $promise) use ($x, $z, $index): void{ + if(!$this->closed){ + $this->timings->syncChunkSendTimer->startTiming(); + + unset($this->chunkSendTasks[$index]); + + $this->chunkCache[$index] = $promise; + $this->sendCachedChunk($x, $z); + if(!$this->server->getMemoryManager()->canUseChunkCache()){ + unset($this->chunkCache[$index]); + } + + $this->timings->syncChunkSendTimer->stopTiming(); + }else{ + $this->server->getLogger()->debug("Dropped prepared chunk $x $z due to level not loaded"); + } + }); + $this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($x, $z, $chunk, $promise)); $this->chunkSendTasks[$index] = $task; $this->timings->syncChunkSendPrepareTimer->stopTiming(); @@ -2501,21 +2527,6 @@ class Level implements ChunkManager, Metadatable{ } } - public function chunkRequestCallback(int $x, int $z, string $payload){ - $this->timings->syncChunkSendTimer->startTiming(); - - $index = Level::chunkHash($x, $z); - unset($this->chunkSendTasks[$index]); - - $this->chunkCache[$index] = $payload; - $this->sendChunkFromCache($x, $z); - if(!$this->server->getMemoryManager()->canUseChunkCache()){ - unset($this->chunkCache[$index]); - } - - $this->timings->syncChunkSendTimer->stopTiming(); - } - /** * @param Entity $entity * diff --git a/src/pocketmine/network/mcpe/ChunkRequestTask.php b/src/pocketmine/network/mcpe/ChunkRequestTask.php index b12d31db1..885bb44f4 100644 --- a/src/pocketmine/network/mcpe/ChunkRequestTask.php +++ b/src/pocketmine/network/mcpe/ChunkRequestTask.php @@ -24,7 +24,6 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; use pocketmine\level\format\Chunk; -use pocketmine\level\Level; use pocketmine\network\mcpe\protocol\FullChunkDataPacket; use pocketmine\scheduler\AsyncTask; use pocketmine\Server; @@ -42,8 +41,7 @@ class ChunkRequestTask extends AsyncTask{ /** @var int */ protected $compressionLevel; - public function __construct(Level $level, int $chunkX, int $chunkZ, Chunk $chunk){ - $this->storeLocal($level); + public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise){ $this->compressionLevel = NetworkCompression::$LEVEL; $this->chunk = $chunk->fastSerialize(); @@ -59,6 +57,8 @@ class ChunkRequestTask extends AsyncTask{ } $this->tiles = $tiles; + + $this->storeLocal($promise); } public function onRun() : void{ @@ -76,14 +76,8 @@ class ChunkRequestTask extends AsyncTask{ } public function onCompletion(Server $server) : void{ - /** @var Level $level */ - $level = $this->fetchLocal(); - if(!$level->isClosed()){ - if($this->hasResult()){ - $level->chunkRequestCallback($this->chunkX, $this->chunkZ, $this->getResult()); - }else{ - $level->getServer()->getLogger()->error("Chunk request for level " . $level->getName() . ", x=" . $this->chunkX . ", z=" . $this->chunkZ . " doesn't have any result data"); - } - } + /** @var CompressBatchPromise $promise */ + $promise = $this->fetchLocal(); + $promise->resolve($this->getResult()); } } diff --git a/src/pocketmine/network/mcpe/CompressBatchPromise.php b/src/pocketmine/network/mcpe/CompressBatchPromise.php new file mode 100644 index 000000000..6707c08c2 --- /dev/null +++ b/src/pocketmine/network/mcpe/CompressBatchPromise.php @@ -0,0 +1,62 @@ +result !== null){ + $callback($this); + }else{ + $this->callbacks[] = $callback; + } + } + + public function resolve(string $result) : void{ + if($this->result !== null){ + throw new \InvalidStateException("Cannot resolve promise more than once"); + } + $this->result = $result; + foreach($this->callbacks as $callback){ + $callback($this); + } + $this->callbacks = []; + } + + public function getResult() : string{ + if($this->result === null){ + throw new \InvalidStateException("Promise has not yet been resolved"); + } + return $this->result; + } + + public function hasResult() : bool{ + return $this->result !== null; + } +} diff --git a/src/pocketmine/network/mcpe/CompressBatchedTask.php b/src/pocketmine/network/mcpe/CompressBatchTask.php similarity index 72% rename from src/pocketmine/network/mcpe/CompressBatchedTask.php rename to src/pocketmine/network/mcpe/CompressBatchTask.php index c9366b22a..860460315 100644 --- a/src/pocketmine/network/mcpe/CompressBatchedTask.php +++ b/src/pocketmine/network/mcpe/CompressBatchTask.php @@ -26,20 +26,20 @@ namespace pocketmine\network\mcpe; use pocketmine\scheduler\AsyncTask; use pocketmine\Server; -class CompressBatchedTask extends AsyncTask{ +class CompressBatchTask extends AsyncTask{ private $level; private $data; /** - * @param PacketStream $stream - * @param NetworkSession[] $targets - * @param int $compressionLevel + * @param PacketStream $stream + * @param int $compressionLevel + * @param CompressBatchPromise $promise */ - public function __construct(PacketStream $stream, array $targets, int $compressionLevel){ + public function __construct(PacketStream $stream, int $compressionLevel, CompressBatchPromise $promise){ $this->data = $stream->buffer; $this->level = $compressionLevel; - $this->storeLocal($targets); + $this->storeLocal($promise); } public function onRun() : void{ @@ -47,9 +47,8 @@ class CompressBatchedTask extends AsyncTask{ } public function onCompletion(Server $server) : void{ - /** @var NetworkSession[] $targets */ - $targets = $this->fetchLocal(); - - $server->broadcastPacketsCallback($this->getResult(), $targets); + /** @var CompressBatchPromise $promise */ + $promise = $this->fetchLocal(); + $promise->resolve($this->getResult()); } } diff --git a/src/pocketmine/network/mcpe/NetworkSession.php b/src/pocketmine/network/mcpe/NetworkSession.php index f4d38224a..eff34d202 100644 --- a/src/pocketmine/network/mcpe/NetworkSession.php +++ b/src/pocketmine/network/mcpe/NetworkSession.php @@ -68,12 +68,20 @@ class NetworkSession{ /** @var NetworkCipher */ private $cipher; + /** @var PacketStream|null */ + private $sendBuffer; + + /** @var \SplQueue|CompressBatchPromise[] */ + private $compressedQueue; + public function __construct(Server $server, NetworkInterface $interface, string $ip, int $port){ $this->server = $server; $this->interface = $interface; $this->ip = $ip; $this->port = $port; + $this->compressedQueue = new \SplQueue(); + $this->connectTime = time(); $this->server->getNetwork()->scheduleSessionTick($this); @@ -206,10 +214,10 @@ class NetworkSession{ return false; } - //TODO: implement buffering (this is just a quick fix) - $stream = new PacketStream(); - $stream->putPacket($packet); - $this->server->batchPackets([$this], $stream, true, $immediate); + $this->addToSendBuffer($packet); + if($immediate){ + $this->flushSendBuffer(true); + } return true; }finally{ @@ -217,7 +225,62 @@ class NetworkSession{ } } - public function sendEncoded(string $payload, bool $immediate = false) : void{ + /** + * @internal + * @param DataPacket $packet + */ + public function addToSendBuffer(DataPacket $packet) : void{ + $timings = Timings::getSendDataPacketTimings($packet); + $timings->startTiming(); + try{ + if($this->sendBuffer === null){ + $this->sendBuffer = new PacketStream(); + } + $this->sendBuffer->putPacket($packet); + $this->server->getNetwork()->scheduleSessionTick($this); + }finally{ + $timings->stopTiming(); + } + } + + private function flushSendBuffer(bool $immediate = false) : void{ + if($this->sendBuffer !== null){ + $promise = $this->server->prepareBatch($this->sendBuffer, $immediate); + $this->sendBuffer = null; + $this->queueCompressed($promise, $immediate); + } + } + + public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{ + $this->flushSendBuffer($immediate); //Maintain ordering if possible + if($immediate){ + //Skips all queues + $this->sendEncoded($payload->getResult(), true); + }else{ + $this->compressedQueue->enqueue($payload); + $payload->onResolve(function(CompressBatchPromise $payload) : void{ + if($this->connected and $this->compressedQueue->bottom() === $payload){ + $this->compressedQueue->dequeue(); //result unused + $this->sendEncoded($payload->getResult()); + + while(!$this->compressedQueue->isEmpty()){ + /** @var CompressBatchPromise $current */ + $current = $this->compressedQueue->bottom(); + if($current->hasResult()){ + $this->compressedQueue->dequeue(); + + $this->sendEncoded($current->getResult()); + }else{ + //can't send any more queued until this one is ready + break; + } + } + } + }); + } + } + + private function sendEncoded(string $payload, bool $immediate = false) : void{ if($this->cipher !== null){ Timings::$playerNetworkSendEncryptTimer->startTiming(); $payload = $this->cipher->encrypt($payload); @@ -289,6 +352,8 @@ class NetworkSession{ $this->handler = null; $this->interface = null; $this->player = null; + $this->sendBuffer = null; + $this->compressedQueue = null; } public function enableEncryption(string $encryptionKey, string $handshakeJwt) : void{ @@ -345,7 +410,10 @@ class NetworkSession{ return true; //keep ticking until timeout } - //TODO: more stuff on tick + if($this->sendBuffer !== null){ + $this->flushSendBuffer(); + } + return false; } } diff --git a/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php b/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php index 1c18c8aa9..69083f014 100644 --- a/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php +++ b/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php @@ -87,7 +87,7 @@ class PreSpawnSessionHandler extends SessionHandler{ $this->player->sendAllInventories(); $this->player->getInventory()->sendCreativeContents(); $this->player->getInventory()->sendHeldItem($this->player); - $this->session->sendEncoded($this->server->getCraftingManager()->getCraftingDataPacket()); + $this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket()); $this->server->sendFullPlayerListData($this->player); }