From 92a0ddb4fe9b2cf1874614156879fc9e8e5b3782 Mon Sep 17 00:00:00 2001 From: Shoghi Cervantes Date: Wed, 26 Mar 2014 16:48:27 +0100 Subject: [PATCH] Threaded networking :D --- src/PocketMine/Player.php | 8 +- src/PocketMine/Server.php | 114 +++++++--- src/PocketMine/command/SimpleCommandMap.php | 8 +- .../command/defaults/PluginsCommand.php | 62 ++++++ src/PocketMine/network/Handler.php | 116 ----------- src/PocketMine/network/ThreadedHandler.php | 197 ++++++++++++++++++ src/PocketMine/network/UDPSocket.php | 72 ------- src/PocketMine/network/query/QueryHandler.php | 7 +- 8 files changed, 352 insertions(+), 232 deletions(-) create mode 100644 src/PocketMine/command/defaults/PluginsCommand.php delete mode 100644 src/PocketMine/network/Handler.php create mode 100644 src/PocketMine/network/ThreadedHandler.php delete mode 100644 src/PocketMine/network/UDPSocket.php diff --git a/src/PocketMine/Player.php b/src/PocketMine/Player.php index 6641b74c0..0e302f1de 100644 --- a/src/PocketMine/Player.php +++ b/src/PocketMine/Player.php @@ -22,8 +22,8 @@ namespace PocketMine; use PocketMine\Entity\RealHuman; -use PocketMine\Event\EventHandler; use PocketMine\Event; +use PocketMine\Event\EventHandler; use PocketMine\Item\Item; use PocketMine\Level\Level; use PocketMine\Level\Position; @@ -37,7 +37,6 @@ use PocketMine\NBT\Tag\Float; use PocketMine\NBT\Tag\Int; use PocketMine\NBT\Tag\Short; use PocketMine\NBT\Tag\String; -use PocketMine\Command\CommandSender; use PocketMine\Network\Protocol\AdventureSettingsPacket; use PocketMine\Network\Protocol\AnimatePacket; use PocketMine\Network\Protocol\ChunkDataPacket; @@ -81,7 +80,7 @@ use PocketMine\Utils\Utils; * Class Player * @package PocketMine */ -class Player extends RealHuman/*TODO: implements CommandSender*/{ +class Player extends RealHuman /*TODO: implements CommandSender*/{ const BROADCAST_CHANNEL_ADMINISTRATIVE = "pocketmine.broadcast.admin"; const BROADCAST_CHANNEL_USERS = "pocketmine.broadcast.user"; @@ -467,7 +466,7 @@ class Player extends RealHuman/*TODO: implements CommandSender*/{ if($this->connected === true){ $packet->ip = $this->ip; $packet->port = $this->port; - $this->bandwidthRaw += $this->server->send($packet); + $this->bandwidthRaw += $this->server->sendPacket($packet); } } @@ -663,6 +662,7 @@ class Player extends RealHuman/*TODO: implements CommandSender*/{ if(EventHandler::callEvent(new Event\Player\PlayerAchievementAwardedEvent($this, $achievementId)) !== Event\Event::DENY){ $this->achievements[$achievementId] = true; Achievement::broadcast($this, $achievementId); + return true; }else{ return false; diff --git a/src/PocketMine/Server.php b/src/PocketMine/Server.php index 43752a317..4521661cc 100644 --- a/src/PocketMine/Server.php +++ b/src/PocketMine/Server.php @@ -25,24 +25,30 @@ */ namespace PocketMine; -use PocketMine\Network\Packet; -use PocketMine\Utils\Config; -use PocketMine\Utils\Utils; -use PocketMine\Utils\VersionString; -use PocketMine\Network\UPnP\UPnP; -use PocketMine\Utils\TextFormat; -use PocketMine\Plugin\PluginManager; -use PocketMine\Tile\Tile; -use PocketMine\Entity\Entity; -use PocketMine\Level\Level; use PocketMine\Block\Block; +use PocketMine\Command\ConsoleCommandSender; +use PocketMine\Command\SimpleCommandMap; +use PocketMine\Entity\Entity; +use PocketMine\Event\Event; +use PocketMine\Event\EventHandler; +use PocketMine\Event\Server\PacketReceiveEvent; +use PocketMine\Event\Server\PacketSendEvent; use PocketMine\Item\Item; +use PocketMine\Level\Level; +use PocketMine\Network\Packet; +use PocketMine\Network\Query\QueryHandler; +use PocketMine\Network\Query\QueryPacket; +use PocketMine\Network\ThreadedHandler; +use PocketMine\Network\UPnP\UPnP; +use PocketMine\Plugin\PluginManager; use PocketMine\Recipes\Crafting; -use PocketMine\Network\Handler; use PocketMine\Scheduler\ServerScheduler; use PocketMine\Scheduler\TickScheduler; -use PocketMine\Command\SimpleCommandMap; -use PocketMine\Command\ConsoleCommandSender; +use PocketMine\Tile\Tile; +use PocketMine\Utils\Config; +use PocketMine\Utils\TextFormat; +use PocketMine\Utils\Utils; +use PocketMine\Utils\VersionString; class Server{ /** @@ -94,7 +100,7 @@ class Server{ private $inTick = false; /** - * @var Handler + * @var ThreadedHandler */ private $interface; @@ -104,6 +110,11 @@ class Server{ private $dataPath; private $pluginPath; + /** + * @var QueryHandler + */ + private $queryHandler; + /** * @var Config */ @@ -255,14 +266,19 @@ class Server{ public function getTick(){ return $this->tickCounter; } + /** * @return float */ public function getTicksPerSecond(){ - $v = array_values($this->tickMeasure); - $tps = 20 / ($v[19] - $v[0]); + return $this->tickScheduler->getTPS(); + } - return round($tps, 4); + /** + * @return ThreadedHandler + */ + public function getNetwork(){ + return $this->interface; } /** @@ -276,6 +292,7 @@ class Server{ if(isset($v[$variable])){ return (string) $v[$variable]; } + return $this->properties->exists($variable) ? $this->properties->get($variable) : $defaultValue; } @@ -289,7 +306,7 @@ class Server{ /** * @param string $variable - * @param int $defaultValue + * @param int $defaultValue * * @return int */ @@ -298,19 +315,20 @@ class Server{ if(isset($v[$variable])){ return (int) $v[$variable]; } + return $this->properties->exists($variable) ? (int) $this->properties->get($variable) : (int) $defaultValue; } /** * @param string $variable - * @param int $value + * @param int $value */ public function setConfigInt($variable, $value){ $this->properties->set($variable, (int) $value); } /** - * @param string $variable + * @param string $variable * @param boolean $defaultValue * * @return boolean @@ -332,15 +350,16 @@ class Server{ case "yes": return true; } + return false; } /** * @param string $variable - * @param bool $value + * @param bool $value */ public function setConfigBool($variable, $value){ - $this->properties->set($variable, $value == true ? "1":"0"); + $this->properties->set($variable, $value == true ? "1" : "0"); } /** @@ -436,7 +455,7 @@ class Server{ console("[INFO] Starting Minecraft PE server on " . ($this->getIp() === "" ? "*" : $this->getIp()) . ":" . $this->getPort()); define("BOOTUP_RANDOM", Utils::getRandomBytes(16)); $this->serverID = Utils::readLong(substr(Utils::getUniqueID(true, $this->getIp() . $this->getPort()), 8)); - $this->interface = new Handler("255.255.255.255", $this->getPort(), $this->getIp() === "" ? "0.0.0.0":$this->getIp()); + $this->interface = new ThreadedHandler("255.255.255.255", $this->getPort(), $this->getIp() === "" ? "0.0.0.0" : $this->getIp()); console("[INFO] This server is running PocketMine-MP version " . ($version->isDev() ? TextFormat::YELLOW : "") . $this->getPocketMineVersion() . TextFormat::RESET . " \"" . $this->getCodename() . "\" (API " . $this->getApiVersion() . ")", true, true, 0); console("[INFO] PocketMine-MP is distributed under the LGPL License", true, true, 0); @@ -466,12 +485,12 @@ class Server{ } if(!defined("NO_THREADS") and $this->getProperty("enable-rcon") === true){ $this->rcon = new RCON($this->getProperty("rcon.password", ""), $this->getProperty("rcon.port", $this->getProperty("server-port")), ($ip = $this->getProperty("server-ip")) != "" ? $ip : "0.0.0.0", $this->getProperty("rcon.threads", 1), $this->getProperty("rcon.clients-per-thread", 50)); - } - - if($this->getProperty("enable-query") === true){ - $this->query = new QueryHandler(); }*/ + if($this->getConfigBoolean("enable-query", true) === true){ + $this->queryHandler = new QueryHandler(); + } + //$this->schedule(2, array($this, "checkTickUpdates"), array(), true); } @@ -507,11 +526,13 @@ class Server{ } } - private function tickProcessorWindows(){ $lastLoop = 0; - + private function tickProcessorWindows(){ + $lastLoop = 0; while(true){ if(($packet = $this->interface->readPacket()) instanceof Packet){ - var_dump($packet); + if(EventHandler::callEvent(new PacketReceiveEvent($packet)) !== Event::DENY){ + $this->handlePacket($packet); + } $lastLoop = 0; } if(($ticks = $this->tick()) !== true){ @@ -529,7 +550,9 @@ class Server{ $lastLoop = 0; while(true){ if(($packet = $this->interface->readPacket()) instanceof Packet){ - var_dump($packet); + if(EventHandler::callEvent(new PacketReceiveEvent($packet)) !== Event::DENY){ + $this->handlePacket($packet); + } $lastLoop = 0; } if(($ticks = $this->tick()) !== true){ @@ -547,6 +570,27 @@ class Server{ } } + public function handlePacket(Packet $packet){ + if($packet instanceof QueryPacket and isset($this->queryHandler)){ + $this->queryHandler->handle($packet); + } + } + + /** + * Sends a packet to the processing queue. Returns the number of bytes + * + * @param Packet $packet + * + * @return int + */ + public function sendPacket(Packet $packet){ + if(EventHandler::callEvent(new PacketSendEvent($packet)) !== Event::DENY){ + return $this->interface->writePacket($packet); + } + + return 0; + } + private function checkTickUpdates(){ //Update entities that need update if(count(Entity::$needUpdate) > 0){ @@ -610,9 +654,8 @@ class Server{ public function titleTick(){ $time = microtime(true); if(defined("PocketMine\\DEBUG") and \PocketMine\DEBUG >= 0 and \PocketMine\ANSI === true){ - echo "\x1b]0;PocketMine-MP " .$this->getPocketMineVersion() . " | Online " . count(Player::$list) . "/" . $this->getMaxPlayers() . " | RAM " . round((memory_get_usage() / 1024) / 1024, 2) . "MB | U " . round(($this->interface->bandwidth[1] / max(1, $time - $this->interface->bandwidth[2])) / 1024, 2) . " D " . round(($this->interface->bandwidth[0] / max(1, $time - $this->interface->bandwidth[2])) / 1024, 2) . " kB/s | TPS " . $this->getTPS() . "\x07"; + echo "\x1b]0;PocketMine-MP " . $this->getPocketMineVersion() . " | Online " . count(Player::$list) . "/" . $this->getMaxPlayers() . " | RAM " . round((memory_get_usage() / 1024) / 1024, 2) . "/" . round((memory_get_usage(true) / 1024) / 1024, 2) . " MB | U " . round($this->interface->getUploadSpeed() / 1024, 2) . " D " . round($this->interface->getDownloadSpeed() / 1024, 2) . " kB/s | TPS " . $this->getTPS() . "\x07"; } - $this->interface->bandwidth = array(0, 0, $time); } /** @@ -638,12 +681,17 @@ class Server{ $this->checkTickUpdates(); if(($this->tickCounter & 0b1111) === 0){ $this->titleTick(); + if(isset($this->queryHandler) and ($this->tickCounter & 0b111111111) === 0){ + $this->queryHandler->regenerateInfo(); + } } } $this->tickScheduler->doTick(); $this->inTick = false; + return true; } + return false; } diff --git a/src/PocketMine/command/SimpleCommandMap.php b/src/PocketMine/command/SimpleCommandMap.php index ec77ff9fb..fc6987000 100644 --- a/src/PocketMine/command/SimpleCommandMap.php +++ b/src/PocketMine/command/SimpleCommandMap.php @@ -21,10 +21,11 @@ namespace PocketMine\Command; -use PocketMine; -use PocketMine\Server; -use PocketMine\Command\Defaults\VersionCommand; +use PocketMine\Command\Defaults\PluginsCommand; use PocketMine\Command\Defaults\VanillaCommand; +use PocketMine\Command\Defaults\VersionCommand; +use PocketMine\Server; +use PocketMine; class SimpleCommandMap implements CommandMap{ @@ -46,6 +47,7 @@ class SimpleCommandMap implements CommandMap{ private function setDefaultCommands(){ //TODO $this->register("pocketmine", new VersionCommand("version")); + $this->register("pocketmine", new PluginsCommand("plugins")); } diff --git a/src/PocketMine/command/defaults/PluginsCommand.php b/src/PocketMine/command/defaults/PluginsCommand.php new file mode 100644 index 000000000..34c209274 --- /dev/null +++ b/src/PocketMine/command/defaults/PluginsCommand.php @@ -0,0 +1,62 @@ +setPermission("pocketmine.command.plugins"); + } + + public function execute(CommandSender $sender, $currentAlias, array $args){ + if(!$this->testPermission($sender)){ + return true; + } + + $sender->sendMessage("Plugins " . $this->getPluginList()); + + return true; + } + + private function getPluginList(){ + $list = ""; + foreach(($plugins = PocketMine\Server::getInstance()->getPluginManager()->getPlugins()) as $plugin){ + if(strlen($list) > 0){ + $list .= TextFormat::WHITE . ", "; + } + $list .= $plugin->isEnabled() ? TextFormat::GREEN : TextFormat::RED; + $list .= $plugin->getDescription()->getName(); + } + + return "(" . count($plugins) . "): $list"; + } +} \ No newline at end of file diff --git a/src/PocketMine/network/Handler.php b/src/PocketMine/network/Handler.php deleted file mode 100644 index 2801bc6cc..000000000 --- a/src/PocketMine/network/Handler.php +++ /dev/null @@ -1,116 +0,0 @@ -socket = new UDPSocket($server, $port, true, $serverip); - if($this->socket->connected === false){ - console("[SEVERE] Couldn't bind to $serverip:" . $port, true, true, 0); - exit(1); - } - $this->bandwidth = array(0, 0, microtime(true)); - $this->packets = array(); - } - - public function close(){ - $this->socket->close(false); - } - - public function readPacket(){ - $buf = null; - $source = null; - $port = null; - $len = $this->socket->read($buffer, $source, $port); - if($len === false or $len === 0){ - return false; - } - $this->bandwidth[0] += $len; - - $pid = ord($buffer{0}); - - if(Info::isValid($pid)){ - $packet = new Packet($pid); - $packet->buffer =& $buffer; - $packet->ip = $source; - $packet->port = $port; - $packet->decode(); - if(EventHandler::callEvent(new PacketReceiveEvent($packet)) === Event::DENY){ - return false; - } - - return $packet; - }elseif($pid === 0xfe and $buffer{1} === "\xfd" and Server::getInstance()->api->query instanceof QueryHandler){ - $packet = new QueryPacket; - $packet->ip = $source; - $packet->port = $port; - $packet->buffer =& $buffer; - if(EventHandler::callEvent(new PacketReceiveEvent($packet)) === Event::DENY){ - return false; - } - Server::getInstance()->api->query->handle($packet); - }else{ - $packet = new Packet($pid); - $packet->ip = $source; - $packet->port = $port; - $packet->buffer =& $buffer; - EventHandler::callEvent(new PacketReceiveEvent($packet)); - - return false; - } - - return true; - } - - public function writePacket(Packet $packet){ - if(EventHandler::callEvent(new PacketSendEvent($packet)) === Event::DENY){ - return 0; - }elseif($packet instanceof Packet){ - $packet->encode(); - } - $write = $this->socket->write($packet->buffer, $packet->ip, $packet->port); - $this->bandwidth[1] += $write; - - return $write; - } - -} - -?> \ No newline at end of file diff --git a/src/PocketMine/network/ThreadedHandler.php b/src/PocketMine/network/ThreadedHandler.php new file mode 100644 index 000000000..6c346d746 --- /dev/null +++ b/src/PocketMine/network/ThreadedHandler.php @@ -0,0 +1,197 @@ +server = $server; + $this->port = $port; + $this->serverip = $serverip; + $this->bandwidthUp = 0; + $this->bandwidthDown = 0; + $this->bandwidthTime = microtime(true); + $this->packets = new \Threaded(); + $this->queue = new \Threaded(); + $this->stop = false; + + //Load the classes so the Thread gets them + Info::isValid(0); + new Packet(0); + new QueryPacket(); + new RakNetPacket(0); + + $this->start(PTHREADS_INHERIT_ALL); + } + + public function close(){ + $this->synchronized(function (){ + $this->stop = true; + socket_close($this->socket); + }); + } + + /** + * Upload speed in bytes/s + * + * @return float + */ + public function getUploadSpeed(){ + return $this->synchronized(function (){ + return $this->bandwidthUp / max(1, microtime(true) - $this->bandwidthTime); + }); + } + + /** + * Download speed in bytes/s + * + * @return float + */ + public function getDownloadSpeed(){ + return $this->synchronized(function (){ + return $this->bandwidthDown / max(1, microtime(true) - $this->bandwidthTime); + }); + } + + + /** + * @return Packet + */ + public function readPacket(){ + return $this->packets->synchronized(function (){ + //$this->notify(); + return $this->packets->shift(); + }); + } + + /** + * @param Packet $packet + * + * @return int + */ + public function writePacket(Packet $packet){ + return $this->queue->synchronized(function ($packet){ + $this->queue[] = $packet; + + //$this->notify(); + return strlen($packet->buffer); + }, $packet); + } + + public function run(){ + $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); + socket_set_option($this->socket, SOL_SOCKET, SO_BROADCAST, 1); //Allow sending broadcast messages + if(socket_bind($this->socket, $this->serverip, $this->port) === true){ + socket_set_option($this->socket, SOL_SOCKET, SO_REUSEADDR, 0); + @socket_set_option($this->socket, SOL_SOCKET, SO_SNDBUF, 1024 * 1024 * 2); //2MB + @socket_set_option($this->socket, SOL_SOCKET, SO_RCVBUF, 1024 * 1024); //1MB + }else{ + console("[SEVERE] Couldn't bind to $this->serverip:" . $this->port, true, true, 0); + exit(1); + } + socket_set_nonblock($this->socket); + + $count = 0; + while($this->stop === false){ + if($this->getPacket() === false and $this->putPacket() === false){ + ++$count; + }else{ + $count = 0; + } + if($count > 128){ + $this->wait(100000); + } + } + } + + private function putPacket(){ + if(($packet = $this->queue->synchronized(function (){ + return $this->queue->shift(); + })) instanceof Packet + ){ + if($packet instanceof RakNetPacket){ + $packet->encode(); + } + $this->bandwidthUp += @socket_sendto($this->socket, $packet->buffer, strlen($packet->buffer), 0, $packet->ip, $packet->port); + + return true; + } + + return false; + } + + private function getPacket(){ + $buffer = null; + $source = null; + $port = null; + $len = @socket_recvfrom($this->socket, $buffer, 65535, 0, $source, $port); + if($len === false or $len == 0){ + return false; + } + $this->bandwidthDown += $len; + $pid = ord($buffer{0}); + if(Info::isValid($pid)){ + $packet = new RakNetPacket($pid); + $packet->buffer =& $buffer; + $packet->ip = $source; + $packet->port = $port; + $packet->decode(); + }elseif($pid === 0xfe and $buffer{1} === "\xfd"){ + $packet = new QueryPacket; + $packet->ip = $source; + $packet->port = $port; + $packet->buffer =& $buffer; + }else{ + $packet = new Packet($pid); + $packet->ip = $source; + $packet->port = $port; + $packet->buffer =& $buffer; + } + $this->packets->synchronized(function (Packet $packet){ + $this->packets[] = $packet; + }, $packet); + + return true; + } + +} + +?> \ No newline at end of file diff --git a/src/PocketMine/network/UDPSocket.php b/src/PocketMine/network/UDPSocket.php deleted file mode 100644 index 9cbbc7e00..000000000 --- a/src/PocketMine/network/UDPSocket.php +++ /dev/null @@ -1,72 +0,0 @@ -server = $server; - $this->port = $port; - $this->sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); - socket_set_option($this->sock, SOL_SOCKET, SO_BROADCAST, 1); //Allow sending broadcast messages - if($listen !== true){ - $this->connected = true; - $this->unblock(); - }else{ - if(socket_bind($this->sock, $serverip, $port) === true){ - socket_set_option($this->sock, SOL_SOCKET, SO_REUSEADDR, 0); - @socket_set_option($this->sock, SOL_SOCKET, SO_SNDBUF, 1024 * 1024 * 2); //2MB - @socket_set_option($this->sock, SOL_SOCKET, SO_RCVBUF, 1024 * 1024); //1MB - $this->unblock(); - $this->connected = true; - }else{ - $this->connected = false; - } - } - } - - public function close($error = 125){ - socket_close($this->sock); - } - - public function block(){ - socket_set_block($this->sock); - } - - public function unblock(){ - socket_set_nonblock($this->sock); - } - - public function read(&$buf, &$source, &$port){ - return @socket_recvfrom($this->sock, $buf, 65535, 0, $source, $port); - } - - public function write($data, $dest, $port){ - return @socket_sendto($this->sock, $data, strlen($data), 0, $dest, $port); - } - -} - -?> \ No newline at end of file diff --git a/src/PocketMine/network/query/QueryHandler.php b/src/PocketMine/network/query/QueryHandler.php index 137fdc1da..ed8c51c6c 100644 --- a/src/PocketMine/network/query/QueryHandler.php +++ b/src/PocketMine/network/query/QueryHandler.php @@ -25,11 +25,11 @@ */ namespace PocketMine\Network\Query; -use PocketMine; use PocketMine\Level\Level; use PocketMine\Player; use PocketMine\Server; use PocketMine\Utils\Utils; +use PocketMine; class QueryHandler{ private $socket, $server, $lastToken, $token, $longData, $timeout; @@ -49,7 +49,6 @@ class QueryHandler{ packets can conflict with the MCPE ones. */ - $this->server->schedule(20 * 30, array($this, "regenerateToken"), array(), true); $this->regenerateToken(); $this->lastToken = $this->token; $this->regenerateInfo(); @@ -116,7 +115,7 @@ class QueryHandler{ $pk->sessionID = $packet->sessionID; $pk->payload = self::getTokenString($this->token, $packet->ip) . "\x00"; $pk->encode(); - $this->server->send($pk); + $this->server->sendPacket($pk); break; case QueryPacket::STATISTICS: //Stat $token = Utils::readInt(substr($packet->payload, 0, 4)); @@ -137,7 +136,7 @@ class QueryHandler{ $pk->payload = $this->server->getServerName() . "\x00" . (($this->server->getGamemode() & 0x01) === 0 ? "SMP" : "CMP") . "\x00" . Level::getDefault()->getName() . "\x00" . count(Player::$list) . "\x00" . $this->server->getMaxPlayers() . "\x00" . Utils::writeLShort($this->server->getPort()) . $this->server->getIp() . "\x00"; } $pk->encode(); - $this->server->send($pk); + $this->server->sendPacket($pk); break; } }