diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index 78b11e27e..98460bef3 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -115,6 +115,7 @@ use pocketmine\utils\ObjectSet; use pocketmine\utils\TextFormat; use pocketmine\world\format\io\GlobalItemDataHandlers; use pocketmine\world\Position; +use pocketmine\world\World; use pocketmine\YmlServerProperties; use function array_map; use function array_values; @@ -1178,6 +1179,19 @@ class NetworkSession{ $this->sendDataPacket(ClientboundCloseFormPacket::create()); } + /** + * @phpstan-param \Closure() : void $onCompletion + */ + private function sendChunkPacket(string $chunkPacket, \Closure $onCompletion, World $world) : void{ + $world->timings->syncChunkSend->startTiming(); + try{ + $this->queueCompressed($chunkPacket); + $onCompletion(); + }finally{ + $world->timings->syncChunkSend->stopTiming(); + } + } + /** * Instructs the networksession to start using the chunk at the given coordinates. This may occur asynchronously. * @param \Closure $onCompletion To be called when chunk sending has completed. @@ -1185,8 +1199,12 @@ class NetworkSession{ */ public function startUsingChunk(int $chunkX, int $chunkZ, \Closure $onCompletion) : void{ $world = $this->player->getLocation()->getWorld(); - ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ)->onResolve( - + $promiseOrPacket = ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ); + if(is_string($promiseOrPacket)){ + $this->sendChunkPacket($promiseOrPacket, $onCompletion, $world); + return; + } + $promiseOrPacket->onResolve( //this callback may be called synchronously or asynchronously, depending on whether the promise is resolved yet function(CompressBatchPromise $promise) use ($world, $onCompletion, $chunkX, $chunkZ) : void{ if(!$this->isConnected()){ @@ -1204,13 +1222,7 @@ class NetworkSession{ //to NEEDED if they want to be resent. return; } - $world->timings->syncChunkSend->startTiming(); - try{ - $this->queueCompressed($promise); - $onCompletion(); - }finally{ - $world->timings->syncChunkSend->stopTiming(); - } + $this->sendChunkPacket($promise->getResult(), $onCompletion, $world); } ); } diff --git a/src/network/mcpe/cache/ChunkCache.php b/src/network/mcpe/cache/ChunkCache.php index 12e769776..2c5f67f08 100644 --- a/src/network/mcpe/cache/ChunkCache.php +++ b/src/network/mcpe/cache/ChunkCache.php @@ -32,6 +32,7 @@ use pocketmine\world\ChunkListener; use pocketmine\world\ChunkListenerNoOpTrait; use pocketmine\world\format\Chunk; use pocketmine\world\World; +use function is_string; use function spl_object_id; use function strlen; @@ -69,7 +70,7 @@ class ChunkCache implements ChunkListener{ foreach(self::$instances as $compressorMap){ foreach($compressorMap as $chunkCache){ foreach($chunkCache->caches as $chunkHash => $promise){ - if($promise->hasResult()){ + if(is_string($promise)){ //Do not clear promises that are not yet fulfilled; they will have requesters waiting on them unset($chunkCache->caches[$chunkHash]); } @@ -79,8 +80,8 @@ class ChunkCache implements ChunkListener{ } /** - * @var CompressBatchPromise[] - * @phpstan-var array + * @var CompressBatchPromise[]|string[] + * @phpstan-var array */ private array $caches = []; @@ -92,29 +93,17 @@ class ChunkCache implements ChunkListener{ private Compressor $compressor ){} - /** - * Requests asynchronous preparation of the chunk at the given coordinates. - * - * @return CompressBatchPromise a promise of resolution which will contain a compressed chunk packet. - */ - public function request(int $chunkX, int $chunkZ) : CompressBatchPromise{ + private function prepareChunkAsync(int $chunkX, int $chunkZ, int $chunkHash) : CompressBatchPromise{ $this->world->registerChunkListener($this, $chunkX, $chunkZ); $chunk = $this->world->getChunk($chunkX, $chunkZ); if($chunk === null){ throw new \InvalidArgumentException("Cannot request an unloaded chunk"); } - $chunkHash = World::chunkHash($chunkX, $chunkZ); - - if(isset($this->caches[$chunkHash])){ - ++$this->hits; - return $this->caches[$chunkHash]; - } - ++$this->misses; $this->world->timings->syncChunkSendPrepare->startTiming(); try{ - $this->caches[$chunkHash] = new CompressBatchPromise(); + $promise = new CompressBatchPromise(); $this->world->getServer()->getAsyncPool()->submitTask( new ChunkRequestTask( @@ -122,17 +111,39 @@ class ChunkCache implements ChunkListener{ $chunkZ, DimensionIds::OVERWORLD, //TODO: not hardcode this $chunk, - $this->caches[$chunkHash], + $promise, $this->compressor ) ); + $this->caches[$chunkHash] = $promise; + $promise->onResolve(function(CompressBatchPromise $promise) use ($chunkHash) : void{ + //the promise may have been discarded or replaced if the chunk was unloaded or modified in the meantime + if(($this->caches[$chunkHash] ?? null) === $promise){ + $this->caches[$chunkHash] = $promise->getResult(); + } + }); - return $this->caches[$chunkHash]; + return $promise; }finally{ $this->world->timings->syncChunkSendPrepare->stopTiming(); } } + /** + * Requests asynchronous preparation of the chunk at the given coordinates. + * + * @return CompressBatchPromise|string Compressed chunk packet, or a promise for one to be resolved asynchronously. + */ + public function request(int $chunkX, int $chunkZ) : CompressBatchPromise|string{ + $chunkHash = World::chunkHash($chunkX, $chunkZ); + if(isset($this->caches[$chunkHash])){ + ++$this->hits; + return $this->caches[$chunkHash]; + } + + return $this->prepareChunkAsync($chunkX, $chunkZ, $chunkHash); + } + private function destroy(int $chunkX, int $chunkZ) : bool{ $chunkHash = World::chunkHash($chunkX, $chunkZ); $existing = $this->caches[$chunkHash] ?? null; @@ -148,12 +159,12 @@ class ChunkCache implements ChunkListener{ $chunkPosHash = World::chunkHash($chunkX, $chunkZ); $cache = $this->caches[$chunkPosHash] ?? null; if($cache !== null){ - if(!$cache->hasResult()){ + if(!is_string($cache)){ //some requesters are waiting for this chunk, so their request needs to be fulfilled $cache->cancel(); unset($this->caches[$chunkPosHash]); - $this->request($chunkX, $chunkZ)->onResolve(...$cache->getResolveCallbacks()); + $this->prepareChunkAsync($chunkX, $chunkZ, $chunkPosHash)->onResolve(...$cache->getResolveCallbacks()); }else{ //dump the cache, it'll be regenerated the next time it's requested $this->destroy($chunkX, $chunkZ); @@ -199,8 +210,8 @@ class ChunkCache implements ChunkListener{ public function calculateCacheSize() : int{ $result = 0; foreach($this->caches as $cache){ - if($cache->hasResult()){ - $result += strlen($cache->getResult()); + if(is_string($cache)){ + $result += strlen($cache); } } return $result;