From 9c5f7128a4c6b670ff3240dfab6001c6d4aa2172 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Thu, 10 May 2018 12:33:05 +0100 Subject: [PATCH] RCON: lots of cleanup, now notification-based instead of poll-based This now utilizes Snooze in order to have the server wake up to process RCON commands ondemand, similar to how the CommandReader thread operates. This is better for performance and response times. This also makes a few other changes: - RCON thread will now waste less CPU since it uses a blocking select() with timeout to read - Following from that, IPC sockets are used to allow interrupting select() from the RCON thread. - Multiple threads for RCON has been removed (this is entirely unnecessary, reading data from sockets is not CPU-intensive, and a single thread is easier to work with) --- src/pocketmine/Server.php | 8 +- src/pocketmine/network/rcon/RCON.php | 87 +++++----- src/pocketmine/network/rcon/RCONInstance.php | 170 ++++++++++--------- 3 files changed, 133 insertions(+), 132 deletions(-) diff --git a/src/pocketmine/Server.php b/src/pocketmine/Server.php index a130fa611..d25bfe510 100644 --- a/src/pocketmine/Server.php +++ b/src/pocketmine/Server.php @@ -1539,8 +1539,7 @@ class Server{ $this->getConfigString("rcon.password", ""), $this->getConfigInt("rcon.port", $this->getPort()), $this->getIp(), - $this->getConfigInt("rcon.threads", 1), - $this->getConfigInt("rcon.clients-per-thread", 50) + $this->getConfigInt("rcon.max-clients", 50) ); }catch(\Throwable $e){ $this->getLogger()->critical("RCON can't be started: " . $e->getMessage()); @@ -2499,11 +2498,6 @@ class Server{ Timings::$connectionTimer->startTiming(); $this->network->processInterfaces(); - - if($this->rcon !== null){ - $this->rcon->check(); - } - Timings::$connectionTimer->stopTiming(); Timings::$schedulerTimer->startTiming(); diff --git a/src/pocketmine/network/rcon/RCON.php b/src/pocketmine/network/rcon/RCON.php index a5eecd28d..711598dd0 100644 --- a/src/pocketmine/network/rcon/RCON.php +++ b/src/pocketmine/network/rcon/RCON.php @@ -30,6 +30,7 @@ namespace pocketmine\network\rcon; use pocketmine\command\RemoteConsoleCommandSender; use pocketmine\event\server\RemoteServerCommandEvent; use pocketmine\Server; +use pocketmine\snooze\SleeperNotifier; use pocketmine\utils\TextFormat; class RCON{ @@ -37,25 +38,22 @@ class RCON{ private $server; /** @var resource */ private $socket; - /** @var string */ - private $password; - /** @var int */ - private $threads; - /** @var RCONInstance[] */ - private $workers = []; - /** @var int */ - private $clientsPerThread; - public function __construct(Server $server, string $password, int $port = 19132, string $interface = "0.0.0.0", int $threads = 1, int $clientsPerThread = 50){ + /** @var RCONInstance */ + private $instance; + + /** @var resource */ + private $ipcMainSocket; + /** @var resource */ + private $ipcThreadSocket; + + public function __construct(Server $server, string $password, int $port = 19132, string $interface = "0.0.0.0", int $maxClients = 50){ $this->server = $server; - $this->password = $password; $this->server->getLogger()->info("Starting remote control listener"); - if($this->password === ""){ + if($password === ""){ throw new \InvalidArgumentException("Empty password"); } - $this->threads = (int) max(1, $threads); - $this->clientsPerThread = (int) max(1, $clientsPerThread); $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if($this->socket === false or !@socket_bind($this->socket, $interface, $port) or !@socket_listen($this->socket)){ @@ -64,52 +62,51 @@ class RCON{ socket_set_block($this->socket); - for($n = 0; $n < $this->threads; ++$n){ - $this->workers[$n] = new RCONInstance($this->socket, $this->password, $this->clientsPerThread); + $ret = @socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $ipc); + if(!$ret){ + $err = socket_last_error(); + if(($err !== SOCKET_EPROTONOSUPPORT and $err !== SOCKET_ENOPROTOOPT) or !@socket_create_pair(AF_INET, SOCK_STREAM, 0, $ipc)){ + throw new \RuntimeException(trim(socket_strerror(socket_last_error()))); + } } + [$this->ipcMainSocket, $this->ipcThreadSocket] = $ipc; + + $notifier = new SleeperNotifier(); + $this->server->getTickSleeper()->addNotifier($notifier, function() : void{ + $this->check(); + }); + $this->instance = new RCONInstance($this->socket, $password, (int) max(1, $maxClients), $this->server->getLogger(), $this->ipcThreadSocket, $notifier); + socket_getsockname($this->socket, $addr, $port); $this->server->getLogger()->info("RCON running on $addr:$port"); } public function stop(){ - for($n = 0; $n < $this->threads; ++$n){ - $this->workers[$n]->close(); - Server::microSleep(50000); - $this->workers[$n]->quit(); - } + $this->instance->close(); + socket_write($this->ipcMainSocket, "\x00"); //make select() return + Server::microSleep(50000); + $this->instance->quit(); + @socket_close($this->socket); - $this->threads = 0; + @socket_close($this->ipcMainSocket); + @socket_close($this->ipcThreadSocket); } public function check(){ - for($n = 0; $n < $this->threads; ++$n){ - if($this->workers[$n]->isTerminated()){ - $this->workers[$n] = new RCONInstance($this->socket, $this->password, $this->clientsPerThread); - }elseif($this->workers[$n]->isWaiting()){ - if($this->workers[$n]->response !== ""){ - $this->server->getLogger()->info($this->workers[$n]->response); - $this->workers[$n]->synchronized(function(RCONInstance $thread){ - $thread->notify(); - }, $this->workers[$n]); - }else{ + $response = new RemoteConsoleCommandSender(); + $command = $this->instance->cmd; - $response = new RemoteConsoleCommandSender(); - $command = $this->workers[$n]->cmd; + $this->server->getPluginManager()->callEvent($ev = new RemoteServerCommandEvent($response, $command)); - $this->server->getPluginManager()->callEvent($ev = new RemoteServerCommandEvent($response, $command)); - - if(!$ev->isCancelled()){ - $this->server->dispatchCommand($ev->getSender(), $ev->getCommand()); - } - - $this->workers[$n]->response = TextFormat::clean($response->getMessage()); - $this->workers[$n]->synchronized(function(RCONInstance $thread){ - $thread->notify(); - }, $this->workers[$n]); - } - } + if(!$ev->isCancelled()){ + $this->server->dispatchCommand($ev->getSender(), $ev->getCommand()); } + + $this->instance->response = TextFormat::clean($response->getMessage()); + $this->instance->synchronized(function(RCONInstance $thread){ + $thread->notify(); + }, $this->instance); } } diff --git a/src/pocketmine/network/rcon/RCONInstance.php b/src/pocketmine/network/rcon/RCONInstance.php index 5e6f0e650..64224ff77 100644 --- a/src/pocketmine/network/rcon/RCONInstance.php +++ b/src/pocketmine/network/rcon/RCONInstance.php @@ -23,13 +23,11 @@ declare(strict_types=1); namespace pocketmine\network\rcon; +use pocketmine\snooze\SleeperNotifier; use pocketmine\Thread; use pocketmine\utils\Binary; class RCONInstance extends Thread{ - private const STATUS_DISCONNECTED = -1; - private const STATUS_AUTHENTICATING = 0; - private const STATUS_CONNECTED = 1; /** @var string */ public $cmd; @@ -44,30 +42,31 @@ class RCONInstance extends Thread{ private $password; /** @var int */ private $maxClients; - /** @var bool */ - private $waiting; - - public function isWaiting(){ - return $this->waiting; - } + /** @var \ThreadedLogger */ + private $logger; + /** @var resource */ + private $ipcSocket; + /** @var SleeperNotifier|null */ + private $notifier; /** - * @param resource $socket - * @param string $password - * @param int $maxClients + * @param resource $socket + * @param string $password + * @param int $maxClients + * @param \ThreadedLogger $logger + * @param resource $ipcSocket + * @param null|SleeperNotifier $notifier */ - public function __construct($socket, string $password, int $maxClients = 50){ + public function __construct($socket, string $password, int $maxClients = 50, \ThreadedLogger $logger, $ipcSocket, ?SleeperNotifier $notifier){ $this->stop = false; $this->cmd = ""; $this->response = ""; $this->socket = $socket; $this->password = $password; $this->maxClients = $maxClients; - for($n = 0; $n < $this->maxClients; ++$n){ - $this->{"client" . $n} = null; - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - $this->{"timeout" . $n} = 0; - } + $this->logger = $logger; + $this->ipcSocket = $ipcSocket; + $this->notifier = $notifier; $this->start(PTHREADS_INHERIT_NONE); } @@ -81,7 +80,6 @@ class RCONInstance extends Thread{ } private function readPacket($client, &$requestID, &$packetType, &$payload){ - socket_set_nonblock($client); $d = socket_read($client, 4); if($this->stop){ return false; @@ -90,7 +88,7 @@ class RCONInstance extends Thread{ }elseif($d === "" or strlen($d) < 4){ return false; } - socket_set_block($client); + $size = Binary::readLInt($d); if($size < 0 or $size > 65535){ return false; @@ -107,44 +105,49 @@ class RCONInstance extends Thread{ public function run(){ $this->registerClassLoader(); + + /** @var resource[] $clients */ + $clients = []; + /** @var int[] $authenticated */ + $authenticated = []; + /** @var float[] $timeouts */ + $timeouts = []; + + /** @var int $nextClientId */ + $nextClientId = 0; + while(!$this->stop){ - $this->synchronized(function(){ - $this->wait(2000); - }); - $r = [$socket = $this->socket]; + $r = $clients; + $r["main"] = $this->socket; //this is ugly, but we need to be able to mass-select() + $r["ipc"] = $this->ipcSocket; $w = null; $e = null; - if(socket_select($r, $w, $e, 0) === 1){ - if(($client = socket_accept($this->socket)) !== false){ - socket_set_block($client); - socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1); - $done = false; - for($n = 0; $n < $this->maxClients; ++$n){ - if($this->{"client" . $n} === null){ - $this->{"client" . $n} = $client; - $this->{"status" . $n} = self::STATUS_AUTHENTICATING; - $this->{"timeout" . $n} = microtime(true) + 5; - $done = true; - break; - } - } - if(!$done){ - @socket_close($client); - } - } - } - for($n = 0; $n < $this->maxClients; ++$n){ - $client = &$this->{"client" . $n}; - if($client !== null){ - if($this->{"status" . $n} !== self::STATUS_DISCONNECTED and !$this->stop){ - if($this->{"status" . $n} === self::STATUS_AUTHENTICATING and $this->{"timeout" . $n} < microtime(true)){ //Timeout - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - continue; + $disconnect = []; + + if(socket_select($r, $w, $e, 5, 0) > 0){ + foreach($r as $id => $sock){ + if($sock === $this->socket){ + if(($client = socket_accept($this->socket)) !== false){ + if(count($clients) >= $this->maxClients){ + @socket_close($client); + }else{ + socket_set_block($client); + socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1); + + $id = $nextClientId++; + $clients[$id] = $client; + $authenticated[$id] = false; + $timeouts[$id] = microtime(true) + 5; + } } - $p = $this->readPacket($client, $requestID, $packetType, $payload); + }elseif($sock === $this->ipcSocket){ + //read dummy data + socket_read($sock, 65535); + }else{ + $p = $this->readPacket($sock, $requestID, $packetType, $payload); if($p === false){ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; + $disconnect[$id] = $sock; continue; }elseif($p === null){ continue; @@ -152,57 +155,64 @@ class RCONInstance extends Thread{ switch($packetType){ case 3: //Login - if($this->{"status" . $n} !== self::STATUS_AUTHENTICATING){ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; + if($authenticated[$id]){ + $disconnect[$id] = $sock; break; } if($payload === $this->password){ - socket_getpeername($client, $addr, $port); - $this->response = "[INFO] Successful Rcon connection from: /$addr:$port"; - $this->synchronized(function(){ - $this->waiting = true; - $this->wait(); - }); - $this->waiting = false; - $this->response = ""; - $this->writePacket($client, $requestID, 2, ""); - $this->{"status" . $n} = self::STATUS_CONNECTED; + socket_getpeername($sock, $addr, $port); + $this->logger->info("Successful Rcon connection from: /$addr:$port"); + $this->writePacket($sock, $requestID, 2, ""); + $authenticated[$id] = true; }else{ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - $this->writePacket($client, -1, 2, ""); + $disconnect[$id] = $sock; + $this->writePacket($sock, -1, 2, ""); } break; case 2: //Command - if($this->{"status" . $n} !== self::STATUS_CONNECTED){ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; + if(!$authenticated[$id]){ + $disconnect[$id] = $sock; break; } if(strlen($payload) > 0){ $this->cmd = ltrim($payload); $this->synchronized(function(){ - $this->waiting = true; + $this->notifier->wakeupSleeper(); $this->wait(); }); - $this->waiting = false; - $this->writePacket($client, $requestID, 0, str_replace("\n", "\r\n", trim($this->response))); + $this->writePacket($sock, $requestID, 0, str_replace("\n", "\r\n", trim($this->response))); $this->response = ""; $this->cmd = ""; } break; } - - }else{ - @socket_set_option($client, SOL_SOCKET, SO_LINGER, ["l_onoff" => 1, "l_linger" => 1]); - @socket_shutdown($client, 2); - @socket_set_block($client); - @socket_read($client, 1); - @socket_close($client); - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - $this->{"client" . $n} = null; } } } + + foreach($authenticated as $id => $status){ + if(!isset($disconnect[$id]) and !$authenticated[$id] and $timeouts[$id] < microtime(true)){ //Timeout + $disconnect[$id] = $clients[$id]; + } + } + + foreach($disconnect as $id => $client){ + $this->disconnectClient($client); + unset($clients[$id], $authenticated[$id], $timeouts[$id]); + } } + + foreach($clients as $client){ + $this->disconnectClient($client); + } + } + + private function disconnectClient($client) : void{ + @socket_set_option($client, SOL_SOCKET, SO_LINGER, ["l_onoff" => 1, "l_linger" => 1]); + @socket_shutdown($client, 2); + @socket_set_block($client); + @socket_read($client, 1); + @socket_close($client); } public function getThreadName() : string{