Update to Snooze 0.5.0

This commit is contained in:
Dylan K. Taylor
2023-05-23 01:09:22 +01:00
parent 4aba9d9725
commit c66a3a8b3e
6 changed files with 48 additions and 44 deletions

View File

@@ -39,7 +39,6 @@ use pocketmine\network\NetworkInterfaceStartException;
use pocketmine\network\PacketHandlingException;
use pocketmine\player\GameMode;
use pocketmine\Server;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\timings\Timings;
use pocketmine\utils\Utils;
use raklib\generic\DisconnectReason;
@@ -79,7 +78,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
private RakLibToUserThreadMessageReceiver $eventReceiver;
private UserToRakLibThreadMessageSender $interface;
private SleeperNotifier $sleeper;
private int $sleeperNotifierId;
private PacketBroadcaster $packetBroadcaster;
private EntityEventBroadcaster $entityEventBroadcaster;
@@ -104,7 +103,15 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->rakServerId = mt_rand(0, PHP_INT_MAX);
$this->sleeper = new SleeperNotifier();
$sleeperEntry = $this->server->getTickSleeper()->addNotifier(function() : void{
Timings::$connection->startTiming();
try{
while($this->eventReceiver->handle($this));
}finally{
Timings::$connection->stopTiming();
}
});
$this->sleeperNotifierId = $sleeperEntry->getNotifierId();
/** @phpstan-var ThreadSafeArray<int, string> $mainToThreadBuffer */
$mainToThreadBuffer = new ThreadSafeArray();
@@ -119,7 +126,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->rakServerId,
$this->server->getConfigGroup()->getPropertyInt("network.max-mtu-size", 1492),
self::MCPE_RAKNET_PROTOCOL_VERSION,
$this->sleeper
$sleeperEntry
);
$this->eventReceiver = new RakLibToUserThreadMessageReceiver(
new PthreadsChannelReader($threadToMainBuffer)
@@ -130,14 +137,6 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
}
public function start() : void{
$this->server->getTickSleeper()->addNotifier($this->sleeper, function() : void{
Timings::$connection->startTiming();
try{
while($this->eventReceiver->handle($this));
}finally{
Timings::$connection->stopTiming();
}
});
$this->server->getLogger()->debug("Waiting for RakLib to start...");
try{
$this->rakLib->startAndWait();
@@ -182,7 +181,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
}
public function shutdown() : void{
$this->server->getTickSleeper()->removeNotifier($this->sleeper);
$this->server->getTickSleeper()->removeNotifier($this->sleeperNotifierId);
$this->rakLib->quit();
}

View File

@@ -25,7 +25,7 @@ namespace pocketmine\network\mcpe\raklib;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\thread\log\ThreadSafeLogger;
use pocketmine\thread\NonThreadSafeValue;
use pocketmine\thread\Thread;
@@ -63,7 +63,7 @@ class RakLibServer extends Thread{
protected int $serverId,
protected int $maxMtuSize,
protected int $protocolVersion,
protected SleeperNotifier $mainThreadNotifier
protected SleeperHandlerEntry $sleeperEntry
){
$this->mainPath = \pocketmine\PATH;
$this->address = new NonThreadSafeValue($address);
@@ -133,7 +133,7 @@ class RakLibServer extends Thread{
$this->maxMtuSize,
new SimpleProtocolAcceptor($this->protocolVersion),
new UserToRakLibThreadMessageReceiver(new PthreadsChannelReader($this->mainToThreadBuffer)),
new RakLibToUserThreadMessageSender(new SnoozeAwarePthreadsChannelWriter($this->threadToMainBuffer, $this->mainThreadNotifier)),
new RakLibToUserThreadMessageSender(new SnoozeAwarePthreadsChannelWriter($this->threadToMainBuffer, $this->sleeperEntry->createNotifier())),
new ExceptionTraceCleaner($this->mainPath)
);
$this->synchronized(function() : void{

View File

@@ -26,7 +26,6 @@ namespace pocketmine\scheduler;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\snooze\SleeperHandler;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\log\ThreadSafeLogger;
use pocketmine\thread\ThreadSafeClassLoader;
use pocketmine\utils\Utils;
@@ -52,8 +51,8 @@ class AsyncPool{
private array $taskQueues = [];
/**
* @var AsyncWorker[]
* @phpstan-var array<int, AsyncWorker>
* @var AsyncPoolWorkerEntry[]
* @phpstan-var array<int, AsyncPoolWorkerEntry>
*/
private array $workers = [];
/**
@@ -132,13 +131,12 @@ class AsyncPool{
*/
private function getWorker(int $worker) : AsyncWorker{
if(!isset($this->workers[$worker])){
$notifier = new SleeperNotifier();
$this->workers[$worker] = new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit, $notifier);
$this->eventLoop->addNotifier($notifier, function() use ($worker) : void{
$sleeperEntry = $this->eventLoop->addNotifier(function() use ($worker) : void{
$this->collectTasksFromWorker($worker);
});
$this->workers[$worker]->setClassLoaders([$this->classLoader]);
$this->workers[$worker]->start(self::WORKER_START_OPTIONS);
$this->workers[$worker] = new AsyncPoolWorkerEntry(new AsyncWorker($this->logger, $worker, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId());
$this->workers[$worker]->worker->setClassLoaders([$this->classLoader]);
$this->workers[$worker]->worker->start(self::WORKER_START_OPTIONS);
$this->taskQueues[$worker] = new \SplQueue();
@@ -147,7 +145,7 @@ class AsyncPool{
}
}
return $this->workers[$worker];
return $this->workers[$worker]->worker;
}
/**
@@ -270,7 +268,7 @@ class AsyncPool{
break; //current task is still running, skip to next worker
}
}
$this->workers[$worker]->collect();
$this->workers[$worker]->worker->collect();
return $more;
}
@@ -289,8 +287,8 @@ class AsyncPool{
$time = time();
foreach($this->taskQueues as $i => $queue){
if((!isset($this->workerLastUsed[$i]) || $this->workerLastUsed[$i] + 300 < $time) && $queue->isEmpty()){
$this->workers[$i]->quit();
$this->eventLoop->removeNotifier($this->workers[$i]->getNotifier());
$this->workers[$i]->worker->quit();
$this->eventLoop->removeNotifier($this->workers[$i]->sleeperNotifierId);
unset($this->workers[$i], $this->taskQueues[$i], $this->workerLastUsed[$i]);
$ret++;
}
@@ -308,8 +306,8 @@ class AsyncPool{
}
foreach($this->workers as $worker){
$worker->quit();
$this->eventLoop->removeNotifier($worker->getNotifier());
$worker->worker->quit();
$this->eventLoop->removeNotifier($worker->sleeperNotifierId);
}
$this->workers = [];
$this->taskQueues = [];

View File

@@ -24,9 +24,11 @@ declare(strict_types=1);
namespace pocketmine\scheduler;
use pmmp\thread\Thread as NativeThread;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\log\ThreadSafeLogger;
use pocketmine\thread\Worker;
use pocketmine\utils\AssumptionFailedError;
use function gc_enable;
use function ini_set;
@@ -34,15 +36,21 @@ class AsyncWorker extends Worker{
/** @var mixed[] */
private static array $store = [];
private const TLS_KEY_NOTIFIER = self::class . "::notifier";
public function __construct(
private ThreadSafeLogger $logger,
private int $id,
private int $memoryLimit,
private SleeperNotifier $notifier
private SleeperHandlerEntry $sleeperEntry
){}
public function getNotifier() : SleeperNotifier{
return $this->notifier;
$notifier = $this->getFromThreadStore(self::TLS_KEY_NOTIFIER);
if(!$notifier instanceof SleeperNotifier){
throw new AssumptionFailedError("SleeperNotifier not found in thread-local storage");
}
return $notifier;
}
protected function onRun() : void{
@@ -57,6 +65,8 @@ class AsyncWorker extends Worker{
ini_set('memory_limit', '-1');
$this->logger->debug("No memory limit set");
}
$this->saveToThreadStore(self::TLS_KEY_NOTIFIER, $this->sleeperEntry->createNotifier());
}
public function getLogger() : ThreadSafeLogger{