Added full async operations in a separate Thread

This commit is contained in:
Shoghi Cervantes 2013-06-12 19:38:11 +02:00
parent 2e23ce8beb
commit aa7d9b5e05
4 changed files with 146 additions and 16 deletions

View File

@ -186,20 +186,24 @@ class ServerAPI{
foreach($this->plugin->getList() as $p){
$plist .= str_replace(array(";", ":"), "", $p["name"]).":".str_replace(array(";", ":"), "", $p["version"]).";";
}
Utils::curl_post("http://stats.pocketmine.net/usage.php", array(
"serverid" => $this->server->serverID,
"port" => $this->server->port,
"os" => Utils::getOS(),
"memory_total" => $this->getProperty("memory-limit"),
"memory_usage" => memory_get_usage(true),
"php_version" => PHP_VERSION,
"version" => MAJOR_VERSION,
"mc_version" => CURRENT_MINECRAFT_VERSION,
"protocol" => CURRENT_PROTOCOL,
"online" => count($this->server->clients),
"max" => $this->server->maxClients,
"plugins" => $plist,
), 2);
$this->asyncOperation(ASYNC_CURL_POST, array(
"url" => "http://stats.pocketmine.org/usage.php",
"data" => array(
"serverid" => $this->server->serverID,
"port" => $this->server->port,
"os" => Utils::getOS(),
"memory_total" => $this->getProperty("memory-limit"),
"memory_usage" => memory_get_usage(true),
"php_version" => PHP_VERSION,
"version" => MAJOR_VERSION,
"mc_version" => CURRENT_MINECRAFT_VERSION,
"protocol" => CURRENT_PROTOCOL,
"online" => count($this->server->clients),
"max" => $this->server->maxClients,
"plugins" => $plist,
),
), NULL);
}
public function __destruct(){
@ -301,6 +305,10 @@ class ServerAPI{
/*-------------------------------------------------------------*/
public function asyncOperation($t, $d, $c = null){
return $this->server->asyncOperation($t, $d, $c);
}
public function addHandler($e, $c, $p = 5){
return $this->server->addHandler($e, $c, $p);
}

View File

@ -28,7 +28,7 @@ the Free Software Foundation, either version 3 of the License, or
class PocketMinecraftServer{
public $tCnt;
public $serverID, $interface, $database, $version, $invisible, $api, $tickMeasure, $preparedSQL, $seed, $gamemode, $name, $maxClients, $clients, $eidCnt, $custom, $description, $motd, $port, $saveEnabled;
private $serverip, $evCnt, $handCnt, $events, $eventsID, $handlers, $serverType, $lastTick, $ticks, $memoryStats;
private $serverip, $evCnt, $handCnt, $events, $eventsID, $handlers, $serverType, $lastTick, $ticks, $memoryStats, $async = array(), $asyncID = 0;
private function load(){
$this->version = new VersionString();
@ -73,6 +73,7 @@ class PocketMinecraftServer{
$this->reloadConfig();
$this->stop = false;
$this->ticks = 0;
$this->asyncThread = new AsyncMultipleQueue();
}
function __construct($name, $gamemode = SURVIVAL, $seed = false, $port = 19132, $serverip = "0.0.0.0"){
@ -107,6 +108,7 @@ class PocketMinecraftServer{
}
$this->schedule(20 * 15, array($this, "checkTicks"), array(), true);
$this->schedule(20 * 60 * 10, array($this, "checkMemory"), array(), true);
$this->schedule(20 * 5, array($this, "asyncOperationChecker"), array(), true);
}
public function checkTicks(){
@ -190,6 +192,7 @@ class PocketMinecraftServer{
$this->stop = true;
$this->trigger("server.close", $reason);
$this->interface->close();
@$this->asyncThread->stop = true;
}
}
@ -205,6 +208,59 @@ class PocketMinecraftServer{
}
}
public function asyncOperation($type, array $data, callable $callable = null){
$d = "";
$type = (int) $type;
switch($type){
case ASYNC_CURL_GET:
$d .= Utils::writeShort(strlen($data["url"])).$data["url"].(isset($data["timeout"]) ? Utils::writeShort($data["timeout"]) : Utils::writeShort(10));
break;
case ASYNC_CURL_POST:
$d .= Utils::writeShort(strlen($data["url"])).$data["url"].(isset($data["timeout"]) ? Utils::writeShort($data["timeout"]) : Utils::writeShort(10));
$d .= Utils::writeShort(count($data["data"]));
foreach($data["data"] as $key => $value){
$d .= Utils::writeInt(strlen($key)).$key . Utils::writeInt(strlen($value)).$value;
}
break;
default:
return false;
}
$ID = $this->asyncID++;
$this->async[$ID] = $callable;
$this->asyncThread->input .= Utils::writeInt($ID).Utils::writeShort($type).$d;
return $ID;
}
public function asyncOperationChecker(){
if(isset($this->asyncThread->output{5})){
$offset = 0;
$ID = Utils::readInt(substr($this->asyncThread->output, $offset, 4));
$offset += 4;
$type = Utils::readShort(substr($this->asyncThread->output, $offset, 2));
$offset += 2;
$data = array();
switch($type){
case ASYNC_CURL_GET:
case ASYNC_CURL_POST:
$len = Utils::readInt(substr($this->asyncThread->output, $offset, 4));
$offset += 2;
$data["result"] = substr($this->asyncThread->output, $offset, $len);
$offset += $len;
break;
}
$this->asyncThread->output = substr($this->asyncThread->output, $offset);
if(isset($this->async[$ID]) and $this->async[$ID] !== null and is_callable($this->async[$ID])){
if(is_array($this->async[$ID])){
$method = $this->async[$ID][1];
$result = $this->async[$ID][0]->$method($data, $type, $ID);
}else{
$result = $this->async[$ID]($data, $type, $ID);
}
}
unset($this->async[$ID]);
}
}
public function addHandler($event,callable $callable, $priority = 5){
if(!is_callable($callable)){
@ -223,6 +279,10 @@ class PocketMinecraftServer{
console("[INTERNAL] New handler ".(is_array($callable) ? get_class($callable[0])."::".$callable[1]:$callable)." to special event ".$event." (ID ".$hnid.")", true, true, 3);
return $hnid;
}
public function dhandle($e, $d){
return $this->handle($e, $d);
}
public function handle($event, &$data){
$this->preparedSQL->selectHandlers->reset();

View File

@ -36,6 +36,10 @@ if(!function_exists("cli_set_process_title")){
}
}
function dummy(){
}
function safe_var_dump($var, $cnt = 0){
switch(true){
case is_array($var):

View File

@ -25,12 +25,70 @@ the Free Software Foundation, either version 3 of the License, or
*/
define("ASYNC_CURL_GET", 1);
define("ASYNC_CURL_POST", 2);
class StackableArray{
public $counter = 0;
public function run(){}
}
class Async extends Thread {
class AsyncMultipleQueue extends Thread{
public $input;
public $output;
public $stop;
public function __construct(){
$this->input = "";
$this->output = "";
$this->stop = false;
$this->start();
}
private function get($len){
$str = "";
while(!isset($str{$len - 1})){
$str .= $this->input{0};
$this->input = substr($this->input, 1);
}
return $str;
}
public function run(){
while($this->stop === false){
if(isset($this->input{5})){ //len 6 min
$rID = Utils::readInt($this->get(4));
switch(Utils::readShort($this->get(2), false)){
case ASYNC_CURL_GET:
$url = $this->get(Utils::readShort($this->get(2), false));
$timeout = Utils::readShort($this->get(2));
$res = (string) Utils::curl_get($url, $timeout);
$this->lock();
$this->output .= Utils::writeInt($rID).Utils::writeShort(ASYNC_CURL_GET).Utils::writeInt(strlen($res)).$res;
$this->unlock();
break;
case ASYNC_CURL_POST:
$url = $this->get(Utils::readShort($this->get(2), false));
$timeout = Utils::readShort($this->get(2));
$cnt = Utils::readShort($this->get(2), false);
$d = array();
for($c = 0; $c < $cnt; ++$c){
$key = $this->get(Utils::readShort($this->get(2), false));
$d[$key] = $this->get(Utils::readShort($this->get(2), false));
}
$res = (string) Utils::curl_post($url, $d, $timeout);
$this->lock();
$this->output .= Utils::writeInt($rID).Utils::writeShort(ASYNC_CURL_POST).Utils::writeInt(strlen($res)).$res;
$this->unlock();
break;
}
}
usleep(10000);
}
}
}
class Async extends Thread{
public function __construct($method, $params = array()){
$this->method = $method;
$this->params = $params;