Implemented Channeled packet sending

This commit is contained in:
Shoghi Cervantes
2015-04-14 18:24:40 +02:00
parent bb945446b7
commit 0b176b3fe0
29 changed files with 170 additions and 120 deletions

View File

@ -29,6 +29,7 @@ class CompressBatchedTask extends AsyncTask{
public $level = 7;
public $data;
public $final;
public $channel = 0;
public $targets = [];
public function onRun(){
@ -40,6 +41,6 @@ class CompressBatchedTask extends AsyncTask{
}
public function onCompletion(Server $server){
$server->broadcastPacketsCallback($this->final, $this->targets);
$server->broadcastPacketsCallback($this->final, $this->targets, $this->channel);
}
}

View File

@ -80,6 +80,16 @@ class Network{
public static $BATCH_THRESHOLD = 512;
const CHANNEL_NONE = 0;
const CHANNEL_PRIORITY = 1; //Priority channel, only to be used when it matters
const CHANNEL_WORLD_CHUNKS = 2; //Chunk sending
const CHANNEL_MOVEMENT = 3; //Movement sending
const CHANNEL_BLOCKS = 4; //Block updates or explosions
const CHANNEL_WORLD_EVENTS = 5; //Entity, level or tile entity events
const CHANNEL_ENTITY_SPAWNING = 6; //Entity spawn/despawn channel
const CHANNEL_TEXT = 7; //Chat and other text stuff
const CHANNEL_END = 31;
/** @var \SplFixedArray */
private $packetPool;

View File

@ -61,9 +61,6 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
/** @var ServerHandler */
private $interface;
/** @var string[][] */
private $batchedPackets = [];
public function __construct(Server $server){
$this->server = $server;
@ -71,6 +68,10 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
$this->rakLib = new RakLibServer($this->server->getLogger(), $this->server->getLoader(), $this->server->getPort(), $this->server->getIp() === "" ? "0.0.0.0" : $this->server->getIp());
$this->interface = new ServerHandler($this->rakLib, $this);
for($i = 0; $i < 256; ++$i){
$this->channelCounts[$i] = 0;
}
}
public function setNetwork(Network $network){
@ -79,7 +80,6 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
public function doTick(){
if(!$this->rakLib->isTerminated()){
$this->sendBatchedPackets();
$this->interface->sendTick();
}else{
$info = $this->rakLib->getTerminationInfo();
@ -88,15 +88,6 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
}
}
private function sendBatchedPackets(){
foreach($this->batchedPackets as $i => $p){
if($this->batchedPackets[$i] !== ""){
$this->server->batchPackets([$this->players[$i]], [$p]);
$this->batchedPackets[$i] = "";
}
}
}
public function process(){
$work = false;
if($this->interface->handlePacket()){
@ -115,7 +106,6 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
$player = $this->players[$identifier];
$this->identifiers->detach($player);
unset($this->players[$identifier]);
unset($this->batchedPackets[$identifier]);
unset($this->identifiersACK[$identifier]);
if(!$player->closed){
$player->close($player->getLeaveMessage(), $reason);
@ -126,7 +116,6 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
public function close(Player $player, $reason = "unknown reason"){
if(isset($this->identifiers[$player])){
unset($this->players[$this->identifiers[$player]]);
unset($this->batchedPackets[$this->identifiers[$player]]);
unset($this->identifiersACK[$this->identifiers[$player]]);
$this->interface->closeSession($this->identifiers[$player], $reason);
$this->identifiers->detach($player);
@ -149,7 +138,6 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
$player = new $class($this, $ev->getClientId(), $ev->getAddress(), $ev->getPort());
$this->players[$identifier] = $player;
$this->identifiersACK[$identifier] = 0;
$this->batchedPackets[$identifier] = "";
$this->identifiers->attach($player, $identifier);
$this->server->addPlayer($identifier, $player);
}
@ -218,7 +206,13 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
$packet->__encapsulatedPacket = new CachedEncapsulatedPacket;
$packet->__encapsulatedPacket->identifierACK = null;
$packet->__encapsulatedPacket->buffer = $packet->buffer;
$packet->__encapsulatedPacket->reliability = 2;
if($packet->getChannel() !== 0){
$packet->__encapsulatedPacket->reliability = 3;
$packet->__encapsulatedPacket->orderChannel = $packet->getChannel();
$packet->__encapsulatedPacket->orderIndex = 0;
}else{
$packet->__encapsulatedPacket->reliability = 2;
}
}
$pk = $packet->__encapsulatedPacket;
}
@ -226,14 +220,21 @@ class RakLibInterface implements ServerInstance, AdvancedSourceInterface{
if(!$immediate and !$needACK and $packet->pid() !== ProtocolInfo::BATCH_PACKET
and Network::$BATCH_THRESHOLD >= 0
and strlen($packet->buffer) >= Network::$BATCH_THRESHOLD){
$this->batchedPackets[$this->identifiers[$player]] .= $packet->buffer;
$this->server->batchPackets([$player], [$packet], false, $packet->getChannel());
return null;
}
if($pk === null){
$pk = new EncapsulatedPacket();
$pk->buffer = $packet->buffer;
$pk->reliability = 2;
if($packet->getChannel() !== 0){
$packet->reliability = 3;
$packet->orderChannel = $packet->getChannel();
$packet->orderIndex = 0;
}else{
$packet->reliability = 2;
}
if($needACK === true){
$pk->identifierACK = $this->identifiersACK[$identifier]++;
}

View File

@ -35,6 +35,7 @@ abstract class DataPacket extends \stdClass{
private $offset = 0;
public $buffer = "";
public $isEncoded = false;
private $channel = 0;
abstract public function pid();
@ -47,6 +48,15 @@ abstract class DataPacket extends \stdClass{
$this->offset = 0;
}
public function setChannel($channel){
$this->channel = (int) $channel;
return $this;
}
public function getChannel(){
return $this->channel;
}
public function setBuffer($buffer = ""){
$this->buffer = $buffer;
$this->offset = 0;