mirror of
https://github.com/pmmp/PocketMine-MP.git
synced 2025-07-04 09:10:00 +00:00
Use pthreads interface on Chunk Generation THread, remove IPC sockets, improve performance
This commit is contained in:
parent
665c275bb7
commit
dbd1f3f96e
@ -75,10 +75,15 @@ class GenerationChunkManager implements ChunkManager{
|
|||||||
* @param $chunkZ
|
* @param $chunkZ
|
||||||
*
|
*
|
||||||
* @return FullChunk
|
* @return FullChunk
|
||||||
|
*
|
||||||
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
public function getChunk($chunkX, $chunkZ){
|
public function getChunk($chunkX, $chunkZ){
|
||||||
$index = Level::chunkHash($chunkX, $chunkZ);
|
$index = Level::chunkHash($chunkX, $chunkZ);
|
||||||
$chunk = !isset($this->chunks[$index]) ? $this->requestChunk($chunkX, $chunkZ) : $this->chunks[$index];
|
$chunk = !isset($this->chunks[$index]) ? $this->requestChunk($chunkX, $chunkZ) : $this->chunks[$index];
|
||||||
|
if($chunk === null){
|
||||||
|
throw new \Exception("null chunk received");
|
||||||
|
}
|
||||||
$this->changes[$index] = $chunk;
|
$this->changes[$index] = $chunk;
|
||||||
|
|
||||||
return $chunk;
|
return $chunk;
|
||||||
@ -134,21 +139,36 @@ class GenerationChunkManager implements ChunkManager{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public function isChunkGenerated($chunkX, $chunkZ){
|
public function isChunkGenerated($chunkX, $chunkZ){
|
||||||
|
try{
|
||||||
return $this->getChunk($chunkX, $chunkZ)->isGenerated();
|
return $this->getChunk($chunkX, $chunkZ)->isGenerated();
|
||||||
|
}catch(\Exception $e){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isChunkPopulated($chunkX, $chunkZ){
|
public function isChunkPopulated($chunkX, $chunkZ){
|
||||||
|
try{
|
||||||
return $this->getChunk($chunkX, $chunkZ)->isPopulated();
|
return $this->getChunk($chunkX, $chunkZ)->isPopulated();
|
||||||
|
}catch(\Exception $e){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function setChunkGenerated($chunkX, $chunkZ){
|
public function setChunkGenerated($chunkX, $chunkZ){
|
||||||
$chunk = $this->getChunk($chunkX, $chunkZ);
|
$chunk = $this->getChunk($chunkX, $chunkZ);
|
||||||
$chunk->setGenerated(true);
|
$chunk->setGenerated(true);
|
||||||
|
try{
|
||||||
|
$chunk = $this->getChunk($chunkX, $chunkZ);
|
||||||
|
$chunk->setGenerated(true);
|
||||||
|
}catch(\Exception $e){}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function setChunkPopulated($chunkX, $chunkZ){
|
public function setChunkPopulated($chunkX, $chunkZ){
|
||||||
|
|
||||||
|
try{
|
||||||
$chunk = $this->getChunk($chunkX, $chunkZ);
|
$chunk = $this->getChunk($chunkX, $chunkZ);
|
||||||
$chunk->setPopulated(true);
|
$chunk->setPopulated(true);
|
||||||
|
}catch(\Exception $e){}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function requestChunk($chunkX, $chunkZ){
|
protected function requestChunk($chunkX, $chunkZ){
|
||||||
@ -181,7 +201,11 @@ class GenerationChunkManager implements ChunkManager{
|
|||||||
* @return int 0-255
|
* @return int 0-255
|
||||||
*/
|
*/
|
||||||
public function getBlockIdAt($x, $y, $z){
|
public function getBlockIdAt($x, $y, $z){
|
||||||
|
try{
|
||||||
return $this->getChunk($x >> 4, $z >> 4)->getBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f);
|
return $this->getChunk($x >> 4, $z >> 4)->getBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f);
|
||||||
|
}catch(\Exception $e){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -193,7 +217,9 @@ class GenerationChunkManager implements ChunkManager{
|
|||||||
* @param int $id 0-255
|
* @param int $id 0-255
|
||||||
*/
|
*/
|
||||||
public function setBlockIdAt($x, $y, $z, $id){
|
public function setBlockIdAt($x, $y, $z, $id){
|
||||||
|
try{
|
||||||
$this->getChunk($x >> 4, $z >> 4)->setBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f, $id & 0xff);
|
$this->getChunk($x >> 4, $z >> 4)->setBlockId($x & 0x0f, $y & 0x7f, $z & 0x0f, $id & 0xff);
|
||||||
|
}catch(\Exception $e){}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -206,7 +232,11 @@ class GenerationChunkManager implements ChunkManager{
|
|||||||
* @return int 0-15
|
* @return int 0-15
|
||||||
*/
|
*/
|
||||||
public function getBlockDataAt($x, $y, $z){
|
public function getBlockDataAt($x, $y, $z){
|
||||||
|
try{
|
||||||
return $this->getChunk($x >> 4, $z >> 4)->getBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f);
|
return $this->getChunk($x >> 4, $z >> 4)->getBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f);
|
||||||
|
}catch(\Exception $e){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -218,7 +248,9 @@ class GenerationChunkManager implements ChunkManager{
|
|||||||
* @param int $data 0-15
|
* @param int $data 0-15
|
||||||
*/
|
*/
|
||||||
public function setBlockDataAt($x, $y, $z, $data){
|
public function setBlockDataAt($x, $y, $z, $data){
|
||||||
|
try{
|
||||||
$this->getChunk($x >> 4, $z >> 4)->setBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f, $data & 0x0f);
|
$this->getChunk($x >> 4, $z >> 4)->setBlockData($x & 0x0f, $y & 0x7f, $z & 0x0f, $data & 0x0f);
|
||||||
|
}catch(\Exception $e){}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function shutdown(){
|
public function shutdown(){
|
||||||
|
@ -89,8 +89,9 @@ class GenerationManager{
|
|||||||
*/
|
*/
|
||||||
const PACKET_SHUTDOWN = 0xff;
|
const PACKET_SHUTDOWN = 0xff;
|
||||||
|
|
||||||
|
/** @var GenerationThread */
|
||||||
|
protected $thread;
|
||||||
|
|
||||||
protected $socket;
|
|
||||||
/** @var \Logger */
|
/** @var \Logger */
|
||||||
protected $logger;
|
protected $logger;
|
||||||
/** @var \ClassLoader */
|
/** @var \ClassLoader */
|
||||||
@ -110,12 +111,12 @@ class GenerationManager{
|
|||||||
protected $shutdown = false;
|
protected $shutdown = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param resource $socket
|
* @param GenerationThread $thread
|
||||||
* @param \Logger $logger
|
* @param \Logger $logger
|
||||||
* @param \ClassLoader $loader
|
* @param \ClassLoader $loader
|
||||||
*/
|
*/
|
||||||
public function __construct($socket, \Logger $logger, \ClassLoader $loader){
|
public function __construct(GenerationThread $thread, \Logger $logger, \ClassLoader $loader){
|
||||||
$this->socket = $socket;
|
$this->thread = $thread;
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
$this->loader = $loader;
|
$this->loader = $loader;
|
||||||
$chunkX = $chunkZ = null;
|
$chunkX = $chunkZ = null;
|
||||||
@ -204,7 +205,7 @@ class GenerationManager{
|
|||||||
public function requestChunk($levelID, $chunkX, $chunkZ){
|
public function requestChunk($levelID, $chunkX, $chunkZ){
|
||||||
$this->needsChunk[$levelID] = [$chunkX, $chunkZ];
|
$this->needsChunk[$levelID] = [$chunkX, $chunkZ];
|
||||||
$binary = chr(self::PACKET_REQUEST_CHUNK) . Binary::writeInt($levelID) . Binary::writeInt($chunkX) . Binary::writeInt($chunkZ);
|
$binary = chr(self::PACKET_REQUEST_CHUNK) . Binary::writeInt($levelID) . Binary::writeInt($chunkX) . Binary::writeInt($chunkZ);
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($binary)) . $binary);
|
$this->thread->pushThreadToMainPacket($binary);
|
||||||
|
|
||||||
do{
|
do{
|
||||||
$this->readPacket();
|
$this->readPacket();
|
||||||
@ -221,29 +222,11 @@ class GenerationManager{
|
|||||||
|
|
||||||
public function sendChunk($levelID, FullChunk $chunk){
|
public function sendChunk($levelID, FullChunk $chunk){
|
||||||
$binary = chr(self::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary();
|
$binary = chr(self::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary();
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($binary)) . $binary);
|
$this->thread->pushThreadToMainPacket($binary);
|
||||||
}
|
|
||||||
|
|
||||||
protected function socketRead($len){
|
|
||||||
$buffer = "";
|
|
||||||
while(strlen($buffer) < $len){
|
|
||||||
$buffer .= @socket_read($this->socket, $len - strlen($buffer));
|
|
||||||
}
|
|
||||||
|
|
||||||
return $buffer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function readPacket(){
|
protected function readPacket(){
|
||||||
$len = $this->socketRead(4);
|
if(strlen($packet = $this->thread->readMainToThreadPacket()) > 0){
|
||||||
if(($len = Binary::readInt($len)) <= 0){
|
|
||||||
$this->shutdown = true;
|
|
||||||
$this->getLogger()->critical("Generation Thread found a stream error, shutting down");
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$packet = $this->socketRead($len);
|
|
||||||
|
|
||||||
$pid = ord($packet{0});
|
$pid = ord($packet{0});
|
||||||
$offset = 1;
|
$offset = 1;
|
||||||
if($pid === self::PACKET_REQUEST_CHUNK){
|
if($pid === self::PACKET_REQUEST_CHUNK){
|
||||||
@ -290,7 +273,9 @@ class GenerationManager{
|
|||||||
$this->levels = [];
|
$this->levels = [];
|
||||||
|
|
||||||
$this->shutdown = true;
|
$this->shutdown = true;
|
||||||
socket_close($this->socket);
|
}
|
||||||
|
}elseif(count($this->thread->getInternalQueue()) === 0){
|
||||||
|
$this->thread->wait(50000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,6 @@ use pocketmine\utils\Binary;
|
|||||||
|
|
||||||
class GenerationRequestManager{
|
class GenerationRequestManager{
|
||||||
|
|
||||||
protected $socket;
|
|
||||||
/** @var Server */
|
/** @var Server */
|
||||||
protected $server;
|
protected $server;
|
||||||
/** @var GenerationThread */
|
/** @var GenerationThread */
|
||||||
@ -40,7 +39,6 @@ class GenerationRequestManager{
|
|||||||
public function __construct(Server $server){
|
public function __construct(Server $server){
|
||||||
$this->server = $server;
|
$this->server = $server;
|
||||||
$this->generationThread = new GenerationThread($server->getLogger(), $server->getLoader());
|
$this->generationThread = new GenerationThread($server->getLogger(), $server->getLoader());
|
||||||
$this->socket = $this->generationThread->getExternalSocket();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -52,7 +50,7 @@ class GenerationRequestManager{
|
|||||||
$buffer = chr(GenerationManager::PACKET_OPEN_LEVEL) . Binary::writeInt($level->getID()) . Binary::writeInt($level->getSeed()) .
|
$buffer = chr(GenerationManager::PACKET_OPEN_LEVEL) . Binary::writeInt($level->getID()) . Binary::writeInt($level->getSeed()) .
|
||||||
Binary::writeShort(strlen($generator)) . $generator . serialize($options);
|
Binary::writeShort(strlen($generator)) . $generator . serialize($options);
|
||||||
|
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -60,31 +58,22 @@ class GenerationRequestManager{
|
|||||||
*/
|
*/
|
||||||
public function closeLevel(Level $level){
|
public function closeLevel(Level $level){
|
||||||
$buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($level->getID());
|
$buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($level->getID());
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function addNamespace($namespace, $path){
|
public function addNamespace($namespace, $path){
|
||||||
$buffer = chr(GenerationManager::PACKET_ADD_NAMESPACE) . Binary::writeShort(strlen($namespace)) . $namespace . $path;
|
$buffer = chr(GenerationManager::PACKET_ADD_NAMESPACE) . Binary::writeShort(strlen($namespace)) . $namespace . $path;
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
|
||||||
|
|
||||||
protected function socketRead($len){
|
|
||||||
$buffer = "";
|
|
||||||
while(strlen($buffer) < $len){
|
|
||||||
$buffer .= @socket_read($this->socket, $len - strlen($buffer));
|
|
||||||
}
|
|
||||||
|
|
||||||
return $buffer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function sendChunk($levelID, FullChunk $chunk){
|
protected function sendChunk($levelID, FullChunk $chunk){
|
||||||
$binary = chr(GenerationManager::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary();
|
$buffer = chr(GenerationManager::PACKET_SEND_CHUNK) . Binary::writeInt($levelID) . chr(strlen($class = get_class($chunk))) . $class . $chunk->toBinary();
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($binary)) . $binary);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function requestChunk(Level $level, $chunkX, $chunkZ){
|
public function requestChunk(Level $level, $chunkX, $chunkZ){
|
||||||
$buffer = chr(GenerationManager::PACKET_REQUEST_CHUNK) . Binary::writeInt($level->getID()) . Binary::writeInt($chunkX) . Binary::writeInt($chunkZ);
|
$buffer = chr(GenerationManager::PACKET_REQUEST_CHUNK) . Binary::writeInt($level->getID()) . Binary::writeInt($chunkX) . Binary::writeInt($chunkZ);
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function handleRequest($levelID, $chunkX, $chunkZ){
|
protected function handleRequest($levelID, $chunkX, $chunkZ){
|
||||||
@ -97,7 +86,7 @@ class GenerationRequestManager{
|
|||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
$buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($levelID);
|
$buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($levelID);
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,17 +95,12 @@ class GenerationRequestManager{
|
|||||||
$level->generateChunkCallback($chunk->getX(), $chunk->getZ(), $chunk);
|
$level->generateChunkCallback($chunk->getX(), $chunk->getZ(), $chunk);
|
||||||
}else{
|
}else{
|
||||||
$buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($levelID);
|
$buffer = chr(GenerationManager::PACKET_CLOSE_LEVEL) . Binary::writeInt($levelID);
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function handlePackets(){
|
public function handlePackets(){
|
||||||
if(($len = @socket_read($this->socket, 4)) !== false and $len !== ""){
|
while(strlen($packet = $this->generationThread->readThreadToMainPacket()) > 0){
|
||||||
if(strlen($len) < 4){
|
|
||||||
$len .= $this->socketRead(4 - strlen($len));
|
|
||||||
}
|
|
||||||
|
|
||||||
$packet = $this->socketRead(Binary::readInt($len));
|
|
||||||
$pid = ord($packet{0});
|
$pid = ord($packet{0});
|
||||||
$offset = 1;
|
$offset = 1;
|
||||||
|
|
||||||
@ -145,7 +129,7 @@ class GenerationRequestManager{
|
|||||||
|
|
||||||
public function shutdown(){
|
public function shutdown(){
|
||||||
$buffer = chr(GenerationManager::PACKET_SHUTDOWN);
|
$buffer = chr(GenerationManager::PACKET_SHUTDOWN);
|
||||||
@socket_write($this->socket, Binary::writeInt(strlen($buffer)) . $buffer);
|
$this->generationThread->pushMainToThreadPacket($buffer);
|
||||||
$this->generationThread->join();
|
$this->generationThread->join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,15 +32,40 @@ class GenerationThread extends Thread{
|
|||||||
/** @var \ThreadedLogger */
|
/** @var \ThreadedLogger */
|
||||||
protected $logger;
|
protected $logger;
|
||||||
|
|
||||||
protected $externalSocket;
|
/** @var \Threaded */
|
||||||
protected $internalSocket;
|
protected $externalQueue;
|
||||||
|
/** @var \Threaded */
|
||||||
|
protected $internalQueue;
|
||||||
|
|
||||||
public function getExternalSocket(){
|
/**
|
||||||
return $this->externalSocket;
|
* @return \Threaded
|
||||||
|
*/
|
||||||
|
public function getInternalQueue(){
|
||||||
|
return $this->internalQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getInternalSocket(){
|
/**
|
||||||
return $this->internalSocket;
|
* @return \Threaded
|
||||||
|
*/
|
||||||
|
public function getExternalQueue(){
|
||||||
|
return $this->externalQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function pushMainToThreadPacket($str){
|
||||||
|
$this->internalQueue[] = $str;
|
||||||
|
$this->notify();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function readMainToThreadPacket(){
|
||||||
|
return $this->internalQueue->shift();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function pushThreadToMainPacket($str){
|
||||||
|
$this->externalQueue[] = $str;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function readThreadToMainPacket(){
|
||||||
|
return $this->externalQueue->shift();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -57,19 +82,8 @@ class GenerationThread extends Thread{
|
|||||||
$this->addDependency($loadPaths, new \ReflectionClass($this->loader));
|
$this->addDependency($loadPaths, new \ReflectionClass($this->loader));
|
||||||
$this->loadPaths = array_reverse($loadPaths);
|
$this->loadPaths = array_reverse($loadPaths);
|
||||||
|
|
||||||
$sockets = [];
|
$this->externalQueue = new \Threaded();
|
||||||
if(!socket_create_pair((strtoupper(substr(PHP_OS, 0, 3)) === 'WIN' ? AF_INET : AF_UNIX), SOCK_STREAM, 0, $sockets)){
|
$this->internalQueue = new \Threaded();
|
||||||
throw new \Exception("Could not create IPC sockets. Reason: " . socket_strerror(socket_last_error()));
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->internalSocket = $sockets[0];
|
|
||||||
socket_set_block($this->internalSocket); //IMPORTANT!
|
|
||||||
@socket_set_option($this->internalSocket, SOL_SOCKET, SO_SNDBUF, 1024 * 1024 * 2);
|
|
||||||
@socket_set_option($this->internalSocket, SOL_SOCKET, SO_RCVBUF, 1024 * 1024 * 2);
|
|
||||||
$this->externalSocket = $sockets[1];
|
|
||||||
socket_set_nonblock($this->externalSocket);
|
|
||||||
@socket_set_option($this->externalSocket, SOL_SOCKET, SO_SNDBUF, 1024 * 1024 * 2);
|
|
||||||
@socket_set_option($this->externalSocket, SOL_SOCKET, SO_RCVBUF, 1024 * 1024 * 2);
|
|
||||||
|
|
||||||
$this->start();
|
$this->start();
|
||||||
}
|
}
|
||||||
@ -98,6 +112,6 @@ class GenerationThread extends Thread{
|
|||||||
}
|
}
|
||||||
$this->loader->register();
|
$this->loader->register();
|
||||||
|
|
||||||
$generationManager = new GenerationManager($this->getInternalSocket(), $this->getLogger(), $this->loader);
|
$generationManager = new GenerationManager($this, $this->getLogger(), $this->loader);
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user