From dbd1f3f96ee1d1674f2b4bc4da94b0d2d2a840f1 Mon Sep 17 00:00:00 2001 From: Shoghi Cervantes Date: Thu, 11 Sep 2014 18:05:01 +0200 Subject: [PATCH] Use pthreads interface on Chunk Generation THread, remove IPC sockets, improve performance --- .../generator/GenerationChunkManager.php | 48 +++++-- .../level/generator/GenerationManager.php | 129 ++++++++---------- .../generator/GenerationRequestManager.php | 36 ++--- .../level/generator/GenerationThread.php | 54 +++++--- 4 files changed, 141 insertions(+), 126 deletions(-) diff --git a/src/pocketmine/level/generator/GenerationChunkManager.php b/src/pocketmine/level/generator/GenerationChunkManager.php index 61304a454..5c8e7898d 100644 --- a/src/pocketmine/level/generator/GenerationChunkManager.php +++ b/src/pocketmine/level/generator/GenerationChunkManager.php @@ -75,10 +75,15 @@ class GenerationChunkManager implements ChunkManager{ * @param $chunkZ * * @return FullChunk + * + * @throws \Exception */ public function getChunk($chunkX, $chunkZ){ $index = Level::chunkHash($chunkX, $chunkZ); $chunk = !isset($this->chunks[$index]) ? $this->requestChunk($chunkX, $chunkZ) : $this->chunks[$index]; + if($chunk === null){ + throw new \Exception("null chunk received"); + } $this->changes[$index] = $chunk; return $chunk; @@ -134,21 +139,36 @@ class GenerationChunkManager implements ChunkManager{ } public function isChunkGenerated($chunkX, $chunkZ){ - return $this->getChunk($chunkX, $chunkZ)->isGenerated(); + try{ + return $this->getChunk($chunkX, $chunkZ)->isGenerated(); + }catch(\Exception $e){ + return false; + } } public function isChunkPopulated($chunkX, $chunkZ){ - return $this->getChunk($chunkX, $chunkZ)->isPopulated(); + try{ + return $this->getChunk($chunkX, $chunkZ)->isPopulated(); + }catch(\Exception $e){ + return false; + } } public function setChunkGenerated($chunkX, $chunkZ){ $chunk = $this->getChunk($chunkX, $chunkZ); $chunk->setGenerated(true); + try{ + $chunk = $this->getChunk($chunkX, $chunkZ); + $chunk->setGenerated(true); + }catch(\Exception $e){} } public function setChunkPopulated($chunkX, $chunkZ){ - $chunk = $this->getChunk($chunkX, $chunkZ); - $chunk->setPopulated(true); + + try{ + $chunk = $this->getChunk($chunkX, $chunkZ); + $chunk->setPopulated(true); + }catch(\Exception $e){} } protected function requestChunk($chunkX, $chunkZ){ @@ -181,7 +201,11 @@ class GenerationChunkManager implements ChunkManager{ * @return int 0-255 */ public function getBlockIdAt($x, $y, $z){ - return $this->getChunk($x >> 4, $z >> 4)->getBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f); + try{ + return $this->getChunk($x >> 4, $z >> 4)->getBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f); + }catch(\Exception $e){ + return 0; + } } /** @@ -193,7 +217,9 @@ class GenerationChunkManager implements ChunkManager{ * @param int $id 0-255 */ public function setBlockIdAt($x, $y, $z, $id){ - $this->getChunk($x >> 4, $z >> 4)->setBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f, $id & 0xff); + try{ + $this->getChunk($x >> 4, $z >> 4)->setBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f, $id & 0xff); + }catch(\Exception $e){} } /** @@ -206,7 +232,11 @@ class GenerationChunkManager implements ChunkManager{ * @return int 0-15 */ public function getBlockDataAt($x, $y, $z){ - return $this->getChunk($x >> 4, $z >> 4)->getBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f); + try{ + return $this->getChunk($x >> 4, $z >> 4)->getBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f); + }catch(\Exception $e){ + return 0; + } } /** @@ -218,7 +248,9 @@ class GenerationChunkManager implements ChunkManager{ * @param int $data 0-15 */ public function setBlockDataAt($x, $y, $z, $data){ - $this->getChunk($x >> 4, $z >> 4)->setBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f, $data & 0x0f); + try{ + $this->getChunk($x >> 4, $z >> 4)->setBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f, $data & 0x0f); + }catch(\Exception $e){} } public function shutdown(){ diff --git a/src/pocketmine/level/generator/GenerationManager.php b/src/pocketmine/level/generator/GenerationManager.php index 95278e790..273446137 100644 --- a/src/pocketmine/level/generator/GenerationManager.php +++ b/src/pocketmine/level/generator/GenerationManager.php @@ -89,8 +89,9 @@ class GenerationManager{ */ const PACKET_SHUTDOWN = 0xff; + /** @var GenerationThread */ + protected $thread; - protected $socket; /** @var \Logger */ protected $logger; /** @var \ClassLoader */ @@ -110,12 +111,12 @@ class GenerationManager{ protected $shutdown = false; /** - * @param resource $socket - * @param \Logger $logger - * @param \ClassLoader $loader + * @param GenerationThread $thread + * @param \Logger $logger + * @param \ClassLoader $loader */ - public function __construct($socket, \Logger $logger, \ClassLoader $loader){ - $this->socket = $socket; + public function __construct(GenerationThread $thread, \Logger $logger, \ClassLoader $loader){ + $this->thread = $thread; $this->logger = $logger; $this->loader = $loader; $chunkX = $chunkZ = null; @@ -204,7 +205,7 @@ class GenerationManager{ public function requestChunk($levelID, $chunkX, $chunkZ){ $this->needsChunk[$levelID] = [$chunkX, $chunkZ]; $binary = chr(self::PACKET_REQUEST_CHUNK) . Binary::writeInt($levelID) . Binary::writeInt($chunkX) . Binary::writeInt($chunkZ); - @socket_write($this->socket, Binary::writeInt(strlen($binary)) . $binary); + $this->thread->pushThreadToMainPacket($binary); do{ $this->readPacket(); @@ -221,76 +222,60 @@ class GenerationManager{ public function sendChunk($levelID, FullChunk $chunk){ $binary = chr(self::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary(); - @socket_write($this->socket, Binary::writeInt(strlen($binary)) . $binary); - } - - protected function socketRead($len){ - $buffer = ""; - while(strlen($buffer) < $len){ - $buffer .= @socket_read($this->socket, $len - strlen($buffer)); - } - - return $buffer; + $this->thread->pushThreadToMainPacket($binary); } protected function readPacket(){ - $len = $this->socketRead(4); - if(($len = Binary::readInt($len)) <= 0){ - $this->shutdown = true; - $this->getLogger()->critical("Generation Thread found a stream error, shutting down"); + if(strlen($packet = $this->thread->readMainToThreadPacket()) > 0){ + $pid = ord($packet{0}); + $offset = 1; + if($pid === self::PACKET_REQUEST_CHUNK){ + $levelID = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + $chunkX = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + $chunkZ = Binary::readInt(substr($packet, $offset, 4)); + $this->enqueueChunk($levelID, $chunkX, $chunkZ); + }elseif($pid === self::PACKET_SEND_CHUNK){ + $levelID = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + $len = ord($packet{$offset++}); + /** @var FullChunk $class */ + $class = substr($packet, $offset, $len); + $offset += $len; + $chunk = $class::fromBinary(substr($packet, $offset)); + $this->receiveChunk($levelID, $chunk); + }elseif($pid === self::PACKET_OPEN_LEVEL){ + $levelID = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + $seed = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + $len = Binary::readShort(substr($packet, $offset, 2)); + $offset += 2; + $class = substr($packet, $offset, $len); + $offset += $len; + $options = unserialize(substr($packet, $offset)); + $this->openLevel($levelID, $seed, $class, $options); + }elseif($pid === self::PACKET_CLOSE_LEVEL){ + $levelID = Binary::readInt(substr($packet, $offset, 4)); + $this->closeLevel($levelID); + }elseif($pid === self::PACKET_ADD_NAMESPACE){ + $len = Binary::readShort(substr($packet, $offset, 2)); + $offset += 2; + $namespace = substr($packet, $offset, $len); + $offset += $len; + $path = substr($packet, $offset); + $this->loader->addPath($path); + }elseif($pid === self::PACKET_SHUTDOWN){ + foreach($this->levels as $level){ + $level->shutdown(); + } + $this->levels = []; - return; - } - - $packet = $this->socketRead($len); - - $pid = ord($packet{0}); - $offset = 1; - if($pid === self::PACKET_REQUEST_CHUNK){ - $levelID = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $chunkX = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $chunkZ = Binary::readInt(substr($packet, $offset, 4)); - $this->enqueueChunk($levelID, $chunkX, $chunkZ); - }elseif($pid === self::PACKET_SEND_CHUNK){ - $levelID = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $len = ord($packet{$offset++}); - /** @var FullChunk $class */ - $class = substr($packet, $offset, $len); - $offset += $len; - $chunk = $class::fromBinary(substr($packet, $offset)); - $this->receiveChunk($levelID, $chunk); - }elseif($pid === self::PACKET_OPEN_LEVEL){ - $levelID = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $seed = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $len = Binary::readShort(substr($packet, $offset, 2)); - $offset += 2; - $class = substr($packet, $offset, $len); - $offset += $len; - $options = unserialize(substr($packet, $offset)); - $this->openLevel($levelID, $seed, $class, $options); - }elseif($pid === self::PACKET_CLOSE_LEVEL){ - $levelID = Binary::readInt(substr($packet, $offset, 4)); - $this->closeLevel($levelID); - }elseif($pid === self::PACKET_ADD_NAMESPACE){ - $len = Binary::readShort(substr($packet, $offset, 2)); - $offset += 2; - $namespace = substr($packet, $offset, $len); - $offset += $len; - $path = substr($packet, $offset); - $this->loader->addPath($path); - }elseif($pid === self::PACKET_SHUTDOWN){ - foreach($this->levels as $level){ - $level->shutdown(); + $this->shutdown = true; } - $this->levels = []; - - $this->shutdown = true; - socket_close($this->socket); + }elseif(count($this->thread->getInternalQueue()) === 0){ + $this->thread->wait(50000); } } diff --git a/src/pocketmine/level/generator/GenerationRequestManager.php b/src/pocketmine/level/generator/GenerationRequestManager.php index 913aac485..d7b0c25fc 100644 --- a/src/pocketmine/level/generator/GenerationRequestManager.php +++ b/src/pocketmine/level/generator/GenerationRequestManager.php @@ -28,7 +28,6 @@ use pocketmine\utils\Binary; class GenerationRequestManager{ - protected $socket; /** @var Server */ protected $server; /** @var GenerationThread */ @@ -40,7 +39,6 @@ class GenerationRequestManager{ public function __construct(Server $server){ $this->server = $server; $this->generationThread = new GenerationThread($server->getLogger(), $server->getLoader()); - $this->socket = $this->generationThread->getExternalSocket(); } /** @@ -52,7 +50,7 @@ class GenerationRequestManager{ $buffer = chr(GenerationManager::PACKET_OPEN_LEVEL) . Binary::writeInt($level->getID()) . Binary::writeInt($level->getSeed()) . Binary::writeShort(strlen($generator)) . $generator . serialize($options); - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); + $this->generationThread->pushMainToThreadPacket($buffer); } /** @@ -60,31 +58,22 @@ class GenerationRequestManager{ */ public function closeLevel(Level $level){ $buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($level->getID()); - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); + $this->generationThread->pushMainToThreadPacket($buffer); } public function addNamespace($namespace, $path){ $buffer = chr(GenerationManager::PACKET_ADD_NAMESPACE) . Binary::writeShort(strlen($namespace)) . $namespace . $path; - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); - } - - protected function socketRead($len){ - $buffer = ""; - while(strlen($buffer) < $len){ - $buffer .= @socket_read($this->socket, $len - strlen($buffer)); - } - - return $buffer; + $this->generationThread->pushMainToThreadPacket($buffer); } protected function sendChunk($levelID, FullChunk $chunk){ - $binary = chr(GenerationManager::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary(); - @socket_write($this->socket, Binary::writeInt(strlen($binary)) . $binary); + $buffer = chr(GenerationManager::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary(); + $this->generationThread->pushMainToThreadPacket($buffer); } public function requestChunk(Level $level, $chunkX, $chunkZ){ $buffer = chr(GenerationManager::PACKET_REQUEST_CHUNK) . Binary::writeInt($level->getID()) . Binary::writeInt($chunkX) . Binary::writeInt($chunkZ); - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); + $this->generationThread->pushMainToThreadPacket($buffer); } protected function handleRequest($levelID, $chunkX, $chunkZ){ @@ -97,7 +86,7 @@ class GenerationRequestManager{ } }else{ $buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($levelID); - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); + $this->generationThread->pushMainToThreadPacket($buffer); } } @@ -106,17 +95,12 @@ class GenerationRequestManager{ $level->generateChunkCallback($chunk->getX(), $chunk->getZ(), $chunk); }else{ $buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($levelID); - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); + $this->generationThread->pushMainToThreadPacket($buffer); } } public function handlePackets(){ - if(($len = @socket_read($this->socket, 4)) !== false and $len !== ""){ - if(strlen($len) < 4){ - $len .= $this->socketRead(4 - strlen($len)); - } - - $packet = $this->socketRead(Binary::readInt($len)); + while(strlen($packet = $this->generationThread->readThreadToMainPacket()) > 0){ $pid = ord($packet{0}); $offset = 1; @@ -145,7 +129,7 @@ class GenerationRequestManager{ public function shutdown(){ $buffer = chr(GenerationManager::PACKET_SHUTDOWN); - @socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer); + $this->generationThread->pushMainToThreadPacket($buffer); $this->generationThread->join(); } diff --git a/src/pocketmine/level/generator/GenerationThread.php b/src/pocketmine/level/generator/GenerationThread.php index 58a6de654..bd7991596 100644 --- a/src/pocketmine/level/generator/GenerationThread.php +++ b/src/pocketmine/level/generator/GenerationThread.php @@ -32,15 +32,40 @@ class GenerationThread extends Thread{ /** @var \ThreadedLogger */ protected $logger; - protected $externalSocket; - protected $internalSocket; + /** @var \Threaded */ + protected $externalQueue; + /** @var \Threaded */ + protected $internalQueue; - public function getExternalSocket(){ - return $this->externalSocket; + /** + * @return \Threaded + */ + public function getInternalQueue(){ + return $this->internalQueue; } - public function getInternalSocket(){ - return $this->internalSocket; + /** + * @return \Threaded + */ + public function getExternalQueue(){ + return $this->externalQueue; + } + + public function pushMainToThreadPacket($str){ + $this->internalQueue[] = $str; + $this->notify(); + } + + public function readMainToThreadPacket(){ + return $this->internalQueue->shift(); + } + + public function pushThreadToMainPacket($str){ + $this->externalQueue[] = $str; + } + + public function readThreadToMainPacket(){ + return $this->externalQueue->shift(); } /** @@ -57,19 +82,8 @@ class GenerationThread extends Thread{ $this->addDependency($loadPaths, new \ReflectionClass($this->loader)); $this->loadPaths = array_reverse($loadPaths); - $sockets = []; - if(!socket_create_pair((strtoupper(substr(PHP_OS, 0, 3)) === 'WIN' ? AF_INET : AF_UNIX), SOCK_STREAM, 0, $sockets)){ - throw new \Exception("Could not create IPC sockets. Reason: " . socket_strerror(socket_last_error())); - } - - $this->internalSocket = $sockets[0]; - socket_set_block($this->internalSocket); //IMPORTANT! - @socket_set_option($this->internalSocket, SOL_SOCKET, SO_SNDBUF, 1024 * 1024 * 2); - @socket_set_option($this->internalSocket, SOL_SOCKET, SO_RCVBUF, 1024 * 1024 * 2); - $this->externalSocket = $sockets[1]; - socket_set_nonblock($this->externalSocket); - @socket_set_option($this->externalSocket, SOL_SOCKET, SO_SNDBUF, 1024 * 1024 * 2); - @socket_set_option($this->externalSocket, SOL_SOCKET, SO_RCVBUF, 1024 * 1024 * 2); + $this->externalQueue = new \Threaded(); + $this->internalQueue = new \Threaded(); $this->start(); } @@ -98,6 +112,6 @@ class GenerationThread extends Thread{ } $this->loader->register(); - $generationManager = new GenerationManager($this->getInternalSocket(), $this->getLogger(), $this->loader); + $generationManager = new GenerationManager($this, $this->getLogger(), $this->loader); } } \ No newline at end of file