mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-05-29 08:37:13 +00:00
Since task execution depends on tasks executing sequentially on a particular worker in some cases (e.g. PopulationTask must be preceded by GeneratorRegisterTask), it doesn't make sense to continue task execution if an error occurs. Moreover, a task crashing may render the whole server unstable, as it leaves the server in an undefined state. This is the same kind of problem we fixed with scheduled tasks in PM3. In versions past, pthreads was unreliable enough that random tasks would crash without an obvious reason, forcing us to accommodate this. I still don't know the origin or frequency of said issues, but I think it's time to rip the band-aid off and solve these problems for real.
225 lines
6.7 KiB
PHP
225 lines
6.7 KiB
PHP
<?php
|
|
|
|
/*
|
|
*
|
|
* ____ _ _ __ __ _ __ __ ____
|
|
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
|
|
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
|
|
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
|
|
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* @author PocketMine Team
|
|
* @link http://www.pocketmine.net/
|
|
*
|
|
*
|
|
*/
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace pocketmine\network\mcpe\cache;
|
|
|
|
use pocketmine\math\Vector3;
|
|
use pocketmine\network\mcpe\ChunkRequestTask;
|
|
use pocketmine\network\mcpe\compression\CompressBatchPromise;
|
|
use pocketmine\network\mcpe\compression\Compressor;
|
|
use pocketmine\world\ChunkListener;
|
|
use pocketmine\world\ChunkListenerNoOpTrait;
|
|
use pocketmine\world\format\Chunk;
|
|
use pocketmine\world\World;
|
|
use function spl_object_id;
|
|
use function strlen;
|
|
|
|
/**
|
|
* This class is used by the current MCPE protocol system to store cached chunk packets for fast resending.
|
|
*/
|
|
class ChunkCache implements ChunkListener{
|
|
/** @var self[][] */
|
|
private static array $instances = [];
|
|
|
|
/**
|
|
* Fetches the ChunkCache instance for the given world. This lazily creates cache systems as needed.
|
|
*/
|
|
public static function getInstance(World $world, Compressor $compressor) : self{
|
|
$worldId = spl_object_id($world);
|
|
$compressorId = spl_object_id($compressor);
|
|
if(!isset(self::$instances[$worldId])){
|
|
self::$instances[$worldId] = [];
|
|
$world->addOnUnloadCallback(static function() use ($worldId) : void{
|
|
foreach(self::$instances[$worldId] as $cache){
|
|
$cache->caches = [];
|
|
}
|
|
unset(self::$instances[$worldId]);
|
|
\GlobalLogger::get()->debug("Destroyed chunk packet caches for world#$worldId");
|
|
});
|
|
}
|
|
if(!isset(self::$instances[$worldId][$compressorId])){
|
|
\GlobalLogger::get()->debug("Created new chunk packet cache (world#$worldId, compressor#$compressorId)");
|
|
self::$instances[$worldId][$compressorId] = new self($world, $compressor);
|
|
}
|
|
return self::$instances[$worldId][$compressorId];
|
|
}
|
|
|
|
public static function pruneCaches() : void{
|
|
foreach(self::$instances as $compressorMap){
|
|
foreach($compressorMap as $chunkCache){
|
|
foreach($chunkCache->caches as $chunkHash => $promise){
|
|
if($promise->hasResult()){
|
|
//Do not clear promises that are not yet fulfilled; they will have requesters waiting on them
|
|
unset($chunkCache->caches[$chunkHash]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/** @var CompressBatchPromise[] */
|
|
private array $caches = [];
|
|
|
|
private int $hits = 0;
|
|
private int $misses = 0;
|
|
|
|
private function __construct(
|
|
private World $world,
|
|
private Compressor $compressor
|
|
){}
|
|
|
|
/**
|
|
* Requests asynchronous preparation of the chunk at the given coordinates.
|
|
*
|
|
* @return CompressBatchPromise a promise of resolution which will contain a compressed chunk packet.
|
|
*/
|
|
public function request(int $chunkX, int $chunkZ) : CompressBatchPromise{
|
|
$this->world->registerChunkListener($this, $chunkX, $chunkZ);
|
|
$chunk = $this->world->getChunk($chunkX, $chunkZ);
|
|
if($chunk === null){
|
|
throw new \InvalidArgumentException("Cannot request an unloaded chunk");
|
|
}
|
|
$chunkHash = World::chunkHash($chunkX, $chunkZ);
|
|
|
|
if(isset($this->caches[$chunkHash])){
|
|
++$this->hits;
|
|
return $this->caches[$chunkHash];
|
|
}
|
|
|
|
++$this->misses;
|
|
|
|
$this->world->timings->syncChunkSendPrepare->startTiming();
|
|
try{
|
|
$this->caches[$chunkHash] = new CompressBatchPromise();
|
|
|
|
$this->world->getServer()->getAsyncPool()->submitTask(
|
|
new ChunkRequestTask(
|
|
$chunkX,
|
|
$chunkZ,
|
|
$chunk,
|
|
$this->caches[$chunkHash],
|
|
$this->compressor
|
|
)
|
|
);
|
|
|
|
return $this->caches[$chunkHash];
|
|
}finally{
|
|
$this->world->timings->syncChunkSendPrepare->stopTiming();
|
|
}
|
|
}
|
|
|
|
private function destroy(int $chunkX, int $chunkZ) : bool{
|
|
$chunkHash = World::chunkHash($chunkX, $chunkZ);
|
|
$existing = $this->caches[$chunkHash] ?? null;
|
|
unset($this->caches[$chunkHash]);
|
|
|
|
return $existing !== null;
|
|
}
|
|
|
|
/**
|
|
* Restarts an async request for an unresolved chunk.
|
|
*
|
|
* @throws \InvalidArgumentException
|
|
*/
|
|
private function restartPendingRequest(int $chunkX, int $chunkZ) : void{
|
|
$chunkHash = World::chunkHash($chunkX, $chunkZ);
|
|
$existing = $this->caches[$chunkHash] ?? null;
|
|
if($existing === null || $existing->hasResult()){
|
|
throw new \InvalidArgumentException("Restart can only be applied to unresolved promises");
|
|
}
|
|
$existing->cancel();
|
|
unset($this->caches[$chunkHash]);
|
|
|
|
$this->request($chunkX, $chunkZ)->onResolve(...$existing->getResolveCallbacks());
|
|
}
|
|
|
|
/**
|
|
* @throws \InvalidArgumentException
|
|
*/
|
|
private function destroyOrRestart(int $chunkX, int $chunkZ) : void{
|
|
$cache = $this->caches[World::chunkHash($chunkX, $chunkZ)] ?? null;
|
|
if($cache !== null){
|
|
if(!$cache->hasResult()){
|
|
//some requesters are waiting for this chunk, so their request needs to be fulfilled
|
|
$this->restartPendingRequest($chunkX, $chunkZ);
|
|
}else{
|
|
//dump the cache, it'll be regenerated the next time it's requested
|
|
$this->destroy($chunkX, $chunkZ);
|
|
}
|
|
}
|
|
}
|
|
|
|
use ChunkListenerNoOpTrait {
|
|
//force overriding of these
|
|
onChunkChanged as private;
|
|
onBlockChanged as private;
|
|
onChunkUnloaded as private;
|
|
}
|
|
|
|
/**
|
|
* @see ChunkListener::onChunkChanged()
|
|
*/
|
|
public function onChunkChanged(int $chunkX, int $chunkZ, Chunk $chunk) : void{
|
|
$this->destroyOrRestart($chunkX, $chunkZ);
|
|
}
|
|
|
|
/**
|
|
* @see ChunkListener::onBlockChanged()
|
|
*/
|
|
public function onBlockChanged(Vector3 $block) : void{
|
|
//FIXME: requesters will still receive this chunk after it's been dropped, but we can't mark this for a simple
|
|
//sync here because it can spam the worker pool
|
|
$this->destroy($block->getFloorX() >> Chunk::COORD_BIT_SIZE, $block->getFloorZ() >> Chunk::COORD_BIT_SIZE);
|
|
}
|
|
|
|
/**
|
|
* @see ChunkListener::onChunkUnloaded()
|
|
*/
|
|
public function onChunkUnloaded(int $chunkX, int $chunkZ, Chunk $chunk) : void{
|
|
$this->destroy($chunkX, $chunkZ);
|
|
$this->world->unregisterChunkListener($this, $chunkX, $chunkZ);
|
|
}
|
|
|
|
/**
|
|
* Returns the number of bytes occupied by the cache data in this cache. This does not include the size of any
|
|
* promises referenced by the cache.
|
|
*/
|
|
public function calculateCacheSize() : int{
|
|
$result = 0;
|
|
foreach($this->caches as $cache){
|
|
if($cache->hasResult()){
|
|
$result += strlen($cache->getResult());
|
|
}
|
|
}
|
|
return $result;
|
|
}
|
|
|
|
/**
|
|
* Returns the percentage of requests to the cache which resulted in a cache hit.
|
|
*/
|
|
public function getHitPercentage() : float{
|
|
$total = $this->hits + $this->misses;
|
|
return $total > 0 ? $this->hits / $total : 0.0;
|
|
}
|
|
}
|