mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-06-11 05:55:33 +00:00
NetworkSession: fixed some segments of recv/send logic not being covered by their respective network timingsÂ
This commit is contained in:
parent
d5e92b4ae6
commit
4dbcd714bd
@ -376,62 +376,67 @@ class NetworkSession{
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if($this->incomingPacketBatchBudget <= 0){
|
Timings::$playerNetworkReceive->startTiming();
|
||||||
$this->updatePacketBudget();
|
|
||||||
if($this->incomingPacketBatchBudget <= 0){
|
|
||||||
throw new PacketHandlingException("Receiving packets too fast");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$this->incomingPacketBatchBudget--;
|
|
||||||
|
|
||||||
if($this->cipher !== null){
|
|
||||||
Timings::$playerNetworkReceiveDecrypt->startTiming();
|
|
||||||
try{
|
|
||||||
$payload = $this->cipher->decrypt($payload);
|
|
||||||
}catch(DecryptionException $e){
|
|
||||||
$this->logger->debug("Encrypted packet: " . base64_encode($payload));
|
|
||||||
throw PacketHandlingException::wrap($e, "Packet decryption error");
|
|
||||||
}finally{
|
|
||||||
Timings::$playerNetworkReceiveDecrypt->stopTiming();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if($this->enableCompression){
|
|
||||||
Timings::$playerNetworkReceiveDecompress->startTiming();
|
|
||||||
try{
|
|
||||||
$decompressed = $this->compressor->decompress($payload);
|
|
||||||
}catch(DecompressionException $e){
|
|
||||||
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
|
|
||||||
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
|
|
||||||
}finally{
|
|
||||||
Timings::$playerNetworkReceiveDecompress->stopTiming();
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
$decompressed = $payload;
|
|
||||||
}
|
|
||||||
|
|
||||||
try{
|
try{
|
||||||
$stream = new BinaryStream($decompressed);
|
if($this->incomingPacketBatchBudget <= 0){
|
||||||
$count = 0;
|
$this->updatePacketBudget();
|
||||||
foreach(PacketBatch::decodeRaw($stream) as $buffer){
|
if($this->incomingPacketBatchBudget <= 0){
|
||||||
if(++$count > 1300){
|
throw new PacketHandlingException("Receiving packets too fast");
|
||||||
throw new PacketHandlingException("Too many packets in batch");
|
|
||||||
}
|
|
||||||
$packet = $this->packetPool->getPacket($buffer);
|
|
||||||
if($packet === null){
|
|
||||||
$this->logger->debug("Unknown packet: " . base64_encode($buffer));
|
|
||||||
throw new PacketHandlingException("Unknown packet received");
|
|
||||||
}
|
|
||||||
try{
|
|
||||||
$this->handleDataPacket($packet, $buffer);
|
|
||||||
}catch(PacketHandlingException $e){
|
|
||||||
$this->logger->debug($packet->getName() . ": " . base64_encode($buffer));
|
|
||||||
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}catch(PacketDecodeException $e){
|
$this->incomingPacketBatchBudget--;
|
||||||
$this->logger->logException($e);
|
|
||||||
throw PacketHandlingException::wrap($e, "Packet batch decode error");
|
if($this->cipher !== null){
|
||||||
|
Timings::$playerNetworkReceiveDecrypt->startTiming();
|
||||||
|
try{
|
||||||
|
$payload = $this->cipher->decrypt($payload);
|
||||||
|
}catch(DecryptionException $e){
|
||||||
|
$this->logger->debug("Encrypted packet: " . base64_encode($payload));
|
||||||
|
throw PacketHandlingException::wrap($e, "Packet decryption error");
|
||||||
|
}finally{
|
||||||
|
Timings::$playerNetworkReceiveDecrypt->stopTiming();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if($this->enableCompression){
|
||||||
|
Timings::$playerNetworkReceiveDecompress->startTiming();
|
||||||
|
try{
|
||||||
|
$decompressed = $this->compressor->decompress($payload);
|
||||||
|
}catch(DecompressionException $e){
|
||||||
|
$this->logger->debug("Failed to decompress packet: " . base64_encode($payload));
|
||||||
|
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
|
||||||
|
}finally{
|
||||||
|
Timings::$playerNetworkReceiveDecompress->stopTiming();
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
$decompressed = $payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
try{
|
||||||
|
$stream = new BinaryStream($decompressed);
|
||||||
|
$count = 0;
|
||||||
|
foreach(PacketBatch::decodeRaw($stream) as $buffer){
|
||||||
|
if(++$count > 1300){
|
||||||
|
throw new PacketHandlingException("Too many packets in batch");
|
||||||
|
}
|
||||||
|
$packet = $this->packetPool->getPacket($buffer);
|
||||||
|
if($packet === null){
|
||||||
|
$this->logger->debug("Unknown packet: " . base64_encode($buffer));
|
||||||
|
throw new PacketHandlingException("Unknown packet received");
|
||||||
|
}
|
||||||
|
try{
|
||||||
|
$this->handleDataPacket($packet, $buffer);
|
||||||
|
}catch(PacketHandlingException $e){
|
||||||
|
$this->logger->debug($packet->getName() . ": " . base64_encode($buffer));
|
||||||
|
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}catch(PacketDecodeException $e){
|
||||||
|
$this->logger->logException($e);
|
||||||
|
throw PacketHandlingException::wrap($e, "Packet batch decode error");
|
||||||
|
}
|
||||||
|
}finally{
|
||||||
|
Timings::$playerNetworkReceive->stopTiming();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -519,24 +524,29 @@ class NetworkSession{
|
|||||||
|
|
||||||
private function flushSendBuffer(bool $immediate = false) : void{
|
private function flushSendBuffer(bool $immediate = false) : void{
|
||||||
if(count($this->sendBuffer) > 0){
|
if(count($this->sendBuffer) > 0){
|
||||||
$syncMode = null; //automatic
|
Timings::$playerNetworkSend->startTiming();
|
||||||
if($immediate){
|
try{
|
||||||
$syncMode = true;
|
$syncMode = null; //automatic
|
||||||
}elseif($this->forceAsyncCompression){
|
if($immediate){
|
||||||
$syncMode = false;
|
$syncMode = true;
|
||||||
}
|
}elseif($this->forceAsyncCompression){
|
||||||
|
$syncMode = false;
|
||||||
|
}
|
||||||
|
|
||||||
$stream = new BinaryStream();
|
$stream = new BinaryStream();
|
||||||
self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer);
|
self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer);
|
||||||
|
|
||||||
if($this->enableCompression){
|
if($this->enableCompression){
|
||||||
$promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode);
|
$promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode);
|
||||||
}else{
|
}else{
|
||||||
$promise = new CompressBatchPromise();
|
$promise = new CompressBatchPromise();
|
||||||
$promise->resolve($stream->getBuffer());
|
$promise->resolve($stream->getBuffer());
|
||||||
|
}
|
||||||
|
$this->sendBuffer = [];
|
||||||
|
$this->queueCompressedNoBufferFlush($promise, $immediate);
|
||||||
|
}finally{
|
||||||
|
Timings::$playerNetworkSend->stopTiming();
|
||||||
}
|
}
|
||||||
$this->sendBuffer = [];
|
|
||||||
$this->queueCompressedNoBufferFlush($promise, $immediate);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -549,35 +559,45 @@ class NetworkSession{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{
|
public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{
|
||||||
$this->flushSendBuffer($immediate); //Maintain ordering if possible
|
Timings::$playerNetworkSend->startTiming();
|
||||||
$this->queueCompressedNoBufferFlush($payload, $immediate);
|
try{
|
||||||
|
$this->flushSendBuffer($immediate); //Maintain ordering if possible
|
||||||
|
$this->queueCompressedNoBufferFlush($payload, $immediate);
|
||||||
|
}finally{
|
||||||
|
Timings::$playerNetworkSend->stopTiming();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function queueCompressedNoBufferFlush(CompressBatchPromise $payload, bool $immediate = false) : void{
|
private function queueCompressedNoBufferFlush(CompressBatchPromise $payload, bool $immediate = false) : void{
|
||||||
if($immediate){
|
Timings::$playerNetworkSend->startTiming();
|
||||||
//Skips all queues
|
try{
|
||||||
$this->sendEncoded($payload->getResult(), true);
|
if($immediate){
|
||||||
}else{
|
//Skips all queues
|
||||||
$this->compressedQueue->enqueue($payload);
|
$this->sendEncoded($payload->getResult(), true);
|
||||||
$payload->onResolve(function(CompressBatchPromise $payload) : void{
|
}else{
|
||||||
if($this->connected && $this->compressedQueue->bottom() === $payload){
|
$this->compressedQueue->enqueue($payload);
|
||||||
$this->compressedQueue->dequeue(); //result unused
|
$payload->onResolve(function(CompressBatchPromise $payload) : void{
|
||||||
$this->sendEncoded($payload->getResult());
|
if($this->connected && $this->compressedQueue->bottom() === $payload){
|
||||||
|
$this->compressedQueue->dequeue(); //result unused
|
||||||
|
$this->sendEncoded($payload->getResult());
|
||||||
|
|
||||||
while(!$this->compressedQueue->isEmpty()){
|
while(!$this->compressedQueue->isEmpty()){
|
||||||
/** @var CompressBatchPromise $current */
|
/** @var CompressBatchPromise $current */
|
||||||
$current = $this->compressedQueue->bottom();
|
$current = $this->compressedQueue->bottom();
|
||||||
if($current->hasResult()){
|
if($current->hasResult()){
|
||||||
$this->compressedQueue->dequeue();
|
$this->compressedQueue->dequeue();
|
||||||
|
|
||||||
$this->sendEncoded($current->getResult());
|
$this->sendEncoded($current->getResult());
|
||||||
}else{
|
}else{
|
||||||
//can't send any more queued until this one is ready
|
//can't send any more queued until this one is ready
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
}
|
||||||
|
}finally{
|
||||||
|
Timings::$playerNetworkSend->stopTiming();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user