RCON: lots of cleanup, now notification-based instead of poll-based

This now utilizes Snooze in order to have the server wake up to process RCON commands ondemand, similar to how the CommandReader thread operates. This is better for performance and response times.

This also makes a few other changes:
- RCON thread will now waste less CPU since it uses a blocking select() with timeout to read
- Following from that, IPC sockets are used to allow interrupting select() from the RCON thread.
- Multiple threads for RCON has been removed (this is entirely unnecessary, reading data from sockets is not CPU-intensive, and a single thread is easier to work with)
This commit is contained in:
Dylan K. Taylor 2018-05-10 12:33:05 +01:00
parent 1e4a97f921
commit 9c5f7128a4
3 changed files with 133 additions and 132 deletions

View File

@ -1539,8 +1539,7 @@ class Server{
$this->getConfigString("rcon.password", ""),
$this->getConfigInt("rcon.port", $this->getPort()),
$this->getIp(),
$this->getConfigInt("rcon.threads", 1),
$this->getConfigInt("rcon.clients-per-thread", 50)
$this->getConfigInt("rcon.max-clients", 50)
);
}catch(\Throwable $e){
$this->getLogger()->critical("RCON can't be started: " . $e->getMessage());
@ -2499,11 +2498,6 @@ class Server{
Timings::$connectionTimer->startTiming();
$this->network->processInterfaces();
if($this->rcon !== null){
$this->rcon->check();
}
Timings::$connectionTimer->stopTiming();
Timings::$schedulerTimer->startTiming();

View File

@ -30,6 +30,7 @@ namespace pocketmine\network\rcon;
use pocketmine\command\RemoteConsoleCommandSender;
use pocketmine\event\server\RemoteServerCommandEvent;
use pocketmine\Server;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\utils\TextFormat;
class RCON{
@ -37,25 +38,22 @@ class RCON{
private $server;
/** @var resource */
private $socket;
/** @var string */
private $password;
/** @var int */
private $threads;
/** @var RCONInstance[] */
private $workers = [];
/** @var int */
private $clientsPerThread;
public function __construct(Server $server, string $password, int $port = 19132, string $interface = "0.0.0.0", int $threads = 1, int $clientsPerThread = 50){
/** @var RCONInstance */
private $instance;
/** @var resource */
private $ipcMainSocket;
/** @var resource */
private $ipcThreadSocket;
public function __construct(Server $server, string $password, int $port = 19132, string $interface = "0.0.0.0", int $maxClients = 50){
$this->server = $server;
$this->password = $password;
$this->server->getLogger()->info("Starting remote control listener");
if($this->password === ""){
if($password === ""){
throw new \InvalidArgumentException("Empty password");
}
$this->threads = (int) max(1, $threads);
$this->clientsPerThread = (int) max(1, $clientsPerThread);
$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if($this->socket === false or !@socket_bind($this->socket, $interface, $port) or !@socket_listen($this->socket)){
@ -64,52 +62,51 @@ class RCON{
socket_set_block($this->socket);
for($n = 0; $n < $this->threads; ++$n){
$this->workers[$n] = new RCONInstance($this->socket, $this->password, $this->clientsPerThread);
$ret = @socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $ipc);
if(!$ret){
$err = socket_last_error();
if(($err !== SOCKET_EPROTONOSUPPORT and $err !== SOCKET_ENOPROTOOPT) or !@socket_create_pair(AF_INET, SOCK_STREAM, 0, $ipc)){
throw new \RuntimeException(trim(socket_strerror(socket_last_error())));
}
}
[$this->ipcMainSocket, $this->ipcThreadSocket] = $ipc;
$notifier = new SleeperNotifier();
$this->server->getTickSleeper()->addNotifier($notifier, function() : void{
$this->check();
});
$this->instance = new RCONInstance($this->socket, $password, (int) max(1, $maxClients), $this->server->getLogger(), $this->ipcThreadSocket, $notifier);
socket_getsockname($this->socket, $addr, $port);
$this->server->getLogger()->info("RCON running on $addr:$port");
}
public function stop(){
for($n = 0; $n < $this->threads; ++$n){
$this->workers[$n]->close();
Server::microSleep(50000);
$this->workers[$n]->quit();
}
$this->instance->close();
socket_write($this->ipcMainSocket, "\x00"); //make select() return
Server::microSleep(50000);
$this->instance->quit();
@socket_close($this->socket);
$this->threads = 0;
@socket_close($this->ipcMainSocket);
@socket_close($this->ipcThreadSocket);
}
public function check(){
for($n = 0; $n < $this->threads; ++$n){
if($this->workers[$n]->isTerminated()){
$this->workers[$n] = new RCONInstance($this->socket, $this->password, $this->clientsPerThread);
}elseif($this->workers[$n]->isWaiting()){
if($this->workers[$n]->response !== ""){
$this->server->getLogger()->info($this->workers[$n]->response);
$this->workers[$n]->synchronized(function(RCONInstance $thread){
$thread->notify();
}, $this->workers[$n]);
}else{
$response = new RemoteConsoleCommandSender();
$command = $this->instance->cmd;
$response = new RemoteConsoleCommandSender();
$command = $this->workers[$n]->cmd;
$this->server->getPluginManager()->callEvent($ev = new RemoteServerCommandEvent($response, $command));
$this->server->getPluginManager()->callEvent($ev = new RemoteServerCommandEvent($response, $command));
if(!$ev->isCancelled()){
$this->server->dispatchCommand($ev->getSender(), $ev->getCommand());
}
$this->workers[$n]->response = TextFormat::clean($response->getMessage());
$this->workers[$n]->synchronized(function(RCONInstance $thread){
$thread->notify();
}, $this->workers[$n]);
}
}
if(!$ev->isCancelled()){
$this->server->dispatchCommand($ev->getSender(), $ev->getCommand());
}
$this->instance->response = TextFormat::clean($response->getMessage());
$this->instance->synchronized(function(RCONInstance $thread){
$thread->notify();
}, $this->instance);
}
}

View File

@ -23,13 +23,11 @@ declare(strict_types=1);
namespace pocketmine\network\rcon;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\Thread;
use pocketmine\utils\Binary;
class RCONInstance extends Thread{
private const STATUS_DISCONNECTED = -1;
private const STATUS_AUTHENTICATING = 0;
private const STATUS_CONNECTED = 1;
/** @var string */
public $cmd;
@ -44,30 +42,31 @@ class RCONInstance extends Thread{
private $password;
/** @var int */
private $maxClients;
/** @var bool */
private $waiting;
public function isWaiting(){
return $this->waiting;
}
/** @var \ThreadedLogger */
private $logger;
/** @var resource */
private $ipcSocket;
/** @var SleeperNotifier|null */
private $notifier;
/**
* @param resource $socket
* @param string $password
* @param int $maxClients
* @param resource $socket
* @param string $password
* @param int $maxClients
* @param \ThreadedLogger $logger
* @param resource $ipcSocket
* @param null|SleeperNotifier $notifier
*/
public function __construct($socket, string $password, int $maxClients = 50){
public function __construct($socket, string $password, int $maxClients = 50, \ThreadedLogger $logger, $ipcSocket, ?SleeperNotifier $notifier){
$this->stop = false;
$this->cmd = "";
$this->response = "";
$this->socket = $socket;
$this->password = $password;
$this->maxClients = $maxClients;
for($n = 0; $n < $this->maxClients; ++$n){
$this->{"client" . $n} = null;
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
$this->{"timeout" . $n} = 0;
}
$this->logger = $logger;
$this->ipcSocket = $ipcSocket;
$this->notifier = $notifier;
$this->start(PTHREADS_INHERIT_NONE);
}
@ -81,7 +80,6 @@ class RCONInstance extends Thread{
}
private function readPacket($client, &$requestID, &$packetType, &$payload){
socket_set_nonblock($client);
$d = socket_read($client, 4);
if($this->stop){
return false;
@ -90,7 +88,7 @@ class RCONInstance extends Thread{
}elseif($d === "" or strlen($d) < 4){
return false;
}
socket_set_block($client);
$size = Binary::readLInt($d);
if($size < 0 or $size > 65535){
return false;
@ -107,44 +105,49 @@ class RCONInstance extends Thread{
public function run(){
$this->registerClassLoader();
/** @var resource[] $clients */
$clients = [];
/** @var int[] $authenticated */
$authenticated = [];
/** @var float[] $timeouts */
$timeouts = [];
/** @var int $nextClientId */
$nextClientId = 0;
while(!$this->stop){
$this->synchronized(function(){
$this->wait(2000);
});
$r = [$socket = $this->socket];
$r = $clients;
$r["main"] = $this->socket; //this is ugly, but we need to be able to mass-select()
$r["ipc"] = $this->ipcSocket;
$w = null;
$e = null;
if(socket_select($r, $w, $e, 0) === 1){
if(($client = socket_accept($this->socket)) !== false){
socket_set_block($client);
socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1);
$done = false;
for($n = 0; $n < $this->maxClients; ++$n){
if($this->{"client" . $n} === null){
$this->{"client" . $n} = $client;
$this->{"status" . $n} = self::STATUS_AUTHENTICATING;
$this->{"timeout" . $n} = microtime(true) + 5;
$done = true;
break;
}
}
if(!$done){
@socket_close($client);
}
}
}
for($n = 0; $n < $this->maxClients; ++$n){
$client = &$this->{"client" . $n};
if($client !== null){
if($this->{"status" . $n} !== self::STATUS_DISCONNECTED and !$this->stop){
if($this->{"status" . $n} === self::STATUS_AUTHENTICATING and $this->{"timeout" . $n} < microtime(true)){ //Timeout
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
continue;
$disconnect = [];
if(socket_select($r, $w, $e, 5, 0) > 0){
foreach($r as $id => $sock){
if($sock === $this->socket){
if(($client = socket_accept($this->socket)) !== false){
if(count($clients) >= $this->maxClients){
@socket_close($client);
}else{
socket_set_block($client);
socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1);
$id = $nextClientId++;
$clients[$id] = $client;
$authenticated[$id] = false;
$timeouts[$id] = microtime(true) + 5;
}
}
$p = $this->readPacket($client, $requestID, $packetType, $payload);
}elseif($sock === $this->ipcSocket){
//read dummy data
socket_read($sock, 65535);
}else{
$p = $this->readPacket($sock, $requestID, $packetType, $payload);
if($p === false){
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
$disconnect[$id] = $sock;
continue;
}elseif($p === null){
continue;
@ -152,57 +155,64 @@ class RCONInstance extends Thread{
switch($packetType){
case 3: //Login
if($this->{"status" . $n} !== self::STATUS_AUTHENTICATING){
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
if($authenticated[$id]){
$disconnect[$id] = $sock;
break;
}
if($payload === $this->password){
socket_getpeername($client, $addr, $port);
$this->response = "[INFO] Successful Rcon connection from: /$addr:$port";
$this->synchronized(function(){
$this->waiting = true;
$this->wait();
});
$this->waiting = false;
$this->response = "";
$this->writePacket($client, $requestID, 2, "");
$this->{"status" . $n} = self::STATUS_CONNECTED;
socket_getpeername($sock, $addr, $port);
$this->logger->info("Successful Rcon connection from: /$addr:$port");
$this->writePacket($sock, $requestID, 2, "");
$authenticated[$id] = true;
}else{
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
$this->writePacket($client, -1, 2, "");
$disconnect[$id] = $sock;
$this->writePacket($sock, -1, 2, "");
}
break;
case 2: //Command
if($this->{"status" . $n} !== self::STATUS_CONNECTED){
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
if(!$authenticated[$id]){
$disconnect[$id] = $sock;
break;
}
if(strlen($payload) > 0){
$this->cmd = ltrim($payload);
$this->synchronized(function(){
$this->waiting = true;
$this->notifier->wakeupSleeper();
$this->wait();
});
$this->waiting = false;
$this->writePacket($client, $requestID, 0, str_replace("\n", "\r\n", trim($this->response)));
$this->writePacket($sock, $requestID, 0, str_replace("\n", "\r\n", trim($this->response)));
$this->response = "";
$this->cmd = "";
}
break;
}
}else{
@socket_set_option($client, SOL_SOCKET, SO_LINGER, ["l_onoff" => 1, "l_linger" => 1]);
@socket_shutdown($client, 2);
@socket_set_block($client);
@socket_read($client, 1);
@socket_close($client);
$this->{"status" . $n} = self::STATUS_DISCONNECTED;
$this->{"client" . $n} = null;
}
}
}
foreach($authenticated as $id => $status){
if(!isset($disconnect[$id]) and !$authenticated[$id] and $timeouts[$id] < microtime(true)){ //Timeout
$disconnect[$id] = $clients[$id];
}
}
foreach($disconnect as $id => $client){
$this->disconnectClient($client);
unset($clients[$id], $authenticated[$id], $timeouts[$id]);
}
}
foreach($clients as $client){
$this->disconnectClient($client);
}
}
private function disconnectClient($client) : void{
@socket_set_option($client, SOL_SOCKET, SO_LINGER, ["l_onoff" => 1, "l_linger" => 1]);
@socket_shutdown($client, 2);
@socket_set_block($client);
@socket_read($client, 1);
@socket_close($client);
}
public function getThreadName() : string{