BatchPacket, compress any packet depending on the size, really fast threaded chunk sending

This commit is contained in:
Shoghi Cervantes
2015-03-25 23:50:55 +01:00
parent 7d1313c63d
commit 962c28aaca
16 changed files with 658 additions and 233 deletions

View File

@@ -75,6 +75,9 @@ use pocketmine\nbt\tag\Int;
use pocketmine\nbt\tag\Long;
use pocketmine\nbt\tag\Short;
use pocketmine\nbt\tag\String;
use pocketmine\network\CompressBatchedTask;
use pocketmine\network\Network;
use pocketmine\network\protocol\BatchPacket;
use pocketmine\network\protocol\DataPacket;
use pocketmine\network\query\QueryHandler;
use pocketmine\network\RakLibInterface;
@@ -191,10 +194,11 @@ class Server{
/** @var LevelMetadataStore */
private $levelMetadata;
/** @var SourceInterface[] */
private $interfaces = [];
/** @var RakLibInterface */
private $mainInterface;
/** @var Network */
private $network;
private $networkCompressionAsync = true;
private $networkCompressionLevel = 7;
private $serverID;
@@ -217,6 +221,8 @@ class Server{
/** @var Player[] */
private $players = [];
private $identifiers = [];
/** @var Level[] */
private $levels = [];
@@ -595,67 +601,54 @@ class Server{
return round((array_sum($this->useAverage) / count($this->useAverage)) * 100, 2);
}
/**
* @deprecated
*
* @param $address
* @param int $timeout
*/
public function blockAddress($address, $timeout = 300){
$this->network->blockAddress($address, $timeout);
}
/**
* @deprecated
*
* @param $address
* @param $port
* @param $payload
*/
public function sendPacket($address, $port, $payload){
$this->network->sendPacket($address, $port, $payload);
}
/**
* @deprecated
*
* @return SourceInterface[]
*/
public function getInterfaces(){
return $this->interfaces;
return $this->network->getInterfaces();
}
/**
* @deprecated
*
* @param SourceInterface $interface
*/
public function addInterface(SourceInterface $interface){
$this->interfaces[spl_object_hash($interface)] = $interface;
$this->network->registerInterface($interface);
}
/**
* @deprecated
*
* @param SourceInterface $interface
*/
public function removeInterface(SourceInterface $interface){
$interface->shutdown();
unset($this->interfaces[spl_object_hash($interface)]);
}
/**
* @param string $address
* @param int $port
* @param string $payload
*/
public function sendPacket($address, $port, $payload){
$this->mainInterface->putRaw($address, $port, $payload);
}
/**
* Blocks an IP address from the main interface. Setting timeout to -1 will block it forever
*
* @param string $address
* @param int $timeout
*/
public function blockAddress($address, $timeout = 300){
$this->mainInterface->blockAddress($address, $timeout);
}
/**
* @param string $address
* @param int $port
* @param string $payload
*/
public function handlePacket($address, $port, $payload){
try{
if(strlen($payload) > 2 and substr($payload, 0, 2) === "\xfe\xfd" and $this->queryHandler instanceof QueryHandler){
$this->queryHandler->handle($address, $port, $payload);
}
}catch(\Exception $e){
if(\pocketmine\DEBUG > 1){
if($this->logger instanceof MainLogger){
$this->logger->logException($e);
}
}
$this->blockAddress($address, 600);
}
//TODO: add raw packet events
$this->network->unregisterInterface($interface);
}
/**
@@ -893,6 +886,7 @@ class Server{
foreach($this->players as $identifier => $p){
if($player === $p){
unset($this->players[$identifier]);
unset($this->identifiers[spl_object_hash($player)]);
break;
}
}
@@ -1529,6 +1523,14 @@ class Server{
ServerScheduler::$WORKERS = $this->getProperty("settings.async-workers", ServerScheduler::$WORKERS);
if($this->getProperty("network.batch-threshold", 256) >= 0){
Network::$BATCH_THRESHOLD = (int) $this->getProperty("network.batch-threshold", 256);
}else{
Network::$BATCH_THRESHOLD = -1;
}
$this->networkCompressionLevel = $this->getProperty("network.compression-level", 7);
$this->networkCompressionAsync = $this->getProperty("network.async-compression", true);
$this->scheduler = new ServerScheduler();
if($this->getConfigBoolean("enable-rcon", false) === true){
@@ -1588,7 +1590,10 @@ class Server{
define("BOOTUP_RANDOM", @Utils::getRandomBytes(16));
$this->serverID = Binary::readLong(substr(Utils::getUniqueID(true, $this->getIp() . $this->getPort()), 0, 8));
$this->addInterface($this->mainInterface = new RakLibInterface($this));
$this->network = new Network($this);
$this->network->setName($this->getMotd());
$this->network->registerInterface(new RakLibInterface($this));
$this->logger->info("This server is running " . $this->getName() . " version " . ($version->isDev() ? TextFormat::YELLOW : "") . $version->get(true) . TextFormat::WHITE . " \"" . $this->getCodename() . "\" (API " . $this->getApiVersion() . ")");
$this->logger->info($this->getName() . " is distributed under the LGPL License");
@@ -1751,6 +1756,11 @@ class Server{
public static function broadcastPacket(array $players, DataPacket $packet){
$packet->encode();
$packet->isEncoded = true;
if(Network::$BATCH_THRESHOLD >= 0 and strlen($packet->buffer) >= Network::$BATCH_THRESHOLD){
Server::getInstance()->batchPackets($players, [$packet->buffer]);
return;
}
foreach($players as $player){
$player->dataPacket($packet);
}
@@ -1759,6 +1769,53 @@ class Server{
}
}
/**
* Broadcasts a list of packets in a batch to a list of players
*
* @param Player[] $players
* @param DataPacket[]|string $packets
*/
public function batchPackets(array $players, array $packets){
$str = "";
foreach($packets as $p){
if(is_object($p)){
$p->encode();
$str .= $p->buffer;
}else{
$str .= $p;
}
}
$targets = [];
foreach($players as $p){
$targets[] = $this->identifiers[spl_object_hash($p)];
}
if($this->networkCompressionAsync){
$task = new CompressBatchedTask();
$task->targets = $targets;
$task->data = $str;
$task->level = $this->networkCompressionLevel;
$this->getScheduler()->scheduleAsyncTask($task);
}else{
$this->broadcastPacketsCallback(zlib_encode($str, ZLIB_ENCODING_DEFLATE, $this->networkCompressionLevel), $targets);
}
}
public function broadcastPacketsCallback($data, array $identifiers){
$pk = new BatchPacket();
$pk->payload = $data;
$pk->encode();
$pk->isEncoded = true;
foreach($identifiers as $i){
if(isset($this->players[$i])){
$this->players[$i]->dataPacket($pk);
}
}
}
/**
* @param int $type
@@ -1902,7 +1959,7 @@ class Server{
$this->rcon->stop();
}
if($this->getProperty("settings.upnp-forwarding", false) === true){
if($this->getProperty("network.upnp-forwarding", false) === true){
$this->logger->info("[UPnP] Removing port forward...");
UPnP::RemovePortForward($this->getPort());
}
@@ -1930,8 +1987,9 @@ class Server{
$this->console->kill();
foreach($this->interfaces as $interface){
foreach($this->network->getInterfaces() as $interface){
$interface->shutdown();
$this->network->unregisterInterface($interface);
}
}catch(\Exception $e){
$this->logger->emergency("Crashed while crashing, killing process");
@@ -1960,7 +2018,7 @@ class Server{
}
if($this->getProperty("settings.upnp-forwarding", false) == true){
if($this->getProperty("network.upnp-forwarding", false) == true){
$this->logger->info("[UPnP] Trying to port forward...");
UPnP::PortForward($this->getPort());
}
@@ -2112,6 +2170,7 @@ class Server{
public function addPlayer($identifier, Player $player){
$this->players[$identifier] = $player;
$this->identifiers[spl_object_hash($player)] = $identifier;
}
private function checkTickUpdates($currentTick){
@@ -2137,6 +2196,7 @@ class Server{
$player->save();
}elseif(!$player->isConnected()){
unset($this->players[$index]);
unset($this->identifiers[spl_object_hash($player)]);
}
}
@@ -2178,7 +2238,7 @@ class Server{
"version" => $version->get(true),
"build" => $version->getBuild(),
"mc_version" => \pocketmine\MINECRAFT_VERSION,
"protocol" => network\protocol\Info::CURRENT_PROTOCOL,
"protocol" => \pocketmine\network\protocol\Info::CURRENT_PROTOCOL,
"online" => count($this->players),
"max" => $this->getMaxPlayers(),
"plugins" => $plist,
@@ -2188,7 +2248,7 @@ class Server{
}
public function getNetwork(){
return $this->mainInterface;
return $this->network;
}
private function titleTick(){
@@ -2203,10 +2263,12 @@ class Server{
$this->getPocketMineVersion() .
" | Online " . count($this->players) . "/" . $this->getMaxPlayers() .
" | Memory " . $usage .
" | U " . round($this->mainInterface->getUploadUsage() / 1024, 2) .
" D " . round($this->mainInterface->getDownloadUsage() / 1024, 2) .
" | U " . round($this->network->getUpload() / 1024, 2) .
" D " . round($this->network->getDownload() / 1024, 2) .
" kB/s | TPS " . $this->getTicksPerSecond() .
" | Load " . $this->getTickUsage() . "%\x07";
$this->network->resetStatistics();
}
public function getMemoryUsage($advanced = false){
@@ -2250,6 +2312,31 @@ class Server{
}
/**
* @param string $address
* @param int $port
* @param string $payload
*
* TODO: move this to Network
*/
public function handlePacket($address, $port, $payload){
try{
if(strlen($payload) > 2 and substr($payload, 0, 2) === "\xfe\xfd" and $this->queryHandler instanceof QueryHandler){
$this->queryHandler->handle($address, $port, $payload);
}
}catch(\Exception $e){
if(\pocketmine\DEBUG > 1){
if($this->logger instanceof MainLogger){
$this->logger->logException($e);
}
}
$this->blockAddress($address, 600);
}
//TODO: add raw packet events
}
/**
* Tries to execute a server tick
*/
@@ -2266,9 +2353,7 @@ class Server{
$this->checkConsole();
Timings::$connectionTimer->startTiming();
foreach($this->interfaces as $interface){
$interface->process();
}
$this->network->processInterfaces();
Timings::$connectionTimer->stopTiming();
Timings::$schedulerTimer->startTiming();