Added a PacketBroadcaster interface

this gives a bit more control over how packets are broadcasted, which might be useful if the batch format changes (e.g. adding a length prefix) for multi version.
This really ought to be unique to a protocol context instead of a network interface, but for now this is the best we can do.
This commit is contained in:
Dylan K. Taylor 2020-12-02 16:34:14 +00:00
parent 6001f69d52
commit 687ad28fa6
5 changed files with 131 additions and 26 deletions

View File

@ -52,6 +52,7 @@ use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\encryption\EncryptionContext; use pocketmine\network\mcpe\encryption\EncryptionContext;
use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketBroadcaster;
use pocketmine\network\mcpe\protocol\ClientboundPacket; use pocketmine\network\mcpe\protocol\ClientboundPacket;
use pocketmine\network\mcpe\protocol\ProtocolInfo; use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
@ -1227,33 +1228,17 @@ class Server{
} }
$recipients = $ev->getTargets(); $recipients = $ev->getTargets();
$stream = PacketBatch::fromPackets(...$ev->getPackets()); /** @var PacketBroadcaster[] $broadcasters */
$broadcasters = [];
/** @var Compressor[] $compressors */ /** @var NetworkSession[][] $broadcasterTargets */
$compressors = []; $broadcasterTargets = [];
/** @var NetworkSession[][] $compressorTargets */
$compressorTargets = [];
foreach($recipients as $recipient){ foreach($recipients as $recipient){
$compressor = $recipient->getCompressor(); $broadcaster = $recipient->getBroadcaster();
$compressorId = spl_object_id($compressor); $broadcasters[spl_object_id($broadcaster)] = $broadcaster;
//TODO: different compressors might be compatible, it might not be necessary to split them up by object $broadcasterTargets[spl_object_id($broadcaster)][] = $recipient;
$compressors[$compressorId] = $compressor;
$compressorTargets[$compressorId][] = $recipient;
} }
foreach($broadcasters as $broadcaster){
foreach($compressors as $compressorId => $compressor){ $broadcaster->broadcastPackets($broadcasterTargets[spl_object_id($broadcaster)], $packets);
if(!$compressor->willCompress($stream->getBuffer())){
foreach($compressorTargets[$compressorId] as $target){
foreach($ev->getPackets() as $pk){
$target->addToSendBuffer($pk);
}
}
}else{
$promise = $this->prepareBatch($stream, $compressor);
foreach($compressorTargets[$compressorId] as $target){
$target->queueCompressed($promise);
}
}
} }
return true; return true;

View File

@ -177,16 +177,20 @@ class NetworkSession{
/** @var PacketSender */ /** @var PacketSender */
private $sender; private $sender;
/** @var PacketBroadcaster */
private $broadcaster;
/** /**
* @var \Closure[]|Set * @var \Closure[]|Set
* @phpstan-var Set<\Closure() : void> * @phpstan-var Set<\Closure() : void>
*/ */
private $disposeHooks; private $disposeHooks;
public function __construct(Server $server, NetworkSessionManager $manager, PacketPool $packetPool, PacketSender $sender, Compressor $compressor, string $ip, int $port){ public function __construct(Server $server, NetworkSessionManager $manager, PacketPool $packetPool, PacketSender $sender, PacketBroadcaster $broadcaster, Compressor $compressor, string $ip, int $port){
$this->server = $server; $this->server = $server;
$this->manager = $manager; $this->manager = $manager;
$this->sender = $sender; $this->sender = $sender;
$this->broadcaster = $broadcaster;
$this->ip = $ip; $this->ip = $ip;
$this->port = $port; $this->port = $port;
@ -449,6 +453,8 @@ class NetworkSession{
} }
} }
public function getBroadcaster() : PacketBroadcaster{ return $this->broadcaster; }
public function getCompressor() : Compressor{ public function getCompressor() : Compressor{
return $this->compressor; return $this->compressor;
} }

View File

@ -0,0 +1,35 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\protocol\ClientboundPacket;
interface PacketBroadcaster{
/**
* @param NetworkSession[] $recipients
* @param ClientboundPacket[] $packets
*/
public function broadcastPackets(array $recipients, array $packets) : void;
}

View File

@ -0,0 +1,71 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\network\mcpe;
use pocketmine\network\mcpe\compression\Compressor;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\Server;
use function spl_object_id;
final class StandardPacketBroadcaster implements PacketBroadcaster{
/** @var Server */
private $server;
public function __construct(Server $server){
$this->server = $server;
}
public function broadcastPackets(array $recipients, array $packets) : void{
$stream = PacketBatch::fromPackets(...$packets);
/** @var Compressor[] $compressors */
$compressors = [];
/** @var NetworkSession[][] $compressorTargets */
$compressorTargets = [];
foreach($recipients as $recipient){
$compressor = $recipient->getCompressor();
$compressorId = spl_object_id($compressor);
//TODO: different compressors might be compatible, it might not be necessary to split them up by object
$compressors[$compressorId] = $compressor;
$compressorTargets[$compressorId][] = $recipient;
}
foreach($compressors as $compressorId => $compressor){
if(!$compressor->willCompress($stream->getBuffer())){
foreach($compressorTargets[$compressorId] as $target){
foreach($packets as $pk){
$target->addToSendBuffer($pk);
}
}
}else{
$promise = $this->server->prepareBatch($stream, $compressor);
foreach($compressorTargets[$compressorId] as $target){
$target->queueCompressed($promise);
}
}
}
}
}

View File

@ -27,8 +27,10 @@ use pocketmine\network\AdvancedNetworkInterface;
use pocketmine\network\BadPacketException; use pocketmine\network\BadPacketException;
use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketBroadcaster;
use pocketmine\network\mcpe\protocol\PacketPool; use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\ProtocolInfo; use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\network\mcpe\StandardPacketBroadcaster;
use pocketmine\network\Network; use pocketmine\network\Network;
use pocketmine\Server; use pocketmine\Server;
use pocketmine\snooze\SleeperNotifier; use pocketmine\snooze\SleeperNotifier;
@ -81,6 +83,9 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
/** @var SleeperNotifier */ /** @var SleeperNotifier */
private $sleeper; private $sleeper;
/** @var PacketBroadcaster */
private $broadcaster;
public function __construct(Server $server){ public function __construct(Server $server){
$this->server = $server; $this->server = $server;
$this->rakServerId = mt_rand(0, PHP_INT_MAX); $this->rakServerId = mt_rand(0, PHP_INT_MAX);
@ -106,6 +111,8 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->interface = new UserToRakLibThreadMessageSender( $this->interface = new UserToRakLibThreadMessageSender(
new PthreadsChannelWriter($mainToThreadBuffer) new PthreadsChannelWriter($mainToThreadBuffer)
); );
$this->broadcaster = new StandardPacketBroadcaster($this->server);
} }
public function start() : void{ public function start() : void{
@ -158,6 +165,7 @@ class RakLibInterface implements ServerEventListener, AdvancedNetworkInterface{
$this->network->getSessionManager(), $this->network->getSessionManager(),
PacketPool::getInstance(), PacketPool::getInstance(),
new RakLibPacketSender($sessionId, $this), new RakLibPacketSender($sessionId, $this),
$this->broadcaster,
ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it ZlibCompressor::getInstance(), //TODO: this shouldn't be hardcoded, but we might need the RakNet protocol version to select it
$address, $address,
$port $port