diff --git a/src/pocketmine/Server.php b/src/pocketmine/Server.php index a130fa611..d25bfe510 100644 --- a/src/pocketmine/Server.php +++ b/src/pocketmine/Server.php @@ -1539,8 +1539,7 @@ class Server{ $this->getConfigString("rcon.password", ""), $this->getConfigInt("rcon.port", $this->getPort()), $this->getIp(), - $this->getConfigInt("rcon.threads", 1), - $this->getConfigInt("rcon.clients-per-thread", 50) + $this->getConfigInt("rcon.max-clients", 50) ); }catch(\Throwable $e){ $this->getLogger()->critical("RCON can't be started: " . $e->getMessage()); @@ -2499,11 +2498,6 @@ class Server{ Timings::$connectionTimer->startTiming(); $this->network->processInterfaces(); - - if($this->rcon !== null){ - $this->rcon->check(); - } - Timings::$connectionTimer->stopTiming(); Timings::$schedulerTimer->startTiming(); diff --git a/src/pocketmine/network/rcon/RCON.php b/src/pocketmine/network/rcon/RCON.php index a5eecd28d..711598dd0 100644 --- a/src/pocketmine/network/rcon/RCON.php +++ b/src/pocketmine/network/rcon/RCON.php @@ -30,6 +30,7 @@ namespace pocketmine\network\rcon; use pocketmine\command\RemoteConsoleCommandSender; use pocketmine\event\server\RemoteServerCommandEvent; use pocketmine\Server; +use pocketmine\snooze\SleeperNotifier; use pocketmine\utils\TextFormat; class RCON{ @@ -37,25 +38,22 @@ class RCON{ private $server; /** @var resource */ private $socket; - /** @var string */ - private $password; - /** @var int */ - private $threads; - /** @var RCONInstance[] */ - private $workers = []; - /** @var int */ - private $clientsPerThread; - public function __construct(Server $server, string $password, int $port = 19132, string $interface = "0.0.0.0", int $threads = 1, int $clientsPerThread = 50){ + /** @var RCONInstance */ + private $instance; + + /** @var resource */ + private $ipcMainSocket; + /** @var resource */ + private $ipcThreadSocket; + + public function __construct(Server $server, string $password, int $port = 19132, string $interface = "0.0.0.0", int $maxClients = 50){ $this->server = $server; - $this->password = $password; $this->server->getLogger()->info("Starting remote control listener"); - if($this->password === ""){ + if($password === ""){ throw new \InvalidArgumentException("Empty password"); } - $this->threads = (int) max(1, $threads); - $this->clientsPerThread = (int) max(1, $clientsPerThread); $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if($this->socket === false or !@socket_bind($this->socket, $interface, $port) or !@socket_listen($this->socket)){ @@ -64,52 +62,51 @@ class RCON{ socket_set_block($this->socket); - for($n = 0; $n < $this->threads; ++$n){ - $this->workers[$n] = new RCONInstance($this->socket, $this->password, $this->clientsPerThread); + $ret = @socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $ipc); + if(!$ret){ + $err = socket_last_error(); + if(($err !== SOCKET_EPROTONOSUPPORT and $err !== SOCKET_ENOPROTOOPT) or !@socket_create_pair(AF_INET, SOCK_STREAM, 0, $ipc)){ + throw new \RuntimeException(trim(socket_strerror(socket_last_error()))); + } } + [$this->ipcMainSocket, $this->ipcThreadSocket] = $ipc; + + $notifier = new SleeperNotifier(); + $this->server->getTickSleeper()->addNotifier($notifier, function() : void{ + $this->check(); + }); + $this->instance = new RCONInstance($this->socket, $password, (int) max(1, $maxClients), $this->server->getLogger(), $this->ipcThreadSocket, $notifier); + socket_getsockname($this->socket, $addr, $port); $this->server->getLogger()->info("RCON running on $addr:$port"); } public function stop(){ - for($n = 0; $n < $this->threads; ++$n){ - $this->workers[$n]->close(); - Server::microSleep(50000); - $this->workers[$n]->quit(); - } + $this->instance->close(); + socket_write($this->ipcMainSocket, "\x00"); //make select() return + Server::microSleep(50000); + $this->instance->quit(); + @socket_close($this->socket); - $this->threads = 0; + @socket_close($this->ipcMainSocket); + @socket_close($this->ipcThreadSocket); } public function check(){ - for($n = 0; $n < $this->threads; ++$n){ - if($this->workers[$n]->isTerminated()){ - $this->workers[$n] = new RCONInstance($this->socket, $this->password, $this->clientsPerThread); - }elseif($this->workers[$n]->isWaiting()){ - if($this->workers[$n]->response !== ""){ - $this->server->getLogger()->info($this->workers[$n]->response); - $this->workers[$n]->synchronized(function(RCONInstance $thread){ - $thread->notify(); - }, $this->workers[$n]); - }else{ + $response = new RemoteConsoleCommandSender(); + $command = $this->instance->cmd; - $response = new RemoteConsoleCommandSender(); - $command = $this->workers[$n]->cmd; + $this->server->getPluginManager()->callEvent($ev = new RemoteServerCommandEvent($response, $command)); - $this->server->getPluginManager()->callEvent($ev = new RemoteServerCommandEvent($response, $command)); - - if(!$ev->isCancelled()){ - $this->server->dispatchCommand($ev->getSender(), $ev->getCommand()); - } - - $this->workers[$n]->response = TextFormat::clean($response->getMessage()); - $this->workers[$n]->synchronized(function(RCONInstance $thread){ - $thread->notify(); - }, $this->workers[$n]); - } - } + if(!$ev->isCancelled()){ + $this->server->dispatchCommand($ev->getSender(), $ev->getCommand()); } + + $this->instance->response = TextFormat::clean($response->getMessage()); + $this->instance->synchronized(function(RCONInstance $thread){ + $thread->notify(); + }, $this->instance); } } diff --git a/src/pocketmine/network/rcon/RCONInstance.php b/src/pocketmine/network/rcon/RCONInstance.php index 5e6f0e650..64224ff77 100644 --- a/src/pocketmine/network/rcon/RCONInstance.php +++ b/src/pocketmine/network/rcon/RCONInstance.php @@ -23,13 +23,11 @@ declare(strict_types=1); namespace pocketmine\network\rcon; +use pocketmine\snooze\SleeperNotifier; use pocketmine\Thread; use pocketmine\utils\Binary; class RCONInstance extends Thread{ - private const STATUS_DISCONNECTED = -1; - private const STATUS_AUTHENTICATING = 0; - private const STATUS_CONNECTED = 1; /** @var string */ public $cmd; @@ -44,30 +42,31 @@ class RCONInstance extends Thread{ private $password; /** @var int */ private $maxClients; - /** @var bool */ - private $waiting; - - public function isWaiting(){ - return $this->waiting; - } + /** @var \ThreadedLogger */ + private $logger; + /** @var resource */ + private $ipcSocket; + /** @var SleeperNotifier|null */ + private $notifier; /** - * @param resource $socket - * @param string $password - * @param int $maxClients + * @param resource $socket + * @param string $password + * @param int $maxClients + * @param \ThreadedLogger $logger + * @param resource $ipcSocket + * @param null|SleeperNotifier $notifier */ - public function __construct($socket, string $password, int $maxClients = 50){ + public function __construct($socket, string $password, int $maxClients = 50, \ThreadedLogger $logger, $ipcSocket, ?SleeperNotifier $notifier){ $this->stop = false; $this->cmd = ""; $this->response = ""; $this->socket = $socket; $this->password = $password; $this->maxClients = $maxClients; - for($n = 0; $n < $this->maxClients; ++$n){ - $this->{"client" . $n} = null; - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - $this->{"timeout" . $n} = 0; - } + $this->logger = $logger; + $this->ipcSocket = $ipcSocket; + $this->notifier = $notifier; $this->start(PTHREADS_INHERIT_NONE); } @@ -81,7 +80,6 @@ class RCONInstance extends Thread{ } private function readPacket($client, &$requestID, &$packetType, &$payload){ - socket_set_nonblock($client); $d = socket_read($client, 4); if($this->stop){ return false; @@ -90,7 +88,7 @@ class RCONInstance extends Thread{ }elseif($d === "" or strlen($d) < 4){ return false; } - socket_set_block($client); + $size = Binary::readLInt($d); if($size < 0 or $size > 65535){ return false; @@ -107,44 +105,49 @@ class RCONInstance extends Thread{ public function run(){ $this->registerClassLoader(); + + /** @var resource[] $clients */ + $clients = []; + /** @var int[] $authenticated */ + $authenticated = []; + /** @var float[] $timeouts */ + $timeouts = []; + + /** @var int $nextClientId */ + $nextClientId = 0; + while(!$this->stop){ - $this->synchronized(function(){ - $this->wait(2000); - }); - $r = [$socket = $this->socket]; + $r = $clients; + $r["main"] = $this->socket; //this is ugly, but we need to be able to mass-select() + $r["ipc"] = $this->ipcSocket; $w = null; $e = null; - if(socket_select($r, $w, $e, 0) === 1){ - if(($client = socket_accept($this->socket)) !== false){ - socket_set_block($client); - socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1); - $done = false; - for($n = 0; $n < $this->maxClients; ++$n){ - if($this->{"client" . $n} === null){ - $this->{"client" . $n} = $client; - $this->{"status" . $n} = self::STATUS_AUTHENTICATING; - $this->{"timeout" . $n} = microtime(true) + 5; - $done = true; - break; - } - } - if(!$done){ - @socket_close($client); - } - } - } - for($n = 0; $n < $this->maxClients; ++$n){ - $client = &$this->{"client" . $n}; - if($client !== null){ - if($this->{"status" . $n} !== self::STATUS_DISCONNECTED and !$this->stop){ - if($this->{"status" . $n} === self::STATUS_AUTHENTICATING and $this->{"timeout" . $n} < microtime(true)){ //Timeout - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - continue; + $disconnect = []; + + if(socket_select($r, $w, $e, 5, 0) > 0){ + foreach($r as $id => $sock){ + if($sock === $this->socket){ + if(($client = socket_accept($this->socket)) !== false){ + if(count($clients) >= $this->maxClients){ + @socket_close($client); + }else{ + socket_set_block($client); + socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1); + + $id = $nextClientId++; + $clients[$id] = $client; + $authenticated[$id] = false; + $timeouts[$id] = microtime(true) + 5; + } } - $p = $this->readPacket($client, $requestID, $packetType, $payload); + }elseif($sock === $this->ipcSocket){ + //read dummy data + socket_read($sock, 65535); + }else{ + $p = $this->readPacket($sock, $requestID, $packetType, $payload); if($p === false){ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; + $disconnect[$id] = $sock; continue; }elseif($p === null){ continue; @@ -152,57 +155,64 @@ class RCONInstance extends Thread{ switch($packetType){ case 3: //Login - if($this->{"status" . $n} !== self::STATUS_AUTHENTICATING){ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; + if($authenticated[$id]){ + $disconnect[$id] = $sock; break; } if($payload === $this->password){ - socket_getpeername($client, $addr, $port); - $this->response = "[INFO] Successful Rcon connection from: /$addr:$port"; - $this->synchronized(function(){ - $this->waiting = true; - $this->wait(); - }); - $this->waiting = false; - $this->response = ""; - $this->writePacket($client, $requestID, 2, ""); - $this->{"status" . $n} = self::STATUS_CONNECTED; + socket_getpeername($sock, $addr, $port); + $this->logger->info("Successful Rcon connection from: /$addr:$port"); + $this->writePacket($sock, $requestID, 2, ""); + $authenticated[$id] = true; }else{ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - $this->writePacket($client, -1, 2, ""); + $disconnect[$id] = $sock; + $this->writePacket($sock, -1, 2, ""); } break; case 2: //Command - if($this->{"status" . $n} !== self::STATUS_CONNECTED){ - $this->{"status" . $n} = self::STATUS_DISCONNECTED; + if(!$authenticated[$id]){ + $disconnect[$id] = $sock; break; } if(strlen($payload) > 0){ $this->cmd = ltrim($payload); $this->synchronized(function(){ - $this->waiting = true; + $this->notifier->wakeupSleeper(); $this->wait(); }); - $this->waiting = false; - $this->writePacket($client, $requestID, 0, str_replace("\n", "\r\n", trim($this->response))); + $this->writePacket($sock, $requestID, 0, str_replace("\n", "\r\n", trim($this->response))); $this->response = ""; $this->cmd = ""; } break; } - - }else{ - @socket_set_option($client, SOL_SOCKET, SO_LINGER, ["l_onoff" => 1, "l_linger" => 1]); - @socket_shutdown($client, 2); - @socket_set_block($client); - @socket_read($client, 1); - @socket_close($client); - $this->{"status" . $n} = self::STATUS_DISCONNECTED; - $this->{"client" . $n} = null; } } } + + foreach($authenticated as $id => $status){ + if(!isset($disconnect[$id]) and !$authenticated[$id] and $timeouts[$id] < microtime(true)){ //Timeout + $disconnect[$id] = $clients[$id]; + } + } + + foreach($disconnect as $id => $client){ + $this->disconnectClient($client); + unset($clients[$id], $authenticated[$id], $timeouts[$id]); + } } + + foreach($clients as $client){ + $this->disconnectClient($client); + } + } + + private function disconnectClient($client) : void{ + @socket_set_option($client, SOL_SOCKET, SO_LINGER, ["l_onoff" => 1, "l_linger" => 1]); + @socket_shutdown($client, 2); + @socket_set_block($client); + @socket_read($client, 1); + @socket_close($client); } public function getThreadName() : string{