Require pthreads ^5.1

This version of pthreads has a substantially improved API, improved
performance, improved memory usage, and much less magical and broken
behaviour.
This commit is contained in:
Dylan K. Taylor
2023-01-23 20:02:33 +00:00
parent 14b250c63f
commit 222415859a
22 changed files with 245 additions and 118 deletions

View File

@ -33,6 +33,7 @@ use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext;
use pocketmine\network\mcpe\protocol\types\ChunkPosition;
use pocketmine\network\mcpe\serializer\ChunkSerializer;
use pocketmine\scheduler\AsyncTask;
use pocketmine\thread\NonThreadSafeValue;
use pocketmine\world\format\Chunk;
use pocketmine\world\format\io\FastChunkSerializer;
@ -43,14 +44,15 @@ class ChunkRequestTask extends AsyncTask{
protected string $chunk;
protected int $chunkX;
protected int $chunkZ;
protected Compressor $compressor;
/** @phpstan-var NonThreadSafeValue<Compressor> */
protected NonThreadSafeValue $compressor;
private string $tiles;
/**
* @phpstan-param (\Closure() : void)|null $onError
*/
public function __construct(int $chunkX, int $chunkZ, Chunk $chunk, CompressBatchPromise $promise, Compressor $compressor, ?\Closure $onError = null){
$this->compressor = $compressor;
$this->compressor = new NonThreadSafeValue($compressor);
$this->chunk = FastChunkSerializer::serializeTerrain($chunk);
$this->chunkX = $chunkX;
@ -66,7 +68,7 @@ class ChunkRequestTask extends AsyncTask{
$subCount = ChunkSerializer::getSubChunkCount($chunk);
$encoderContext = new PacketSerializerContext(GlobalItemTypeDictionary::getInstance()->getDictionary());
$payload = ChunkSerializer::serializeFullChunk($chunk, RuntimeBlockMapping::getInstance(), $encoderContext, $this->tiles);
$this->setResult($this->compressor->compress(PacketBatch::fromPackets($encoderContext, LevelChunkPacket::create(new ChunkPosition($this->chunkX, $this->chunkZ), $subCount, false, null, $payload))->getBuffer()));
$this->setResult($this->compressor->deserialize()->compress(PacketBatch::fromPackets($encoderContext, LevelChunkPacket::create(new ChunkPosition($this->chunkX, $this->chunkZ), $subCount, false, null, $payload))->getBuffer()));
}
public function onError() : void{

View File

@ -30,6 +30,7 @@ use pocketmine\network\mcpe\JwtUtils;
use pocketmine\network\mcpe\protocol\types\login\JwtChainLinkBody;
use pocketmine\network\mcpe\protocol\types\login\JwtHeader;
use pocketmine\scheduler\AsyncTask;
use pocketmine\thread\NonThreadSafeValue;
use function base64_decode;
use function igbinary_serialize;
use function igbinary_unserialize;
@ -49,8 +50,10 @@ class ProcessLoginTask extends AsyncTask{
* Whether the keychain signatures were validated correctly. This will be set to an error message if any link in the
* keychain is invalid for whatever reason (bad signature, not in nbf-exp window, etc). If this is non-null, the
* keychain might have been tampered with. The player will always be disconnected if this is non-null.
*
* @phpstan-var NonThreadSafeValue<Translatable>|string|null
*/
private Translatable|string|null $error = "Unknown";
private NonThreadSafeValue|string|null $error = "Unknown";
/**
* Whether the player is logged into Xbox Live. This is true if any link in the keychain is signed with the Mojang
* root public key.
@ -77,7 +80,8 @@ class ProcessLoginTask extends AsyncTask{
$this->clientPublicKey = $this->validateChain();
$this->error = null;
}catch(VerifyLoginException $e){
$this->error = $e->getDisconnectMessage();
$disconnectMessage = $e->getDisconnectMessage();
$this->error = $disconnectMessage instanceof Translatable ? new NonThreadSafeValue($disconnectMessage) : $disconnectMessage;
}
}
@ -195,6 +199,6 @@ class ProcessLoginTask extends AsyncTask{
* @phpstan-var \Closure(bool, bool, Translatable|string|null, ?string) : void $callback
*/
$callback = $this->fetchLocal(self::TLS_KEY_ON_COMPLETION);
$callback($this->authenticated, $this->authRequired, $this->error, $this->clientPublicKey);
$callback($this->authenticated, $this->authRequired, $this->error instanceof NonThreadSafeValue ? $this->error->deserialize() : $this->error, $this->clientPublicKey);
}
}

View File

@ -24,21 +24,26 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\compression;
use pocketmine\scheduler\AsyncTask;
use pocketmine\thread\NonThreadSafeValue;
class CompressBatchTask extends AsyncTask{
private const TLS_KEY_PROMISE = "promise";
/** @phpstan-var NonThreadSafeValue<Compressor> */
private NonThreadSafeValue $compressor;
public function __construct(
private string $data,
CompressBatchPromise $promise,
private Compressor $compressor
Compressor $compressor
){
$this->compressor = new NonThreadSafeValue($compressor);
$this->storeLocal(self::TLS_KEY_PROMISE, $promise);
}
public function onRun() : void{
$this->setResult($this->compressor->compress($this->data));
$this->setResult($this->compressor->deserialize()->compress($this->data));
}
public function onCompletion() : void{

View File

@ -26,7 +26,10 @@ namespace pocketmine\network\mcpe\raklib;
use raklib\server\ipc\InterThreadChannelReader;
final class PthreadsChannelReader implements InterThreadChannelReader{
public function __construct(private \Threaded $buffer){}
/**
* @phpstan-param \ThreadedArray<int, string> $buffer
*/
public function __construct(private \ThreadedArray $buffer){}
public function read() : ?string{
return $this->buffer->shift();

View File

@ -26,7 +26,10 @@ namespace pocketmine\network\mcpe\raklib;
use raklib\server\ipc\InterThreadChannelWriter;
final class PthreadsChannelWriter implements InterThreadChannelWriter{
public function __construct(private \Threaded $buffer){}
/**
* @phpstan-param \ThreadedArray<int, string> $buffer
*/
public function __construct(private \ThreadedArray $buffer){}
public function write(string $str) : void{
$this->buffer[] = $str;

View File

@ -85,8 +85,10 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->sleeper = new SleeperNotifier();
$mainToThreadBuffer = new \Threaded();
$threadToMainBuffer = new \Threaded();
/** @phpstan-var \ThreadedArray<int, string> $mainToThreadBuffer */
$mainToThreadBuffer = new \ThreadedArray();
/** @phpstan-var \ThreadedArray<int, string> $threadToMainBuffer */
$threadToMainBuffer = new \ThreadedArray();
$this->rakLib = new RakLibServer(
$this->server->getLogger(),

View File

@ -24,6 +24,7 @@ declare(strict_types=1);
namespace pocketmine\network\mcpe\raklib;
use pocketmine\snooze\SleeperNotifier;
use pocketmine\thread\NonThreadSafeValue;
use pocketmine\thread\Thread;
use raklib\generic\Socket;
use raklib\generic\SocketException;
@ -43,19 +44,27 @@ class RakLibServer extends Thread{
protected bool $cleanShutdown = false;
protected bool $ready = false;
protected string $mainPath;
public ?RakLibThreadCrashInfo $crashInfo = null;
/** @phpstan-var NonThreadSafeValue<RakLibThreadCrashInfo>|null */
public ?NonThreadSafeValue $crashInfo = null;
/** @phpstan-var NonThreadSafeValue<InternetAddress> */
protected NonThreadSafeValue $address;
/**
* @phpstan-param \ThreadedArray<int, string> $mainToThreadBuffer
* @phpstan-param \ThreadedArray<int, string> $threadToMainBuffer
*/
public function __construct(
protected \ThreadedLogger $logger,
protected \Threaded $mainToThreadBuffer,
protected \Threaded $threadToMainBuffer,
protected InternetAddress $address,
protected \ThreadedArray $mainToThreadBuffer,
protected \ThreadedArray $threadToMainBuffer,
InternetAddress $address,
protected int $serverId,
protected int $maxMtuSize,
protected int $protocolVersion,
protected SleeperNotifier $mainThreadNotifier
){
$this->mainPath = \pocketmine\PATH;
$this->address = new NonThreadSafeValue($address);
}
/**
@ -75,12 +84,12 @@ class RakLibServer extends Thread{
}
public function getCrashInfo() : ?RakLibThreadCrashInfo{
return $this->crashInfo;
return $this->crashInfo?->deserialize();
}
private function setCrashInfo(RakLibThreadCrashInfo $info) : void{
$this->synchronized(function(RakLibThreadCrashInfo $info) : void{
$this->crashInfo = $info;
$this->crashInfo = new NonThreadSafeValue($info);
$this->notify();
}, $info);
}
@ -91,7 +100,7 @@ class RakLibServer extends Thread{
while(!$this->ready && $this->crashInfo === null){
$this->wait();
}
$crashInfo = $this->crashInfo;
$crashInfo = $this->crashInfo?->deserialize();
if($crashInfo !== null){
if($crashInfo->getClass() === SocketException::class){
throw new SocketException($crashInfo->getMessage());
@ -110,7 +119,7 @@ class RakLibServer extends Thread{
register_shutdown_function([$this, "shutdownHandler"]);
try{
$socket = new Socket($this->address);
$socket = new Socket($this->address->deserialize());
}catch(SocketException $e){
$this->setCrashInfo(RakLibThreadCrashInfo::fromThrowable($e));
return;

View File

@ -27,8 +27,11 @@ use pocketmine\snooze\SleeperNotifier;
use raklib\server\ipc\InterThreadChannelWriter;
final class SnoozeAwarePthreadsChannelWriter implements InterThreadChannelWriter{
/**
* @phpstan-param \ThreadedArray<int, string> $buffer
*/
public function __construct(
private \Threaded $buffer,
private \ThreadedArray $buffer,
private SleeperNotifier $notifier
){}