Implement send buffering and queuing for network sessions (#2358)

Async compression and broadcasts are now reliable and don't have race condition bugs.
This features improved performance and significantly reduced bandwidth wastage.

Reduce Level broadcast latency by ticking network after levels. This ensures that session buffers get flushed as soon as possible after level tick, if level broadcasts were done.
This commit is contained in:
Dylan K. Taylor 2018-08-13 14:37:18 +01:00 committed by GitHub
parent 22c8077bdf
commit 15bac8c58a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 225 additions and 87 deletions

View File

@ -90,6 +90,7 @@ use pocketmine\nbt\tag\ByteTag;
use pocketmine\nbt\tag\CompoundTag;
use pocketmine\nbt\tag\DoubleTag;
use pocketmine\nbt\tag\ListTag;
use pocketmine\network\mcpe\CompressBatchPromise;
use pocketmine\network\mcpe\NetworkCipher;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\protocol\AdventureSettingsPacket;
@ -894,7 +895,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
unset($this->loadQueue[$index]);
}
public function sendChunk(int $x, int $z, string $payload){
public function sendChunk(int $x, int $z, CompressBatchPromise $promise){
if(!$this->isConnected()){
return;
}
@ -902,7 +903,7 @@ class Player extends Human implements CommandSender, ChunkLoader, IPlayer{
$this->usedChunks[Level::chunkHash($x, $z)] = true;
$this->chunkLoadCount++;
$this->networkSession->sendEncoded($payload);
$this->networkSession->queueCompressed($promise);
if($this->spawned){
foreach($this->level->getChunkEntities($x, $z) as $entity){

View File

@ -71,7 +71,8 @@ use pocketmine\nbt\tag\LongTag;
use pocketmine\nbt\tag\ShortTag;
use pocketmine\nbt\tag\StringTag;
use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\mcpe\CompressBatchedTask;
use pocketmine\network\mcpe\CompressBatchPromise;
use pocketmine\network\mcpe\CompressBatchTask;
use pocketmine\network\mcpe\NetworkCipher;
use pocketmine\network\mcpe\NetworkCompression;
use pocketmine\network\mcpe\NetworkSession;
@ -1899,55 +1900,54 @@ class Server{
$stream->putPacket($packet);
}
//TODO: if under the compression threshold, add to session buffers instead of batching (first we need to implement buffering!)
$this->batchPackets($targets, $stream);
if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){
foreach($targets as $target){
foreach($ev->getPackets() as $pk){
$target->addToSendBuffer($pk);
}
}
}else{
$promise = $this->prepareBatch($stream);
foreach($targets as $target){
$target->queueCompressed($promise);
}
}
return true;
}
/**
* Broadcasts a list of packets in a batch to a list of players
*
* @param NetworkSession[] $targets
* @param PacketStream $stream
* @param bool $forceSync
* @param bool $immediate
* @param PacketStream $stream
* @param bool $forceSync
*
* @return CompressBatchPromise
*/
public function batchPackets(array $targets, PacketStream $stream, bool $forceSync = false, bool $immediate = false){
Timings::$playerNetworkSendCompressTimer->startTiming();
public function prepareBatch(PacketStream $stream, bool $forceSync = false) : CompressBatchPromise{
try{
Timings::$playerNetworkSendCompressTimer->startTiming();
if(!empty($targets)){
$compressionLevel = NetworkCompression::$LEVEL;
if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){
$compressionLevel = 0; //Do not compress packets under the threshold
$forceSync = true;
}
if(!$forceSync and !$immediate and $this->networkCompressionAsync){
$task = new CompressBatchedTask($stream, $targets, $compressionLevel);
$promise = new CompressBatchPromise();
if(!$forceSync and $this->networkCompressionAsync){
$task = new CompressBatchTask($stream, $compressionLevel, $promise);
$this->asyncPool->submitTask($task);
}else{
$this->broadcastPacketsCallback(NetworkCompression::compress($stream->buffer), $targets, $immediate);
$promise->resolve(NetworkCompression::compress($stream->buffer));
}
}
Timings::$playerNetworkSendCompressTimer->stopTiming();
}
/**
* @param string $payload
* @param NetworkSession[] $sessions
* @param bool $immediate
*/
public function broadcastPacketsCallback(string $payload, array $sessions, bool $immediate = false){
/** @var NetworkSession $session */
foreach($sessions as $session){
if($session->isConnected()){
$session->sendEncoded($payload, $immediate);
}
return $promise;
}finally{
Timings::$playerNetworkSendCompressTimer->stopTiming();
}
}
/**
* @param int $type
*/
@ -2542,10 +2542,6 @@ class Server{
++$this->tickCounter;
Timings::$connectionTimer->startTiming();
$this->network->tick();
Timings::$connectionTimer->stopTiming();
Timings::$schedulerTimer->startTiming();
$this->pluginManager->tickSchedulers($this->tickCounter);
Timings::$schedulerTimer->stopTiming();
@ -2556,6 +2552,10 @@ class Server{
$this->checkTickUpdates($this->tickCounter);
Timings::$connectionTimer->startTiming();
$this->network->tick();
Timings::$connectionTimer->stopTiming();
if(($this->tickCounter % 20) === 0){
if($this->doTitleTick){
$this->titleTick();

View File

@ -25,6 +25,7 @@ namespace pocketmine\inventory;
use pocketmine\item\Item;
use pocketmine\item\ItemFactory;
use pocketmine\network\mcpe\CompressBatchPromise;
use pocketmine\network\mcpe\NetworkCompression;
use pocketmine\network\mcpe\PacketStream;
use pocketmine\network\mcpe\protocol\CraftingDataPacket;
@ -38,7 +39,7 @@ class CraftingManager{
/** @var FurnaceRecipe[] */
protected $furnaceRecipes = [];
/** @var string */
/** @var CompressBatchPromise */
private $craftingDataCache;
public function __construct(){
@ -105,16 +106,18 @@ class CraftingManager{
$batch = new PacketStream();
$batch->putPacket($pk);
$this->craftingDataCache = NetworkCompression::compress($batch->buffer);
$this->craftingDataCache = new CompressBatchPromise();
$this->craftingDataCache->resolve(NetworkCompression::compress($batch->buffer));
Timings::$craftingDataCacheRebuildTimer->stopTiming();
}
/**
* Returns a pre-compressed CraftingDataPacket for sending to players. Rebuilds the cache if it is not found.
*
* @return string
* @return CompressBatchPromise
*/
public function getCraftingDataPacket() : string{
public function getCraftingDataPacket() : CompressBatchPromise{
if($this->craftingDataCache === null){
$this->buildCraftingDataCache();
}

View File

@ -69,6 +69,7 @@ use pocketmine\metadata\MetadataValue;
use pocketmine\nbt\tag\ListTag;
use pocketmine\nbt\tag\StringTag;
use pocketmine\network\mcpe\ChunkRequestTask;
use pocketmine\network\mcpe\CompressBatchPromise;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\network\mcpe\protocol\LevelEventPacket;
use pocketmine\network\mcpe\protocol\LevelSoundEventPacket;
@ -122,7 +123,7 @@ class Level implements ChunkManager, Metadatable{
/** @var Block[][] */
private $blockCache = [];
/** @var string[] */
/** @var CompressBatchPromise[] */
private $chunkCache = [];
/** @var int */
@ -2451,7 +2452,7 @@ class Level implements ChunkManager, Metadatable{
$this->chunkSendQueue[$index][$player->getLoaderId()] = $player;
}
private function sendChunkFromCache(int $x, int $z){
private function sendCachedChunk(int $x, int $z){
if(isset($this->chunkSendQueue[$index = Level::chunkHash($x, $z)])){
foreach($this->chunkSendQueue[$index] as $player){
/** @var Player $player */
@ -2479,10 +2480,12 @@ class Level implements ChunkManager, Metadatable{
continue;
}
}
if(isset($this->chunkCache[$index])){
$this->sendChunkFromCache($x, $z);
$this->sendCachedChunk($x, $z);
continue;
}
$this->timings->syncChunkSendPrepareTimer->startTiming();
$chunk = $this->chunks[$index] ?? null;
@ -2491,7 +2494,30 @@ class Level implements ChunkManager, Metadatable{
}
assert($chunk->getX() === $x and $chunk->getZ() === $z, "Chunk coordinate mismatch: expected $x $z, but chunk has coordinates " . $chunk->getX() . " " . $chunk->getZ() . ", did you forget to clone a chunk before setting?");
$this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($this, $x, $z, $chunk));
/*
* we don't send promises directly to the players here because unresolved promises of chunk sending
* would slow down the sending of other packets, especially if a chunk takes a long time to prepare.
*/
$promise = new CompressBatchPromise();
$promise->onResolve(function(CompressBatchPromise $promise) use ($x, $z, $index): void{
if(!$this->closed){
$this->timings->syncChunkSendTimer->startTiming();
unset($this->chunkSendTasks[$index]);
$this->chunkCache[$index] = $promise;
$this->sendCachedChunk($x, $z);
if(!$this->server->getMemoryManager()->canUseChunkCache()){
unset($this->chunkCache[$index]);
}
$this->timings->syncChunkSendTimer->stopTiming();
}else{
$this->server->getLogger()->debug("Dropped prepared chunk $x $z due to level not loaded");
}
});
$this->server->getAsyncPool()->submitTask($task = new ChunkRequestTask($x, $z, $chunk, $promise));
$this->chunkSendTasks[$index] = $task;
$this->timings->syncChunkSendPrepareTimer->stopTiming();
@ -2501,21 +2527,6 @@ class Level implements ChunkManager, Metadatable{
}
}
public function chunkRequestCallback(int $x, int $z, string $payload){
$this->timings->syncChunkSendTimer->startTiming();
$index = Level::chunkHash($x, $z);
unset($this->chunkSendTasks[$index]);
$this->chunkCache[$index] = $payload;
$this->sendChunkFromCache($x, $z);
if(!$this->server->getMemoryManager()->canUseChunkCache()){
unset($this->chunkCache[$index]);
}
$this->timings->syncChunkSendTimer->stopTiming();
}
/**
* @param Entity $entity
*

View File

@ -24,7 +24,6 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe;
use pocketmine\level\format\Chunk;
use pocketmine\level\Level;
use pocketmine\network\mcpe\protocol\FullChunkDataPacket;
use pocketmine\scheduler\AsyncTask;
use pocketmine\Server;
@ -42,8 +41,7 @@ class ChunkRequestTask extends AsyncTask{
/** @var int */
protected $compressionLevel;
public function __construct(Level $level, int $chunkX, int $chunkZ, Chunk $chunk){
$this->storeLocal($level);
public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise){
$this->compressionLevel = NetworkCompression::$LEVEL;
$this->chunk = $chunk->fastSerialize();
@ -59,6 +57,8 @@ class ChunkRequestTask extends AsyncTask{
}
$this->tiles = $tiles;
$this->storeLocal($promise);
}
public function onRun() : void{
@ -76,14 +76,8 @@ class ChunkRequestTask extends AsyncTask{
}
public function onCompletion(Server $server) : void{
/** @var Level $level */
$level = $this->fetchLocal();
if(!$level->isClosed()){
if($this->hasResult()){
$level->chunkRequestCallback($this->chunkX, $this->chunkZ, $this->getResult());
}else{
$level->getServer()->getLogger()->error("Chunk request for level " . $level->getName() . ", x=" . $this->chunkX . ", z=" . $this->chunkZ . " doesn't have any result data");
}
}
/** @var CompressBatchPromise $promise */
$promise = $this->fetchLocal();
$promise->resolve($this->getResult());
}
}

View File

@ -0,0 +1,62 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe;
class CompressBatchPromise{
/** @var callable[] */
private $callbacks = [];
/** @var string|null */
private $result = null;
public function onResolve(callable $callback) : void{
if($this->result !== null){
$callback($this);
}else{
$this->callbacks[] = $callback;
}
}
public function resolve(string $result) : void{
if($this->result !== null){
throw new \InvalidStateException("Cannot resolve promise more than once");
}
$this->result = $result;
foreach($this->callbacks as $callback){
$callback($this);
}
$this->callbacks = [];
}
public function getResult() : string{
if($this->result === null){
throw new \InvalidStateException("Promise has not yet been resolved");
}
return $this->result;
}
public function hasResult() : bool{
return $this->result !== null;
}
}

View File

@ -26,20 +26,20 @@ namespace pocketmine\network\mcpe;
use pocketmine\scheduler\AsyncTask;
use pocketmine\Server;
class CompressBatchedTask extends AsyncTask{
class CompressBatchTask extends AsyncTask{
private $level;
private $data;
/**
* @param PacketStream $stream
* @param NetworkSession[] $targets
* @param int $compressionLevel
* @param PacketStream $stream
* @param int $compressionLevel
* @param CompressBatchPromise $promise
*/
public function __construct(PacketStream $stream, array $targets, int $compressionLevel){
public function __construct(PacketStream $stream, int $compressionLevel, CompressBatchPromise $promise){
$this->data = $stream->buffer;
$this->level = $compressionLevel;
$this->storeLocal($targets);
$this->storeLocal($promise);
}
public function onRun() : void{
@ -47,9 +47,8 @@ class CompressBatchedTask extends AsyncTask{
}
public function onCompletion(Server $server) : void{
/** @var NetworkSession[] $targets */
$targets = $this->fetchLocal();
$server->broadcastPacketsCallback($this->getResult(), $targets);
/** @var CompressBatchPromise $promise */
$promise = $this->fetchLocal();
$promise->resolve($this->getResult());
}
}

View File

@ -68,12 +68,20 @@ class NetworkSession{
/** @var NetworkCipher */
private $cipher;
/** @var PacketStream|null */
private $sendBuffer;
/** @var \SplQueue|CompressBatchPromise[] */
private $compressedQueue;
public function __construct(Server $server, NetworkInterface $interface, string $ip, int $port){
$this->server = $server;
$this->interface = $interface;
$this->ip = $ip;
$this->port = $port;
$this->compressedQueue = new \SplQueue();
$this->connectTime = time();
$this->server->getNetwork()->scheduleSessionTick($this);
@ -206,10 +214,10 @@ class NetworkSession{
return false;
}
//TODO: implement buffering (this is just a quick fix)
$stream = new PacketStream();
$stream->putPacket($packet);
$this->server->batchPackets([$this], $stream, true, $immediate);
$this->addToSendBuffer($packet);
if($immediate){
$this->flushSendBuffer(true);
}
return true;
}finally{
@ -217,7 +225,62 @@ class NetworkSession{
}
}
public function sendEncoded(string $payload, bool $immediate = false) : void{
/**
* @internal
* @param DataPacket $packet
*/
public function addToSendBuffer(DataPacket $packet) : void{
$timings = Timings::getSendDataPacketTimings($packet);
$timings->startTiming();
try{
if($this->sendBuffer === null){
$this->sendBuffer = new PacketStream();
}
$this->sendBuffer->putPacket($packet);
$this->server->getNetwork()->scheduleSessionTick($this);
}finally{
$timings->stopTiming();
}
}
private function flushSendBuffer(bool $immediate = false) : void{
if($this->sendBuffer !== null){
$promise = $this->server->prepareBatch($this->sendBuffer, $immediate);
$this->sendBuffer = null;
$this->queueCompressed($promise, $immediate);
}
}
public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{
$this->flushSendBuffer($immediate); //Maintain ordering if possible
if($immediate){
//Skips all queues
$this->sendEncoded($payload->getResult(), true);
}else{
$this->compressedQueue->enqueue($payload);
$payload->onResolve(function(CompressBatchPromise $payload) : void{
if($this->connected and $this->compressedQueue->bottom() === $payload){
$this->compressedQueue->dequeue(); //result unused
$this->sendEncoded($payload->getResult());
while(!$this->compressedQueue->isEmpty()){
/** @var CompressBatchPromise $current */
$current = $this->compressedQueue->bottom();
if($current->hasResult()){
$this->compressedQueue->dequeue();
$this->sendEncoded($current->getResult());
}else{
//can't send any more queued until this one is ready
break;
}
}
}
});
}
}
private function sendEncoded(string $payload, bool $immediate = false) : void{
if($this->cipher !== null){
Timings::$playerNetworkSendEncryptTimer->startTiming();
$payload = $this->cipher->encrypt($payload);
@ -289,6 +352,8 @@ class NetworkSession{
$this->handler = null;
$this->interface = null;
$this->player = null;
$this->sendBuffer = null;
$this->compressedQueue = null;
}
public function enableEncryption(string $encryptionKey, string $handshakeJwt) : void{
@ -345,7 +410,10 @@ class NetworkSession{
return true; //keep ticking until timeout
}
//TODO: more stuff on tick
if($this->sendBuffer !== null){
$this->flushSendBuffer();
}
return false;
}
}

View File

@ -87,7 +87,7 @@ class PreSpawnSessionHandler extends SessionHandler{
$this->player->sendAllInventories();
$this->player->getInventory()->sendCreativeContents();
$this->player->getInventory()->sendHeldItem($this->player);
$this->session->sendEncoded($this->server->getCraftingManager()->getCraftingDataPacket());
$this->session->queueCompressed($this->server->getCraftingManager()->getCraftingDataPacket());
$this->server->sendFullPlayerListData($this->player);
}