diff --git a/src/network/mcpe/NetworkSession.php b/src/network/mcpe/NetworkSession.php index 771d231e6..2b995ef8f 100644 --- a/src/network/mcpe/NetworkSession.php +++ b/src/network/mcpe/NetworkSession.php @@ -376,62 +376,67 @@ class NetworkSession{ return; } - if($this->incomingPacketBatchBudget <= 0){ - $this->updatePacketBudget(); - if($this->incomingPacketBatchBudget <= 0){ - throw new PacketHandlingException("Receiving packets too fast"); - } - } - $this->incomingPacketBatchBudget--; - - if($this->cipher !== null){ - Timings::$playerNetworkReceiveDecrypt->startTiming(); - try{ - $payload = $this->cipher->decrypt($payload); - }catch(DecryptionException $e){ - $this->logger->debug("Encrypted packet: " . base64_encode($payload)); - throw PacketHandlingException::wrap($e, "Packet decryption error"); - }finally{ - Timings::$playerNetworkReceiveDecrypt->stopTiming(); - } - } - - if($this->enableCompression){ - Timings::$playerNetworkReceiveDecompress->startTiming(); - try{ - $decompressed = $this->compressor->decompress($payload); - }catch(DecompressionException $e){ - $this->logger->debug("Failed to decompress packet: " . base64_encode($payload)); - throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); - }finally{ - Timings::$playerNetworkReceiveDecompress->stopTiming(); - } - }else{ - $decompressed = $payload; - } - + Timings::$playerNetworkReceive->startTiming(); try{ - $stream = new BinaryStream($decompressed); - $count = 0; - foreach(PacketBatch::decodeRaw($stream) as $buffer){ - if(++$count > 1300){ - throw new PacketHandlingException("Too many packets in batch"); - } - $packet = $this->packetPool->getPacket($buffer); - if($packet === null){ - $this->logger->debug("Unknown packet: " . base64_encode($buffer)); - throw new PacketHandlingException("Unknown packet received"); - } - try{ - $this->handleDataPacket($packet, $buffer); - }catch(PacketHandlingException $e){ - $this->logger->debug($packet->getName() . ": " . base64_encode($buffer)); - throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); + if($this->incomingPacketBatchBudget <= 0){ + $this->updatePacketBudget(); + if($this->incomingPacketBatchBudget <= 0){ + throw new PacketHandlingException("Receiving packets too fast"); } } - }catch(PacketDecodeException $e){ - $this->logger->logException($e); - throw PacketHandlingException::wrap($e, "Packet batch decode error"); + $this->incomingPacketBatchBudget--; + + if($this->cipher !== null){ + Timings::$playerNetworkReceiveDecrypt->startTiming(); + try{ + $payload = $this->cipher->decrypt($payload); + }catch(DecryptionException $e){ + $this->logger->debug("Encrypted packet: " . base64_encode($payload)); + throw PacketHandlingException::wrap($e, "Packet decryption error"); + }finally{ + Timings::$playerNetworkReceiveDecrypt->stopTiming(); + } + } + + if($this->enableCompression){ + Timings::$playerNetworkReceiveDecompress->startTiming(); + try{ + $decompressed = $this->compressor->decompress($payload); + }catch(DecompressionException $e){ + $this->logger->debug("Failed to decompress packet: " . base64_encode($payload)); + throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); + }finally{ + Timings::$playerNetworkReceiveDecompress->stopTiming(); + } + }else{ + $decompressed = $payload; + } + + try{ + $stream = new BinaryStream($decompressed); + $count = 0; + foreach(PacketBatch::decodeRaw($stream) as $buffer){ + if(++$count > 1300){ + throw new PacketHandlingException("Too many packets in batch"); + } + $packet = $this->packetPool->getPacket($buffer); + if($packet === null){ + $this->logger->debug("Unknown packet: " . base64_encode($buffer)); + throw new PacketHandlingException("Unknown packet received"); + } + try{ + $this->handleDataPacket($packet, $buffer); + }catch(PacketHandlingException $e){ + $this->logger->debug($packet->getName() . ": " . base64_encode($buffer)); + throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); + } + } + }catch(PacketDecodeException $e){ + $this->logger->logException($e); + throw PacketHandlingException::wrap($e, "Packet batch decode error"); + } + }finally{ + Timings::$playerNetworkReceive->stopTiming(); } } @@ -519,24 +524,29 @@ class NetworkSession{ private function flushSendBuffer(bool $immediate = false) : void{ if(count($this->sendBuffer) > 0){ - $syncMode = null; //automatic - if($immediate){ - $syncMode = true; - }elseif($this->forceAsyncCompression){ - $syncMode = false; - } + Timings::$playerNetworkSend->startTiming(); + try{ + $syncMode = null; //automatic + if($immediate){ + $syncMode = true; + }elseif($this->forceAsyncCompression){ + $syncMode = false; + } - $stream = new BinaryStream(); - self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer); + $stream = new BinaryStream(); + self::encodePacketBatchTimed($stream, $this->packetSerializerContext, $this->sendBuffer); - if($this->enableCompression){ - $promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode); - }else{ - $promise = new CompressBatchPromise(); - $promise->resolve($stream->getBuffer()); + if($this->enableCompression){ + $promise = $this->server->prepareBatch(new PacketBatch($stream->getBuffer()), $this->compressor, $syncMode); + }else{ + $promise = new CompressBatchPromise(); + $promise->resolve($stream->getBuffer()); + } + $this->sendBuffer = []; + $this->queueCompressedNoBufferFlush($promise, $immediate); + }finally{ + Timings::$playerNetworkSend->stopTiming(); } - $this->sendBuffer = []; - $this->queueCompressedNoBufferFlush($promise, $immediate); } } @@ -549,35 +559,45 @@ class NetworkSession{ } public function queueCompressed(CompressBatchPromise $payload, bool $immediate = false) : void{ - $this->flushSendBuffer($immediate); //Maintain ordering if possible - $this->queueCompressedNoBufferFlush($payload, $immediate); + Timings::$playerNetworkSend->startTiming(); + try{ + $this->flushSendBuffer($immediate); //Maintain ordering if possible + $this->queueCompressedNoBufferFlush($payload, $immediate); + }finally{ + Timings::$playerNetworkSend->stopTiming(); + } } private function queueCompressedNoBufferFlush(CompressBatchPromise $payload, bool $immediate = false) : void{ - if($immediate){ - //Skips all queues - $this->sendEncoded($payload->getResult(), true); - }else{ - $this->compressedQueue->enqueue($payload); - $payload->onResolve(function(CompressBatchPromise $payload) : void{ - if($this->connected && $this->compressedQueue->bottom() === $payload){ - $this->compressedQueue->dequeue(); //result unused - $this->sendEncoded($payload->getResult()); + Timings::$playerNetworkSend->startTiming(); + try{ + if($immediate){ + //Skips all queues + $this->sendEncoded($payload->getResult(), true); + }else{ + $this->compressedQueue->enqueue($payload); + $payload->onResolve(function(CompressBatchPromise $payload) : void{ + if($this->connected && $this->compressedQueue->bottom() === $payload){ + $this->compressedQueue->dequeue(); //result unused + $this->sendEncoded($payload->getResult()); - while(!$this->compressedQueue->isEmpty()){ - /** @var CompressBatchPromise $current */ - $current = $this->compressedQueue->bottom(); - if($current->hasResult()){ - $this->compressedQueue->dequeue(); + while(!$this->compressedQueue->isEmpty()){ + /** @var CompressBatchPromise $current */ + $current = $this->compressedQueue->bottom(); + if($current->hasResult()){ + $this->compressedQueue->dequeue(); - $this->sendEncoded($current->getResult()); - }else{ - //can't send any more queued until this one is ready - break; + $this->sendEncoded($current->getResult()); + }else{ + //can't send any more queued until this one is ready + break; + } } } - } - }); + }); + } + }finally{ + Timings::$playerNetworkSend->stopTiming(); } }