From 939dfd9269df0feaff5b96d0dd628055b9706b95 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Wed, 17 Apr 2019 19:33:37 +0100 Subject: [PATCH] First look at separating chunk sending from Level --- src/pocketmine/Player.php | 39 +-- src/pocketmine/level/Level.php | 79 +----- src/pocketmine/network/mcpe/ChunkCache.php | 231 ++++++++++++++++++ .../network/mcpe/CompressBatchPromise.php | 62 ++++- .../network/mcpe/NetworkSession.php | 37 +++ 5 files changed, 337 insertions(+), 111 deletions(-) create mode 100644 src/pocketmine/network/mcpe/ChunkCache.php diff --git a/src/pocketmine/Player.php b/src/pocketmine/Player.php index b1afebec1..d76212d0a 100644 --- a/src/pocketmine/Player.php +++ b/src/pocketmine/Player.php @@ -96,7 +96,6 @@ use pocketmine\nbt\tag\CompoundTag; use pocketmine\nbt\tag\DoubleTag; use pocketmine\nbt\tag\IntTag; use pocketmine\nbt\tag\ListTag; -use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\protocol\AnimatePacket; use pocketmine\network\mcpe\protocol\BookEditPacket; @@ -954,12 +953,7 @@ class Player extends Human implements CommandSender, ChunkLoader, ChunkListener, $level = $level ?? $this->level; $index = Level::chunkHash($x, $z); if(isset($this->usedChunks[$index])){ - foreach($level->getChunkEntities($x, $z) as $entity){ - if($entity !== $this){ - $entity->despawnFrom($this); - } - } - + $this->networkSession->stopUsingChunk($x, $z); unset($this->usedChunks[$index]); } $level->unregisterChunkLoader($this, $x, $z); @@ -967,39 +961,24 @@ class Player extends Human implements CommandSender, ChunkLoader, ChunkListener, unset($this->loadQueue[$index]); } - public function sendChunk(int $x, int $z, CompressBatchPromise $promise){ + public function onChunkReady(int $x, int $z){ if(!$this->isConnected()){ return; } + assert(isset($this->usedChunks[Level::chunkHash($x, $z)])); $this->usedChunks[Level::chunkHash($x, $z)] = true; - $this->networkSession->queueCompressed($promise); + $spawn = ++$this->spawnChunkLoadCount === $this->spawnThreshold; + $this->networkSession->startUsingChunk($x, $z, $spawn); - if($this->spawned){ - foreach($this->level->getChunkEntities($x, $z) as $entity){ - if($entity !== $this and !$entity->isClosed() and $entity->isAlive()){ - $entity->spawnTo($this); - } - } - }elseif(++$this->spawnChunkLoadCount >= $this->spawnThreshold){ - $this->spawnChunkLoadCount = -1; + if($spawn){ + //TODO: not sure this should be here $this->spawned = true; - - foreach($this->usedChunks as $index => $c){ - Level::getXZ($index, $chunkX, $chunkZ); - foreach($this->level->getChunkEntities($chunkX, $chunkZ) as $entity){ - if($entity !== $this and !$entity->isClosed() and $entity->isAlive() and !$entity->isFlaggedForDespawn()){ - $entity->spawnTo($this); - } - } - } - - $this->networkSession->onTerrainReady(); } } - protected function sendNextChunk(){ + protected function requestChunks(){ if(!$this->isConnected()){ return; } @@ -1146,7 +1125,7 @@ class Player extends Human implements CommandSender, ChunkLoader, ChunkListener, } if(count($this->loadQueue) > 0){ - $this->sendNextChunk(); + $this->requestChunks(); } } diff --git a/src/pocketmine/level/Level.php b/src/pocketmine/level/Level.php index be360ae48..66701c559 100644 --- a/src/pocketmine/level/Level.php +++ b/src/pocketmine/level/Level.php @@ -72,8 +72,6 @@ use pocketmine\metadata\Metadatable; use pocketmine\metadata\MetadataValue; use pocketmine\nbt\tag\ListTag; use pocketmine\nbt\tag\StringTag; -use pocketmine\network\mcpe\ChunkRequestTask; -use pocketmine\network\mcpe\CompressBatchPromise; use pocketmine\network\mcpe\protocol\BlockEntityDataPacket; use pocketmine\network\mcpe\protocol\ClientboundPacket; use pocketmine\network\mcpe\protocol\LevelEventPacket; @@ -148,9 +146,6 @@ class Level implements ChunkManager, Metadatable{ /** @var Block[][] */ private $blockCache = []; - /** @var CompressBatchPromise[] */ - private $chunkCache = []; - /** @var int */ private $sendTimeTicker = 0; @@ -221,8 +216,6 @@ class Level implements ChunkManager, Metadatable{ /** @var Player[][] */ private $chunkSendQueue = []; - /** @var ChunkRequestTask[] */ - private $chunkSendTasks = []; /** @var bool[] */ private $chunkPopulationQueue = []; @@ -843,7 +836,6 @@ class Level implements ChunkManager, Metadatable{ if(empty($blocks)){ //blocks can be set normally and then later re-set with direct send continue; } - unset($this->chunkCache[$index]); Level::getXZ($index, $chunkX, $chunkZ); if(count($blocks) > 512){ $chunk = $this->getChunk($chunkX, $chunkZ); @@ -854,8 +846,6 @@ class Level implements ChunkManager, Metadatable{ $this->sendBlocks($this->getChunkPlayers($chunkX, $chunkZ), $blocks); } } - }else{ - $this->chunkCache = []; } $this->changedBlocks = []; @@ -960,7 +950,6 @@ class Level implements ChunkManager, Metadatable{ public function clearCache(bool $force = false){ if($force){ - $this->chunkCache = []; $this->blockCache = []; }else{ $count = 0; @@ -974,10 +963,6 @@ class Level implements ChunkManager, Metadatable{ } } - public function clearChunkCache(int $chunkX, int $chunkZ){ - unset($this->chunkCache[Level::chunkHash($chunkX, $chunkZ)]); - } - public function getRandomTickedBlocks() : \SplFixedArray{ return $this->randomTickBlocks; } @@ -2319,17 +2304,13 @@ class Level implements ChunkManager, Metadatable{ $this->chunks[$chunkHash] = $chunk; unset($this->blockCache[$chunkHash]); - unset($this->chunkCache[$chunkHash]); unset($this->changedBlocks[$chunkHash]); - if(isset($this->chunkSendTasks[$chunkHash])){ //invalidate pending caches - $this->chunkSendTasks[$chunkHash]->cancelRun(); - unset($this->chunkSendTasks[$chunkHash]); - } $chunk->setChanged(); if(!$this->isChunkInUse($chunkX, $chunkZ)){ $this->unloadChunkRequest($chunkX, $chunkZ); } + foreach($this->getChunkListeners($chunkX, $chunkZ) as $listener){ $listener->onChunkChanged($chunk); } @@ -2419,13 +2400,11 @@ class Level implements ChunkManager, Metadatable{ $this->chunkSendQueue[$index][spl_object_id($player)] = $player; } - private function sendCachedChunk(int $x, int $z){ + private function onChunkReady(int $x, int $z){ if(isset($this->chunkSendQueue[$index = Level::chunkHash($x, $z)])){ foreach($this->chunkSendQueue[$index] as $player){ /** @var Player $player */ - if($player->isConnected() and isset($player->usedChunks[$index])){ - $player->sendChunk($x, $z, $this->chunkCache[$index]); - } + $player->onChunkReady($x, $z); } unset($this->chunkSendQueue[$index]); } @@ -2438,55 +2417,13 @@ class Level implements ChunkManager, Metadatable{ foreach($this->chunkSendQueue as $index => $players){ Level::getXZ($index, $x, $z); - if(isset($this->chunkSendTasks[$index])){ - //Not ready for sending yet - continue; - } - - if(isset($this->chunkCache[$index])){ - $this->sendCachedChunk($x, $z); - continue; - } - $this->timings->syncChunkSendPrepareTimer->startTiming(); $chunk = $this->chunks[$index] ?? null; - if(!($chunk instanceof Chunk)){ + if($chunk === null or !$chunk->isGenerated() or !$chunk->isPopulated()){ throw new ChunkException("Invalid Chunk sent"); } - assert($chunk->getX() === $x and $chunk->getZ() === $z, "Chunk coordinate mismatch: expected $x $z, but chunk has coordinates " . $chunk->getX() . " " . $chunk->getZ() . ", did you forget to clone a chunk before setting?"); - - /* - * we don't send promises directly to the players here because unresolved promises of chunk sending - * would slow down the sending of other packets, especially if a chunk takes a long time to prepare. - */ - - $promise = new CompressBatchPromise(); - $promise->onResolve(function(CompressBatchPromise $promise) use ($x, $z, $index): void{ - if(!$this->closed){ - $this->timings->syncChunkSendTimer->startTiming(); - - unset($this->chunkSendTasks[$index]); - - $this->chunkCache[$index] = $promise; - $this->sendCachedChunk($x, $z); - if(!$this->server->getMemoryManager()->canUseChunkCache()){ - unset($this->chunkCache[$index]); - } - - $this->timings->syncChunkSendTimer->stopTiming(); - }else{ - $this->server->getLogger()->debug("Dropped prepared chunk $x $z due to world not loaded"); - } - }); - $this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($x, $z, $chunk, $promise, function() use($index, $x, $z){ - if(isset($this->chunkSendTasks[$index])){ - unset($this->chunkSendTasks[$index]); - $this->server->getLogger()->error("Failed to prepare chunk $x $z for sending, retrying"); - } - })); - $this->chunkSendTasks[$index] = $task; - + $this->onChunkReady($x, $z); $this->timings->syncChunkSendPrepareTimer->stopTiming(); } @@ -2577,7 +2514,9 @@ class Level implements ChunkManager, Metadatable{ if(isset($this->chunks[$hash = Level::chunkHash($chunkX, $chunkZ)])){ $this->chunks[$hash]->removeTile($tile); } - $this->clearChunkCache($chunkX, $chunkZ); + foreach($this->getChunkListeners($chunkX, $chunkZ) as $listener){ + $listener->onBlockChanged($tile); + } } /** @@ -2712,11 +2651,9 @@ class Level implements ChunkManager, Metadatable{ } unset($this->chunks[$chunkHash]); - unset($this->chunkCache[$chunkHash]); unset($this->blockCache[$chunkHash]); unset($this->changedBlocks[$chunkHash]); unset($this->chunkSendQueue[$chunkHash]); - unset($this->chunkSendTasks[$chunkHash]); $this->timings->doChunkUnload->stopTiming(); diff --git a/src/pocketmine/network/mcpe/ChunkCache.php b/src/pocketmine/network/mcpe/ChunkCache.php new file mode 100644 index 000000000..dcadd609d --- /dev/null +++ b/src/pocketmine/network/mcpe/ChunkCache.php @@ -0,0 +1,231 @@ +world = $world; + } + + /** + * Requests asynchronous preparation of the chunk at the given coordinates. + * + * @param int $chunkX + * @param int $chunkZ + * + * @return CompressBatchPromise a promise of resolution which will contain a compressed chunk packet. + */ + public function request(int $chunkX, int $chunkZ) : CompressBatchPromise{ + $this->world->registerChunkListener($this, $chunkX, $chunkZ); + $chunkHash = Level::chunkHash($chunkX, $chunkZ); + + if(isset($this->caches[$chunkHash])){ + ++$this->hits; + return $this->caches[$chunkHash]; + } + + ++$this->misses; + + $this->world->timings->syncChunkSendPrepareTimer->startTiming(); + try{ + $this->caches[$chunkHash] = new CompressBatchPromise(); + + $this->world->getServer()->getAsyncPool()->submitTask( + new ChunkRequestTask( + $chunkX, + $chunkZ, + $this->world->getChunk($chunkX, $chunkZ), + $this->caches[$chunkHash], + function() use($chunkX, $chunkZ){ + $this->world->getServer()->getLogger()->error("Failed preparing chunk for " . $this->world->getDisplayName() . " chunk $chunkX $chunkZ, retrying"); + + $this->restartPendingRequest($chunkX, $chunkZ); + } + ) + ); + + return $this->caches[$chunkHash]; + }finally{ + $this->world->timings->syncChunkSendPrepareTimer->stopTiming(); + } + } + + private function destroy(int $chunkX, int $chunkZ) : bool{ + $chunkHash = Level::chunkHash($chunkX, $chunkZ); + $existing = $this->caches[$chunkHash] ?? null; + unset($this->caches[$chunkHash]); + + return $existing !== null; + } + + /** + * Restarts an async request for an unresolved chunk. + * + * @param int $chunkX + * @param int $chunkZ + * + * @throws \InvalidArgumentException + */ + private function restartPendingRequest(int $chunkX, int $chunkZ) : void{ + $chunkHash = Level::chunkHash($chunkX, $chunkZ); + $existing = $this->caches[$chunkHash]; + if($existing === null or $existing->hasResult()){ + throw new \InvalidArgumentException("Restart can only be applied to unresolved promises"); + } + $existing->cancel(); + unset($this->caches[$chunkHash]); + + $this->request($chunkX, $chunkZ)->onResolve(...$existing->getResolveCallbacks()); + } + + /** + * @param int $chunkX + * @param int $chunkZ + * + * @throws \InvalidArgumentException + */ + private function destroyOrRestart(int $chunkX, int $chunkZ) : void{ + $cache = $this->caches[Level::chunkHash($chunkX, $chunkZ)] ?? null; + if($cache !== null){ + if(!$cache->hasResult()){ + //some requesters are waiting for this chunk, so their request needs to be fulfilled + $this->restartPendingRequest($chunkX, $chunkZ); + }else{ + //dump the cache, it'll be regenerated the next time it's requested + $this->destroy($chunkX, $chunkZ); + } + } + } + + /** + * @see ChunkListener::onChunkChanged() + * @param Chunk $chunk + */ + public function onChunkChanged(Chunk $chunk) : void{ + //FIXME: this gets fired for stuff that doesn't change terrain related things (like lighting updates) + $this->destroyOrRestart($chunk->getX(), $chunk->getZ()); + } + + /** + * @see ChunkListener::onBlockChanged() + * @param Vector3 $block + */ + public function onBlockChanged(Vector3 $block) : void{ + //FIXME: requesters will still receive this chunk after it's been dropped, but we can't mark this for a simple + //sync here because it can spam the worker pool + $this->destroy($block->getFloorX() >> 4, $block->getFloorZ() >> 4); + } + + /** + * @see ChunkListener::onChunkUnloaded() + * @param Chunk $chunk + */ + public function onChunkUnloaded(Chunk $chunk) : void{ + $this->destroy($chunk->getX(), $chunk->getZ()); + $this->world->unregisterChunkListener($this, $chunk->getX(), $chunk->getZ()); + } + + /** + * @see ChunkListener::onChunkLoaded() + * @param Chunk $chunk + */ + public function onChunkLoaded(Chunk $chunk) : void{ + //NOOP + } + + /** + * @see ChunkListener::onChunkPopulated() + * @param Chunk $chunk + */ + public function onChunkPopulated(Chunk $chunk) : void{ + //NOOP - we also receive this in onChunkChanged, so we don't care here + } + + /** + * Returns the number of bytes occupied by the cache data in this cache. This does not include the size of any + * promises referenced by the cache. + * + * @return int + */ + public function calculateCacheSize() : int{ + $result = 0; + foreach($this->caches as $cache){ + if($cache->hasResult()){ + $result += strlen($cache->getResult()); + } + } + return $result; + } + + /** + * Returns the percentage of requests to the cache which resulted in a cache hit. + * + * @return float + */ + public function getHitPercentage() : float{ + $total = $this->hits + $this->misses; + return $total > 0 ? $this->hits / $total : 0.0; + } +} diff --git a/src/pocketmine/network/mcpe/CompressBatchPromise.php b/src/pocketmine/network/mcpe/CompressBatchPromise.php index 6707c08c2..f58ef6ca0 100644 --- a/src/pocketmine/network/mcpe/CompressBatchPromise.php +++ b/src/pocketmine/network/mcpe/CompressBatchPromise.php @@ -23,6 +23,9 @@ declare(strict_types=1); namespace pocketmine\network\mcpe; +use pocketmine\utils\Utils; +use function array_push; + class CompressBatchPromise{ /** @var callable[] */ private $callbacks = []; @@ -30,26 +33,45 @@ class CompressBatchPromise{ /** @var string|null */ private $result = null; - public function onResolve(callable $callback) : void{ + /** @var bool */ + private $cancelled = false; + + public function onResolve(callable ...$callbacks) : void{ + $this->checkCancelled(); + foreach($callbacks as $callback){ + Utils::validateCallableSignature(function(CompressBatchPromise $promise){}, $callback); + } if($this->result !== null){ - $callback($this); + foreach($callbacks as $callback){ + $callback($this); + } }else{ - $this->callbacks[] = $callback; + array_push($this->callbacks, ...$callbacks); } } public function resolve(string $result) : void{ - if($this->result !== null){ - throw new \InvalidStateException("Cannot resolve promise more than once"); + if(!$this->cancelled){ + if($this->result !== null){ + throw new \InvalidStateException("Cannot resolve promise more than once"); + } + $this->result = $result; + foreach($this->callbacks as $callback){ + $callback($this); + } + $this->callbacks = []; } - $this->result = $result; - foreach($this->callbacks as $callback){ - $callback($this); - } - $this->callbacks = []; + } + + /** + * @return callable[] + */ + public function getResolveCallbacks() : array{ + return $this->callbacks; } public function getResult() : string{ + $this->checkCancelled(); if($this->result === null){ throw new \InvalidStateException("Promise has not yet been resolved"); } @@ -59,4 +81,24 @@ class CompressBatchPromise{ public function hasResult() : bool{ return $this->result !== null; } + + /** + * @return bool + */ + public function isCancelled() : bool{ + return $this->cancelled; + } + + public function cancel() : void{ + if($this->hasResult()){ + throw new \InvalidStateException("Cannot cancel a resolved promise"); + } + $this->cancelled = true; + } + + private function checkCancelled() : void{ + if($this->cancelled){ + throw new \InvalidArgumentException("Promise has been cancelled"); + } + } } diff --git a/src/pocketmine/network/mcpe/NetworkSession.php b/src/pocketmine/network/mcpe/NetworkSession.php index 838e4cc6b..3a76c908b 100644 --- a/src/pocketmine/network/mcpe/NetworkSession.php +++ b/src/pocketmine/network/mcpe/NetworkSession.php @@ -744,6 +744,43 @@ class NetworkSession{ return $this->sendDataPacket($pk); } + public function startUsingChunk(int $chunkX, int $chunkZ, bool $spawn = false) : void{ + ChunkCache::getInstance($this->player->getLevel())->request($chunkX, $chunkZ)->onResolve( + + //this callback may be called synchronously or asynchronously, depending on whether the promise is resolved yet + function(CompressBatchPromise $promise) use($chunkX, $chunkZ, $spawn){ + if(!$this->isConnected()){ + return; + } + $this->player->level->timings->syncChunkSendTimer->startTiming(); + try{ + $this->queueCompressed($promise); + + foreach($this->player->getLevel()->getChunkEntities($chunkX, $chunkZ) as $entity){ + if($entity !== $this->player and !$entity->isClosed() and !$entity->isFlaggedForDespawn()){ + $entity->spawnTo($this->player); + } + } + + if($spawn){ + //TODO: potential race condition during chunk sending could cause this to be called too early + $this->onTerrainReady(); + } + }finally{ + $this->player->level->timings->syncChunkSendTimer->stopTiming(); + } + } + ); + } + + public function stopUsingChunk(int $chunkX, int $chunkZ) : void{ + foreach($this->player->getLevel()->getChunkEntities($chunkX, $chunkZ) as $entity){ + if($entity !== $this->player){ + $entity->despawnFrom($this->player); + } + } + } + public function tick() : bool{ if($this->handler instanceof LoginSessionHandler){ if(time() >= $this->connectTime + 10){