Better chunking behaviour and add members on PRESENCE_UPDATE.
This should hopefully cover all cases where members are added. There was a bug where a received array of chunk futures would be processed in its entirety even when only a single chunk had actually arrived. This is fixed by explicitly bailing out early when we are requesting chunks.
This commit is contained in:
parent
84f1342b85
commit
6076c8c671
@ -46,7 +46,7 @@ class ListenerType(enum.Enum):
|
|||||||
|
|
||||||
Listener = namedtuple('Listener', ('type', 'future', 'predicate'))
|
Listener = namedtuple('Listener', ('type', 'future', 'predicate'))
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
ReadyState = namedtuple('ReadyState', ('launch', 'chunks', 'servers'))
|
ReadyState = namedtuple('ReadyState', ('launch', 'servers'))
|
||||||
|
|
||||||
class ConnectionState:
|
class ConnectionState:
|
||||||
def __init__(self, dispatch, chunker, max_messages, *, loop):
|
def __init__(self, dispatch, chunker, max_messages, *, loop):
|
||||||
@ -85,6 +85,8 @@ class ConnectionState:
|
|||||||
if passed:
|
if passed:
|
||||||
future.set_result(result)
|
future.set_result(result)
|
||||||
removed.append(i)
|
removed.append(i)
|
||||||
|
if listener.type == ListenerType.chunk:
|
||||||
|
break
|
||||||
|
|
||||||
for index in reversed(removed):
|
for index in reversed(removed):
|
||||||
del self._listeners[index]
|
del self._listeners[index]
|
||||||
@ -134,16 +136,7 @@ class ConnectionState:
|
|||||||
yield self.receive_chunk(server.id)
|
yield self.receive_chunk(server.id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _delay_ready(self, large_servers):
|
def _delay_ready(self):
|
||||||
if len(large_servers):
|
|
||||||
# for regular accounts with < 100 guilds you will
|
|
||||||
# get a regular READY packet without the unavailable
|
|
||||||
# streaming so if this is non-empty then it's a regular
|
|
||||||
# account and it needs chunking.
|
|
||||||
yield from self.chunker(large_servers)
|
|
||||||
for server in large_servers:
|
|
||||||
self._ready_state.chunks.extend(self.chunks_needed(server))
|
|
||||||
|
|
||||||
launch = self._ready_state.launch
|
launch = self._ready_state.launch
|
||||||
while not launch.is_set():
|
while not launch.is_set():
|
||||||
# this snippet of code is basically waiting 2 seconds
|
# this snippet of code is basically waiting 2 seconds
|
||||||
@ -152,9 +145,18 @@ class ConnectionState:
|
|||||||
yield from asyncio.sleep(2)
|
yield from asyncio.sleep(2)
|
||||||
|
|
||||||
# get all the chunks
|
# get all the chunks
|
||||||
chunks = [f for f in self._ready_state.chunks if not f.done()]
|
servers = self._ready_state.servers
|
||||||
|
chunks = []
|
||||||
|
for server in servers:
|
||||||
|
chunks.extend(self.chunks_needed(server))
|
||||||
|
|
||||||
|
# we only want to request ~75 guilds per chunk request.
|
||||||
|
splits = [servers[i:i + 75] for i in range(0, len(servers), 75)]
|
||||||
|
for split in splits:
|
||||||
|
yield from self.chunker(split)
|
||||||
|
|
||||||
|
# wait for the chunks
|
||||||
if chunks:
|
if chunks:
|
||||||
yield from self.chunker(self._ready_state.servers)
|
|
||||||
yield from asyncio.wait(chunks)
|
yield from asyncio.wait(chunks)
|
||||||
|
|
||||||
# remove the state
|
# remove the state
|
||||||
@ -164,21 +166,21 @@ class ConnectionState:
|
|||||||
self.dispatch('ready')
|
self.dispatch('ready')
|
||||||
|
|
||||||
def parse_ready(self, data):
|
def parse_ready(self, data):
|
||||||
self._ready_state = ReadyState(launch=asyncio.Event(), chunks=[], servers=[])
|
self._ready_state = ReadyState(launch=asyncio.Event(), servers=[])
|
||||||
self.user = User(**data['user'])
|
self.user = User(**data['user'])
|
||||||
guilds = data.get('guilds')
|
guilds = data.get('guilds')
|
||||||
|
|
||||||
large_servers = []
|
servers = self._ready_state.servers
|
||||||
for guild in guilds:
|
for guild in guilds:
|
||||||
server = self._add_server_from_data(guild)
|
server = self._add_server_from_data(guild)
|
||||||
if server.large:
|
if server.large:
|
||||||
large_servers.append(server)
|
servers.append(server)
|
||||||
|
|
||||||
for pm in data.get('private_channels'):
|
for pm in data.get('private_channels'):
|
||||||
self._add_private_channel(PrivateChannel(id=pm['id'],
|
self._add_private_channel(PrivateChannel(id=pm['id'],
|
||||||
user=User(**pm['recipient'])))
|
user=User(**pm['recipient'])))
|
||||||
|
|
||||||
utils.create_task(self._delay_ready(large_servers), loop=self.loop)
|
utils.create_task(self._delay_ready(), loop=self.loop)
|
||||||
|
|
||||||
def parse_message_create(self, data):
|
def parse_message_create(self, data):
|
||||||
channel = self.get_channel(data.get('channel_id'))
|
channel = self.get_channel(data.get('channel_id'))
|
||||||
@ -215,7 +217,13 @@ class ConnectionState:
|
|||||||
member_id = user['id']
|
member_id = user['id']
|
||||||
member = server.get_member(member_id)
|
member = server.get_member(member_id)
|
||||||
if member is None:
|
if member is None:
|
||||||
return
|
if 'name' not in user:
|
||||||
|
# sometimes we receive 'incomplete' member data post-removal.
|
||||||
|
# skip these useless cases.
|
||||||
|
return
|
||||||
|
|
||||||
|
member = self._make_member(server, data)
|
||||||
|
server._add_member(member)
|
||||||
|
|
||||||
old_member = copy.copy(member)
|
old_member = copy.copy(member)
|
||||||
member.status = data.get('status')
|
member.status = data.get('status')
|
||||||
@ -338,8 +346,6 @@ class ConnectionState:
|
|||||||
|
|
||||||
# check if it requires chunking
|
# check if it requires chunking
|
||||||
if server.large:
|
if server.large:
|
||||||
chunks = list(self.chunks_needed(server))
|
|
||||||
|
|
||||||
if unavailable == False:
|
if unavailable == False:
|
||||||
# check if we're waiting for 'useful' READY
|
# check if we're waiting for 'useful' READY
|
||||||
# and if we are, we don't want to dispatch any
|
# and if we are, we don't want to dispatch any
|
||||||
@ -349,9 +355,7 @@ class ConnectionState:
|
|||||||
try:
|
try:
|
||||||
state = self._ready_state
|
state = self._ready_state
|
||||||
state.launch.clear()
|
state.launch.clear()
|
||||||
if chunks:
|
state.servers.append(server)
|
||||||
state.servers.append(server)
|
|
||||||
state.chunks.extend(chunks)
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# the _ready_state attribute is only there during
|
# the _ready_state attribute is only there during
|
||||||
# processing of useful READY.
|
# processing of useful READY.
|
||||||
@ -362,6 +366,7 @@ class ConnectionState:
|
|||||||
# since we're not waiting for 'useful' READY we'll just
|
# since we're not waiting for 'useful' READY we'll just
|
||||||
# do the chunk request here
|
# do the chunk request here
|
||||||
yield from self.chunker(server)
|
yield from self.chunker(server)
|
||||||
|
chunks = list(self.chunks_needed(server))
|
||||||
if chunks:
|
if chunks:
|
||||||
yield from asyncio.wait(chunks)
|
yield from asyncio.wait(chunks)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user