Added DataPacketBroadcastEvent, refactor broadcast handling, close #1521

batchPackets() is now considered internal and shouldn't be used by plugins.

Added Server->broadcastPackets(Player[], DataPacket[]) : bool
This commit is contained in:
Dylan K. Taylor 2018-08-02 15:54:30 +01:00
parent c345e6f22c
commit 7560880168
4 changed files with 140 additions and 37 deletions

View File

@ -39,6 +39,7 @@ use pocketmine\event\HandlerList;
use pocketmine\event\level\LevelInitEvent; use pocketmine\event\level\LevelInitEvent;
use pocketmine\event\level\LevelLoadEvent; use pocketmine\event\level\LevelLoadEvent;
use pocketmine\event\player\PlayerDataSaveEvent; use pocketmine\event\player\PlayerDataSaveEvent;
use pocketmine\event\server\DataPacketBroadcastEvent;
use pocketmine\event\server\QueryRegenerateEvent; use pocketmine\event\server\QueryRegenerateEvent;
use pocketmine\event\server\ServerCommandEvent; use pocketmine\event\server\ServerCommandEvent;
use pocketmine\inventory\CraftingManager; use pocketmine\inventory\CraftingManager;
@ -1859,41 +1860,62 @@ class Server{
* *
* @param Player[] $players * @param Player[] $players
* @param DataPacket $packet * @param DataPacket $packet
*
* @return bool
*/ */
public function broadcastPacket(array $players, DataPacket $packet){ public function broadcastPacket(array $players, DataPacket $packet) : bool{
$packet->encode(); return $this->broadcastPackets($players, [$packet]);
$this->batchPackets($players, [$packet], false); }
/**
* @param Player[] $players
* @param DataPacket[] $packets
*
* @return bool
*/
public function broadcastPackets(array $players, array $packets) : bool{
if(empty($packets)){
throw new \InvalidArgumentException("Cannot broadcast empty list of packets");
}
$this->pluginManager->callEvent($ev = new DataPacketBroadcastEvent($players, $packets));
if($ev->isCancelled()){
return false;
}
/** @var NetworkSession[] $targets */
$targets = [];
foreach($ev->getPlayers() as $player){
if($player->isConnected()){
$targets[] = $player->getNetworkSession();
}
}
if(empty($targets)){
return false;
}
$stream = new PacketStream();
foreach($ev->getPackets() as $packet){
$stream->putPacket($packet);
}
//TODO: if under the compression threshold, add to session buffers instead of batching (first we need to implement buffering!)
$this->batchPackets($targets, $stream);
return true;
} }
/** /**
* Broadcasts a list of packets in a batch to a list of players * Broadcasts a list of packets in a batch to a list of players
* *
* @param Player[] $players * @param NetworkSession[] $targets
* @param DataPacket[] $packets * @param PacketStream $stream
* @param bool $forceSync * @param bool $forceSync
* @param bool $immediate * @param bool $immediate
*/ */
public function batchPackets(array $players, array $packets, bool $forceSync = false, bool $immediate = false){ public function batchPackets(array $targets, PacketStream $stream, bool $forceSync = false, bool $immediate = false){
if(empty($packets)){
throw new \InvalidArgumentException("Cannot send empty batch");
}
Timings::$playerNetworkSendCompressTimer->startTiming(); Timings::$playerNetworkSendCompressTimer->startTiming();
/** @var NetworkSession[] $targets */
$targets = [];
foreach($players as $player){
if($player->isConnected()){
$targets[] = $player->getNetworkSession();
}
}
if(!empty($targets)){ if(!empty($targets)){
$stream = new PacketStream();
foreach($packets as $p){
$stream->putPacket($p);
}
$compressionLevel = NetworkCompression::$LEVEL; $compressionLevel = NetworkCompression::$LEVEL;
if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){ if(NetworkCompression::$THRESHOLD < 0 or strlen($stream->buffer) < NetworkCompression::$THRESHOLD){
$compressionLevel = 0; //Do not compress packets under the threshold $compressionLevel = 0; //Do not compress packets under the threshold

View File

@ -0,0 +1,75 @@
<?php
/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/
declare(strict_types=1);
namespace pocketmine\event\server;
use pocketmine\event\Cancellable;
use pocketmine\network\mcpe\protocol\DataPacket;
use pocketmine\Player;
/**
* Called when a list of packets is broadcasted to 1 or more players.
*/
class DataPacketBroadcastEvent extends ServerEvent implements Cancellable{
/** @var Player[] */
private $players;
/** @var DataPacket[] */
private $packets;
/**
* @param Player[] $players
* @param DataPacket[] $packets
*/
public function __construct(array $players, array $packets){
$this->players = $players;
$this->packets = $packets;
}
/**
* @return Player[]
*/
public function getPlayers() : array{
return $this->players;
}
/**
* @param Player[] $players
*/
public function setPlayers(array $players) : void{
$this->players = $players;
}
/**
* @return DataPacket[]
*/
public function getPackets() : array{
return $this->packets;
}
/**
* @param DataPacket[] $packets
*/
public function setPackets(array $packets) : void{
$this->packets = $packets;
}
}

View File

@ -446,7 +446,7 @@ class Level implements ChunkManager, Metadatable{
$this->addChunkPacket($sound->getFloorX() >> 4, $sound->getFloorZ() >> 4, $e); $this->addChunkPacket($sound->getFloorX() >> 4, $sound->getFloorZ() >> 4, $e);
} }
}else{ }else{
$this->server->batchPackets($players, $pk, false); $this->server->broadcastPackets($players, $pk);
} }
} }
@ -461,7 +461,7 @@ class Level implements ChunkManager, Metadatable{
$this->addChunkPacket($particle->getFloorX() >> 4, $particle->getFloorZ() >> 4, $e); $this->addChunkPacket($particle->getFloorX() >> 4, $particle->getFloorZ() >> 4, $e);
} }
}else{ }else{
$this->server->batchPackets($players, $pk, false); $this->server->broadcastPackets($players, $pk);
} }
} }
@ -675,7 +675,11 @@ class Level implements ChunkManager, Metadatable{
$pk = new SetTimePacket(); $pk = new SetTimePacket();
$pk->time = $this->time; $pk->time = $this->time;
$this->server->broadcastPacket(count($targets) > 0 ? $targets : $this->players, $pk); if(empty($targets)){
$this->addGlobalPacket($pk);
}else{
$this->server->broadcastPacket($targets, $pk);
}
} }
/** /**
@ -798,7 +802,7 @@ class Level implements ChunkManager, Metadatable{
} }
if(!empty($this->players) and !empty($this->globalPackets)){ if(!empty($this->players) and !empty($this->globalPackets)){
$this->server->batchPackets($this->players, $this->globalPackets); $this->server->broadcastPackets($this->players, $this->globalPackets);
$this->globalPackets = []; $this->globalPackets = [];
} }
@ -806,7 +810,7 @@ class Level implements ChunkManager, Metadatable{
Level::getXZ($index, $chunkX, $chunkZ); Level::getXZ($index, $chunkX, $chunkZ);
$chunkPlayers = $this->getChunkPlayers($chunkX, $chunkZ); $chunkPlayers = $this->getChunkPlayers($chunkX, $chunkZ);
if(count($chunkPlayers) > 0){ if(count($chunkPlayers) > 0){
$this->server->batchPackets($chunkPlayers, $entries, false, false); $this->server->broadcastPackets($chunkPlayers, $entries);
} }
} }
@ -914,7 +918,7 @@ class Level implements ChunkManager, Metadatable{
} }
} }
$this->server->batchPackets($target, $packets, false, false); $this->server->broadcastPackets($target, $packets);
} }
public function clearCache(bool $force = false){ public function clearCache(bool $force = false){
@ -2919,13 +2923,13 @@ class Level implements ChunkManager, Metadatable{
* @param Player ...$targets * @param Player ...$targets
*/ */
public function sendDifficulty(Player ...$targets){ public function sendDifficulty(Player ...$targets){
if(count($targets) === 0){
$targets = $this->getPlayers();
}
$pk = new SetDifficultyPacket(); $pk = new SetDifficultyPacket();
$pk->difficulty = $this->getDifficulty(); $pk->difficulty = $this->getDifficulty();
$this->server->broadcastPacket($targets, $pk); if(empty($targets)){
$this->addGlobalPacket($pk);
}else{
$this->server->broadcastPacket($targets, $pk);
}
} }
public function populateChunk(int $x, int $z, bool $force = false) : bool{ public function populateChunk(int $x, int $z, bool $force = false) : bool{

View File

@ -202,7 +202,9 @@ class NetworkSession{
} }
//TODO: implement buffering (this is just a quick fix) //TODO: implement buffering (this is just a quick fix)
$this->server->batchPackets([$this->player], [$packet], true, $immediate); $stream = new PacketStream();
$stream->putPacket($packet);
$this->server->batchPackets([$this], $stream, true, $immediate);
return true; return true;
}finally{ }finally{