From 15bac8c58adff26c66c5119829af89db83ef8621 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Mon, 13 Aug 2018 14:37:18 +0100 Subject: [PATCH] Implement send buffering and queuing for network sessions (#2358) Async compression and broadcasts are now reliable and don't have race condition bugs. This features improved performance and significantly reduced bandwidth wastage. Reduce Level broadcast latency by ticking network after levels. This ensures that session buffers get flushed as soon as possible after level tick, if level broadcasts were done. --- src/pocketmine/Player.php | 5 +- src/pocketmine/Server.php | 66 +++++++-------- src/pocketmine/inventory/CraftingManager.php | 11 ++- src/pocketmine/level/Level.php | 49 +++++++----- .../network/mcpe/ChunkRequestTask.php | 18 ++--- .../network/mcpe/CompressBatchPromise.php | 62 ++++++++++++++ ...sBatchedTask.php => CompressBatchTask.php} | 19 +++-- .../network/mcpe/NetworkSession.php | 80 +++++++++++++++++-- .../mcpe/handler/PreSpawnSessionHandler.php | 2 +- 9 files changed, 225 insertions(+), 87 deletions(-) create mode 100644 src/pocketmine/network/mcpe/CompressBatchPromise.php rename src/pocketmine/network/mcpe/{CompressBatchedTask.php => CompressBatchTask.php} (72%) diff --git a/src/pocketmine/Player.php b/src/pocketmine/Player.php index 3cf8aff79..7aa78cb65 100644 --- a/src/pocketmine/Player.php +++ b/src/pocketmine/Player.php @@ -90,6 +90,7 @@ use pocketmine\nbt\tag\ByteTag; use pocketmine\nbt\tag\CompoundTag; use pocketmine\nbt\tag\DoubleTag; use pocketmine\nbt\tag\ListTag; +use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\NetworkCipher; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\AdventureSettingsPacket; @@ -894,7 +895,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{ unset($this->loadQueue[$index]); } - public function sendChunk(int $x, int $z, string $payload){ + public function sendChunk(int $x, int $z, CompressBatchPromise $promise){ if(!$this->isConnected()){ return; } @@ -902,7 +903,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{ $this->usedChunks[Level::chunkHash($x, $z)] = true; $this->chunkLoadCount++; - $this->networkSession->sendEncoded($payload); + $this->networkSession->queueCompressed($promise); if($this->spawned){ foreach($this->level->getChunkEntities($x, $z) as $entity){ diff --git a/src/pocketmine/Server.php b/src/pocketmine/Server.php index 2c9e5d0f6..528e1743b 100644 --- a/src/pocketmine/Server.php +++ b/src/pocketmine/Server.php @@ -71,7 +71,8 @@ use pocketmine\nbt\tag\LongTag; use pocketmine\nbt\tag\ShortTag; use pocketmine\nbt\tag\StringTag; use pocketmine\network\AdvancedNetworkInterface; -use pocketmine\network\mcpe\CompressBatchedTask; +use pocketmine\network\mcpe\CompressBatchPromise; +use pocketmine\network\mcpe\CompressBatchTask; use pocketmine\network\mcpe\NetworkCipher; use pocketmine\network\mcpe\NetworkCompression; use pocketmine\network\mcpe\NetworkSession; @@ -1899,55 +1900,54 @@ class Server{ $stream->putPacket($packet); } - //TODO: if under the compression threshold, add to session buffers instead of batching (first we need to implement buffering!) - $this->batchPackets($targets, $stream); + if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){ + foreach($targets as $target){ + foreach($ev->getPackets() as $pk){ + $target->addToSendBuffer($pk); + } + } + }else{ + $promise = $this->prepareBatch($stream); + foreach($targets as $target){ + $target->queueCompressed($promise); + } + } + return true; } /** * Broadcasts a list of packets in a batch to a list of players * - * @param NetworkSession[] $targets - * @param PacketStream $stream - * @param bool $forceSync - * @param bool $immediate + * @param PacketStream $stream + * @param bool $forceSync + * + * @return CompressBatchPromise */ - public function batchPackets(array $targets, PacketStream $stream, bool $forceSync = false, bool $immediate = false){ - Timings::$playerNetworkSendCompressTimer->startTiming(); + public function prepareBatch(PacketStream $stream, bool $forceSync = false) : CompressBatchPromise{ + try{ + Timings::$playerNetworkSendCompressTimer->startTiming(); - if(!empty($targets)){ $compressionLevel = NetworkCompression::$LEVEL; if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){ $compressionLevel = 0; //Do not compress packets under the threshold $forceSync = true; } - if(!$forceSync and !$immediate and $this->networkCompressionAsync){ - $task = new CompressBatchedTask($stream, $targets, $compressionLevel); + $promise = new CompressBatchPromise(); + if(!$forceSync and $this->networkCompressionAsync){ + $task = new CompressBatchTask($stream, $compressionLevel, $promise); $this->asyncPool->submitTask($task); }else{ - $this->broadcastPacketsCallback(NetworkCompression::compress($stream->buffer), $targets, $immediate); + $promise->resolve(NetworkCompression::compress($stream->buffer)); } - } - Timings::$playerNetworkSendCompressTimer->stopTiming(); - } - - /** - * @param string $payload - * @param NetworkSession[] $sessions - * @param bool $immediate - */ - public function broadcastPacketsCallback(string $payload, array $sessions, bool $immediate = false){ - /** @var NetworkSession $session */ - foreach($sessions as $session){ - if($session->isConnected()){ - $session->sendEncoded($payload, $immediate); - } + return $promise; + }finally{ + Timings::$playerNetworkSendCompressTimer->stopTiming(); } } - /** * @param int $type */ @@ -2542,10 +2542,6 @@ class Server{ ++$this->tickCounter; - Timings::$connectionTimer->startTiming(); - $this->network->tick(); - Timings::$connectionTimer->stopTiming(); - Timings::$schedulerTimer->startTiming(); $this->pluginManager->tickSchedulers($this->tickCounter); Timings::$schedulerTimer->stopTiming(); @@ -2556,6 +2552,10 @@ class Server{ $this->checkTickUpdates($this->tickCounter); + Timings::$connectionTimer->startTiming(); + $this->network->tick(); + Timings::$connectionTimer->stopTiming(); + if(($this->tickCounter % 20) === 0){ if($this->doTitleTick){ $this->titleTick(); diff --git a/src/pocketmine/inventory/CraftingManager.php b/src/pocketmine/inventory/CraftingManager.php index 414294d91..0b51e998a 100644 --- a/src/pocketmine/inventory/CraftingManager.php +++ b/src/pocketmine/inventory/CraftingManager.php @@ -25,6 +25,7 @@ namespace pocketmine\inventory; use pocketmine\item\Item; use pocketmine\item\ItemFactory; +use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\NetworkCompression; use pocketmine\network\mcpe\PacketStream; use pocketmine\network\mcpe\protocol\CraftingDataPacket; @@ -38,7 +39,7 @@ class CraftingManager{ /** @var FurnaceRecipe[] */ protected $furnaceRecipes = []; - /** @var string */ + /** @var CompressBatchPromise */ private $craftingDataCache; public function __construct(){ @@ -105,16 +106,18 @@ class CraftingManager{ $batch = new PacketStream(); $batch->putPacket($pk); - $this->craftingDataCache = NetworkCompression::compress($batch->buffer); + $this->craftingDataCache = new CompressBatchPromise(); + $this->craftingDataCache->resolve(NetworkCompression::compress($batch->buffer)); + Timings::$craftingDataCacheRebuildTimer->stopTiming(); } /** * Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found. * - * @return string + * @return CompressBatchPromise */ - public function getCraftingDataPacket() : string{ + public function getCraftingDataPacket() : CompressBatchPromise{ if($this->craftingDataCache === null){ $this->buildCraftingDataCache(); } diff --git a/src/pocketmine/level/Level.php b/src/pocketmine/level/Level.php index b8231d979..a7c0613ff 100644 --- a/src/pocketmine/level/Level.php +++ b/src/pocketmine/level/Level.php @@ -69,6 +69,7 @@ use pocketmine\metadata\MetadataValue; use pocketmine\nbt\tag\ListTag; use pocketmine\nbt\tag\StringTag; use pocketmine\network\mcpe\ChunkRequestTask; +use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\protocol\DataPacket; use pocketmine\network\mcpe\protocol\LevelEventPacket; use pocketmine\network\mcpe\protocol\LevelSoundEventPacket; @@ -122,7 +123,7 @@ class Level implements ChunkManager, Metadatable{ /** @var Block[][] */ private $blockCache = []; - /** @var string[] */ + /** @var CompressBatchPromise[] */ private $chunkCache = []; /** @var int */ @@ -2451,7 +2452,7 @@ class Level implements ChunkManager, Metadatable{ $this->chunkSendQueue[$index][$player->getLoaderId()] = $player; } - private function sendChunkFromCache(int $x, int $z){ + private function sendCachedChunk(int $x, int $z){ if(isset($this->chunkSendQueue[$index = Level::chunkHash($x, $z)])){ foreach($this->chunkSendQueue[$index] as $player){ /** @var Player $player */ @@ -2479,10 +2480,12 @@ class Level implements ChunkManager, Metadatable{ continue; } } + if(isset($this->chunkCache[$index])){ - $this->sendChunkFromCache($x, $z); + $this->sendCachedChunk($x, $z); continue; } + $this->timings->syncChunkSendPrepareTimer->startTiming(); $chunk = $this->chunks[$index] ?? null; @@ -2491,7 +2494,30 @@ class Level implements ChunkManager, Metadatable{ } assert($chunk->getX() === $x and $chunk->getZ() === $z, "Chunk coordinate mismatch: expected $x $z, but chunk has coordinates " . $chunk->getX() . " " . $chunk->getZ() . ", did you forget to clone a chunk before setting?"); - $this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($this, $x, $z, $chunk)); + /* + * we don't send promises directly to the players here because unresolved promises of chunk sending + * would slow down the sending of other packets, especially if a chunk takes a long time to prepare. + */ + + $promise = new CompressBatchPromise(); + $promise->onResolve(function(CompressBatchPromise $promise) use ($x, $z, $index): void{ + if(!$this->closed){ + $this->timings->syncChunkSendTimer->startTiming(); + + unset($this->chunkSendTasks[$index]); + + $this->chunkCache[$index] = $promise; + $this->sendCachedChunk($x, $z); + if(!$this->server->getMemoryManager()->canUseChunkCache()){ + unset($this->chunkCache[$index]); + } + + $this->timings->syncChunkSendTimer->stopTiming(); + }else{ + $this->server->getLogger()->debug("Dropped prepared chunk $x $z due to level not loaded"); + } + }); + $this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($x, $z, $chunk, $promise)); $this->chunkSendTasks[$index] = $task; $this->timings->syncChunkSendPrepareTimer->stopTiming(); @@ -2501,21 +2527,6 @@ class Level implements ChunkManager, Metadatable{ } } - public function chunkRequestCallback(int $x, int $z, string $payload){ - $this->timings->syncChunkSendTimer->startTiming(); - - $index = Level::chunkHash($x, $z); - unset($this->chunkSendTasks[$index]); - - $this->chunkCache[$index] = $payload; - $this->sendChunkFromCache($x, $z); - if(!$this->server->getMemoryManager()->canUseChunkCache()){ - unset($this->chunkCache[$index]); - } - - $this->timings->syncChunkSendTimer->stopTiming(); - } - /** * @param Entity $entity * diff --git a/src/pocketmine/network/mcpe/ChunkRequestTask.php b/src/pocketmine/network/mcpe/ChunkRequestTask.php index b12d31db1..885bb44f4 100644 --- a/src/pocketmine/network/mcpe/ChunkRequestTask.php +++ b/src/pocketmine/network/mcpe/ChunkRequestTask.php @@ -24,7 +24,6 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; use pocketmine\level\format\Chunk; -use pocketmine\level\Level; use pocketmine\network\mcpe\protocol\FullChunkDataPacket; use pocketmine\scheduler\AsyncTask; use pocketmine\Server; @@ -42,8 +41,7 @@ class ChunkRequestTask extends AsyncTask{ /** @var int */ protected $compressionLevel; - public function __construct(Level $level, int $chunkX, int $chunkZ, Chunk $chunk){ - $this->storeLocal($level); + public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise){ $this->compressionLevel = NetworkCompression::$LEVEL; $this->chunk = $chunk->fastSerialize(); @@ -59,6 +57,8 @@ class ChunkRequestTask extends AsyncTask{ } $this->tiles = $tiles; + + $this->storeLocal($promise); } public function onRun() : void{ @@ -76,14 +76,8 @@ class ChunkRequestTask extends AsyncTask{ } public function onCompletion(Server $server) : void{ - /** @var Level $level */ - $level = $this->fetchLocal(); - if(!$level->isClosed()){ - if($this->hasResult()){ - $level->chunkRequestCallback($this->chunkX, $this->chunkZ, $this->getResult()); - }else{ - $level->getServer()->getLogger()->error("Chunk request for level " . $level->getName() . ", x=" . $this->chunkX . ", z=" . $this->chunkZ . " doesn't have any result data"); - } - } + /** @var CompressBatchPromise $promise */ + $promise = $this->fetchLocal(); + $promise->resolve($this->getResult()); } } diff --git a/src/pocketmine/network/mcpe/CompressBatchPromise.php b/src/pocketmine/network/mcpe/CompressBatchPromise.php new file mode 100644 index 000000000..6707c08c2 --- /dev/null +++ b/src/pocketmine/network/mcpe/CompressBatchPromise.php @@ -0,0 +1,62 @@ +result !== null){ + $callback($this); + }else{ + $this->callbacks[] = $callback; + } + } + + public function resolve(string $result) : void{ + if($this->result !== null){ + throw new \InvalidStateException("Cannot resolve promise more than once"); + } + $this->result = $result; + foreach($this->callbacks as $callback){ + $callback($this); + } + $this->callbacks = []; + } + + public function getResult() : string{ + if($this->result === null){ + throw new \InvalidStateException("Promise has not yet been resolved"); + } + return $this->result; + } + + public function hasResult() : bool{ + return $this->result !== null; + } +} diff --git a/src/pocketmine/network/mcpe/CompressBatchedTask.php b/src/pocketmine/network/mcpe/CompressBatchTask.php similarity index 72% rename from src/pocketmine/network/mcpe/CompressBatchedTask.php rename to src/pocketmine/network/mcpe/CompressBatchTask.php index c9366b22a..860460315 100644 --- a/src/pocketmine/network/mcpe/CompressBatchedTask.php +++ b/src/pocketmine/network/mcpe/CompressBatchTask.php @@ -26,20 +26,20 @@ namespace pocketmine\network\mcpe; use pocketmine\scheduler\AsyncTask; use pocketmine\Server; -class CompressBatchedTask extends AsyncTask{ +class CompressBatchTask extends AsyncTask{ private $level; private $data; /** - * @param PacketStream $stream - * @param NetworkSession[] $targets - * @param int $compressionLevel + * @param PacketStream $stream + * @param int $compressionLevel + * @param CompressBatchPromise $promise */ - public function __construct(PacketStream $stream, array $targets, int $compressionLevel){ + public function __construct(PacketStream $stream, int $compressionLevel, CompressBatchPromise $promise){ $this->data = $stream->buffer; $this->level = $compressionLevel; - $this->storeLocal($targets); + $this->storeLocal($promise); } public function onRun() : void{ @@ -47,9 +47,8 @@ class CompressBatchedTask extends AsyncTask{ } public function onCompletion(Server $server) : void{ - /** @var NetworkSession[] $targets */ - $targets = $this->fetchLocal(); - - $server->broadcastPacketsCallback($this->getResult(), $targets); + /** @var CompressBatchPromise $promise */ + $promise = $this->fetchLocal(); + $promise->resolve($this->getResult()); } } diff --git a/src/pocketmine/network/mcpe/NetworkSession.php b/src/pocketmine/network/mcpe/NetworkSession.php index f4d38224a..eff34d202 100644 --- a/src/pocketmine/network/mcpe/NetworkSession.php +++ b/src/pocketmine/network/mcpe/NetworkSession.php @@ -68,12 +68,20 @@ class NetworkSession{ /** @var NetworkCipher */ private $cipher; + /** @var PacketStream|null */ + private $sendBuffer; + + /** @var \SplQueue|CompressBatchPromise[] */ + private $compressedQueue; + public function __construct(Server $server, NetworkInterface $interface, string $ip, int $port){ $this->server = $server; $this->interface = $interface; $this->ip = $ip; $this->port = $port; + $this->compressedQueue = new \SplQueue(); + $this->connectTime = time(); $this->server->getNetwork()->scheduleSessionTick($this); @@ -206,10 +214,10 @@ class NetworkSession{ return false; } - //TODO: implement buffering (this is just a quick fix) - $stream = new PacketStream(); - $stream->putPacket($packet); - $this->server->batchPackets([$this], $stream, true, $immediate); + $this->addToSendBuffer($packet); + if($immediate){ + $this->flushSendBuffer(true); + } return true; }finally{ @@ -217,7 +225,62 @@ class NetworkSession{ } } - public function sendEncoded(string $payload, bool $immediate = false) : void{ + /** + * @internal + * @param DataPacket $packet + */ + public function addToSendBuffer(DataPacket $packet) : void{ + $timings = Timings::getSendDataPacketTimings($packet); + $timings->startTiming(); + try{ + if($this->sendBuffer === null){ + $this->sendBuffer = new PacketStream(); + } + $this->sendBuffer->putPacket($packet); + $this->server->getNetwork()->scheduleSessionTick($this); + }finally{ + $timings->stopTiming(); + } + } + + private function flushSendBuffer(bool $immediate = false) : void{ + if($this->sendBuffer !== null){ + $promise = $this->server->prepareBatch($this->sendBuffer, $immediate); + $this->sendBuffer = null; + $this->queueCompressed($promise, $immediate); + } + } + + public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{ + $this->flushSendBuffer($immediate); //Maintain ordering if possible + if($immediate){ + //Skips all queues + $this->sendEncoded($payload->getResult(), true); + }else{ + $this->compressedQueue->enqueue($payload); + $payload->onResolve(function(CompressBatchPromise $payload) : void{ + if($this->connected and $this->compressedQueue->bottom() === $payload){ + $this->compressedQueue->dequeue(); //result unused + $this->sendEncoded($payload->getResult()); + + while(!$this->compressedQueue->isEmpty()){ + /** @var CompressBatchPromise $current */ + $current = $this->compressedQueue->bottom(); + if($current->hasResult()){ + $this->compressedQueue->dequeue(); + + $this->sendEncoded($current->getResult()); + }else{ + //can't send any more queued until this one is ready + break; + } + } + } + }); + } + } + + private function sendEncoded(string $payload, bool $immediate = false) : void{ if($this->cipher !== null){ Timings::$playerNetworkSendEncryptTimer->startTiming(); $payload = $this->cipher->encrypt($payload); @@ -289,6 +352,8 @@ class NetworkSession{ $this->handler = null; $this->interface = null; $this->player = null; + $this->sendBuffer = null; + $this->compressedQueue = null; } public function enableEncryption(string $encryptionKey, string $handshakeJwt) : void{ @@ -345,7 +410,10 @@ class NetworkSession{ return true; //keep ticking until timeout } - //TODO: more stuff on tick + if($this->sendBuffer !== null){ + $this->flushSendBuffer(); + } + return false; } } diff --git a/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php b/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php index 1c18c8aa9..69083f014 100644 --- a/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php +++ b/src/pocketmine/network/mcpe/handler/PreSpawnSessionHandler.php @@ -87,7 +87,7 @@ class PreSpawnSessionHandler extends SessionHandler{ $this->player->sendAllInventories(); $this->player->getInventory()->sendCreativeContents(); $this->player->getInventory()->sendHeldItem($this->player); - $this->session->sendEncoded($this->server->getCraftingManager()->getCraftingDataPacket()); + $this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket()); $this->server->sendFullPlayerListData($this->player); }