Rewrite chunking to work with intents.
This slows down chunking significantly for bots in a large number of guilds since it goes down from 75 guilds/request to 1 guild/request. However the logic was rewritten to fire the chunking request immediately after receiving the GUILD_CREATE rather than waiting for all the guilds in the ready stream before doing it.
This commit is contained in:
parent
51704b10cb
commit
eb641569f7
@ -370,7 +370,7 @@ class DiscordWebSocket:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if state._intents is not None:
|
if state._intents is not None:
|
||||||
payload['d']['intents'] = state._intents
|
payload['d']['intents'] = state._intents.value
|
||||||
|
|
||||||
await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify)
|
await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify)
|
||||||
await self.send_as_json(payload)
|
await self.send_as_json(payload)
|
||||||
|
@ -2045,11 +2045,6 @@ class Guild(Hashable):
|
|||||||
|
|
||||||
This is a websocket operation and can be slow.
|
This is a websocket operation and can be slow.
|
||||||
|
|
||||||
.. warning::
|
|
||||||
|
|
||||||
Most bots do not need to use this. It's mainly a helper
|
|
||||||
for bots who have disabled ``guild_subscriptions``.
|
|
||||||
|
|
||||||
.. versionadded:: 1.3
|
.. versionadded:: 1.3
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
@ -2059,7 +2054,7 @@ class Guild(Hashable):
|
|||||||
requests all members.
|
requests all members.
|
||||||
limit: :class:`int`
|
limit: :class:`int`
|
||||||
The maximum number of members to send back. This must be
|
The maximum number of members to send back. This must be
|
||||||
a number between 1 and 1000.
|
a number between 1 and 100.
|
||||||
cache: :class:`bool`
|
cache: :class:`bool`
|
||||||
Whether to cache the members internally. This makes operations
|
Whether to cache the members internally. This makes operations
|
||||||
such as :meth:`get_member` work for those that matched.
|
such as :meth:`get_member` work for those that matched.
|
||||||
@ -2073,19 +2068,26 @@ class Guild(Hashable):
|
|||||||
-------
|
-------
|
||||||
asyncio.TimeoutError
|
asyncio.TimeoutError
|
||||||
The query timed out waiting for the members.
|
The query timed out waiting for the members.
|
||||||
|
ValueError
|
||||||
|
Invalid parameters were passed to the function
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
--------
|
--------
|
||||||
List[:class:`Member`]
|
List[:class:`Member`]
|
||||||
The list of members that have matched the query.
|
The list of members that have matched the query.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if query is None:
|
||||||
|
if query == '':
|
||||||
|
raise ValueError('Cannot pass empty query string.')
|
||||||
|
|
||||||
|
if user_ids is None:
|
||||||
|
raise ValueError('Must pass either query or user_ids')
|
||||||
|
|
||||||
if user_ids is not None and query is not None:
|
if user_ids is not None and query is not None:
|
||||||
raise TypeError('Cannot pass both query and user_ids')
|
raise ValueError('Cannot pass both query and user_ids')
|
||||||
|
|
||||||
if user_ids is None and query is None:
|
limit = min(100, limit or 5)
|
||||||
raise TypeError('Must pass either query or user_ids')
|
|
||||||
|
|
||||||
limit = limit or 5
|
|
||||||
return await self._state.query_members(self, query=query, limit=limit, user_ids=user_ids, cache=cache)
|
return await self._state.query_members(self, query=query, limit=limit, user_ids=user_ids, cache=cache)
|
||||||
|
|
||||||
async def change_voice_state(self, *, channel, self_mute=False, self_deaf=False):
|
async def change_voice_state(self, *, channel, self_mute=False, self_deaf=False):
|
||||||
|
239
discord/state.py
239
discord/state.py
@ -25,7 +25,7 @@ DEALINGS IN THE SOFTWARE.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections import deque, namedtuple, OrderedDict
|
from collections import deque, OrderedDict
|
||||||
import copy
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import itertools
|
import itertools
|
||||||
@ -49,20 +49,22 @@ from .channel import *
|
|||||||
from .raw_models import *
|
from .raw_models import *
|
||||||
from .member import Member
|
from .member import Member
|
||||||
from .role import Role
|
from .role import Role
|
||||||
from .enums import ChannelType, try_enum, Status, Enum
|
from .enums import ChannelType, try_enum, Status
|
||||||
from . import utils
|
from . import utils
|
||||||
from .flags import Intents
|
from .flags import Intents
|
||||||
from .embeds import Embed
|
from .embeds import Embed
|
||||||
from .object import Object
|
from .object import Object
|
||||||
from .invite import Invite
|
from .invite import Invite
|
||||||
|
|
||||||
class ListenerType(Enum):
|
class ChunkRequest:
|
||||||
chunk = 0
|
__slots__ = ('guild_id', 'nonce', 'future')
|
||||||
query_members = 1
|
|
||||||
|
def __init__(self, guild_id, future):
|
||||||
|
self.guild_id = guild_id
|
||||||
|
self.nonce = os.urandom(16).hex()
|
||||||
|
self.future = future
|
||||||
|
|
||||||
Listener = namedtuple('Listener', ('type', 'future', 'predicate'))
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
ReadyState = namedtuple('ReadyState', ('launch', 'guilds'))
|
|
||||||
|
|
||||||
class ConnectionState:
|
class ConnectionState:
|
||||||
def __init__(self, *, dispatch, handlers, hooks, syncer, http, loop, **options):
|
def __init__(self, *, dispatch, handlers, hooks, syncer, http, loop, **options):
|
||||||
@ -94,7 +96,7 @@ class ConnectionState:
|
|||||||
self.allowed_mentions = allowed_mentions
|
self.allowed_mentions = allowed_mentions
|
||||||
# Only disable cache if both fetch_offline and guild_subscriptions are off.
|
# Only disable cache if both fetch_offline and guild_subscriptions are off.
|
||||||
self._cache_members = (self._fetch_offline or self.guild_subscriptions)
|
self._cache_members = (self._fetch_offline or self.guild_subscriptions)
|
||||||
self._listeners = []
|
self._chunk_requests = []
|
||||||
|
|
||||||
activity = options.get('activity', None)
|
activity = options.get('activity', None)
|
||||||
if activity:
|
if activity:
|
||||||
@ -114,7 +116,9 @@ class ConnectionState:
|
|||||||
if intents is not None:
|
if intents is not None:
|
||||||
if not isinstance(intents, Intents):
|
if not isinstance(intents, Intents):
|
||||||
raise TypeError('intents parameter must be Intent not %r' % type(intents))
|
raise TypeError('intents parameter must be Intent not %r' % type(intents))
|
||||||
intents = intents.value
|
|
||||||
|
if not intents.members and self._fetch_offline:
|
||||||
|
raise ValueError('Intents.members has be enabled to fetch offline members.')
|
||||||
|
|
||||||
self._activity = activity
|
self._activity = activity
|
||||||
self._status = status
|
self._status = status
|
||||||
@ -146,34 +150,20 @@ class ConnectionState:
|
|||||||
# to reconnect loops which cause mass allocations and deallocations.
|
# to reconnect loops which cause mass allocations and deallocations.
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
def get_nonce(self):
|
def process_chunk_requests(self, guild_id, nonce, members):
|
||||||
return os.urandom(16).hex()
|
|
||||||
|
|
||||||
def process_listeners(self, listener_type, argument, result):
|
|
||||||
removed = []
|
removed = []
|
||||||
for i, listener in enumerate(self._listeners):
|
for i, request in enumerate(self._chunk_requests):
|
||||||
if listener.type != listener_type:
|
future = request.future
|
||||||
continue
|
|
||||||
|
|
||||||
future = listener.future
|
|
||||||
if future.cancelled():
|
if future.cancelled():
|
||||||
removed.append(i)
|
removed.append(i)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
if request.guild_id == guild_id and request.nonce == nonce:
|
||||||
passed = listener.predicate(argument)
|
future.set_result(members)
|
||||||
except Exception as exc:
|
|
||||||
future.set_exception(exc)
|
|
||||||
removed.append(i)
|
removed.append(i)
|
||||||
else:
|
|
||||||
if passed:
|
|
||||||
future.set_result(result)
|
|
||||||
removed.append(i)
|
|
||||||
if listener.type == ListenerType.chunk:
|
|
||||||
break
|
|
||||||
|
|
||||||
for index in reversed(removed):
|
for index in reversed(removed):
|
||||||
del self._listeners[index]
|
del self._chunk_requests[index]
|
||||||
|
|
||||||
def call_handlers(self, key, *args, **kwargs):
|
def call_handlers(self, key, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
@ -307,10 +297,6 @@ class ConnectionState:
|
|||||||
self._add_guild(guild)
|
self._add_guild(guild)
|
||||||
return guild
|
return guild
|
||||||
|
|
||||||
def chunks_needed(self, guild):
|
|
||||||
for _ in range(math.ceil(guild._member_count / 1000)):
|
|
||||||
yield self.receive_chunk(guild.id)
|
|
||||||
|
|
||||||
def _get_guild_channel(self, data):
|
def _get_guild_channel(self, data):
|
||||||
channel_id = int(data['channel_id'])
|
channel_id = int(data['channel_id'])
|
||||||
try:
|
try:
|
||||||
@ -327,43 +313,20 @@ class ConnectionState:
|
|||||||
ws = self._get_websocket(guild_id) # This is ignored upstream
|
ws = self._get_websocket(guild_id) # This is ignored upstream
|
||||||
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
|
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
|
||||||
|
|
||||||
async def request_offline_members(self, guilds):
|
|
||||||
# get all the chunks
|
|
||||||
chunks = []
|
|
||||||
for guild in guilds:
|
|
||||||
chunks.extend(self.chunks_needed(guild))
|
|
||||||
|
|
||||||
# we only want to request ~75 guilds per chunk request.
|
|
||||||
splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)]
|
|
||||||
for split in splits:
|
|
||||||
await self.chunker([g.id for g in split])
|
|
||||||
|
|
||||||
# wait for the chunks
|
|
||||||
if chunks:
|
|
||||||
try:
|
|
||||||
await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
log.warning('Somehow timed out waiting for chunks.')
|
|
||||||
else:
|
|
||||||
log.info('Finished requesting guild member chunks for %d guilds.', len(guilds))
|
|
||||||
|
|
||||||
async def query_members(self, guild, query, limit, user_ids, cache):
|
async def query_members(self, guild, query, limit, user_ids, cache):
|
||||||
guild_id = guild.id
|
guild_id = guild.id
|
||||||
ws = self._get_websocket(guild_id)
|
ws = self._get_websocket(guild_id)
|
||||||
if ws is None:
|
if ws is None:
|
||||||
raise RuntimeError('Somehow do not have a websocket for this guild_id')
|
raise RuntimeError('Somehow do not have a websocket for this guild_id')
|
||||||
|
|
||||||
# Limits over 1000 cannot be supported since
|
future = self.loop.create_future()
|
||||||
# the main use case for this is guild_subscriptions being disabled
|
request = ChunkRequest(guild.id, future)
|
||||||
# and they don't receive GUILD_MEMBER events which make computing
|
self._chunk_requests.append(request)
|
||||||
# member_count impossible. The only way to fix it is by limiting
|
|
||||||
# the limit parameter to 1 to 1000.
|
|
||||||
nonce = self.get_nonce()
|
|
||||||
future = self.receive_member_query(guild_id, nonce)
|
|
||||||
try:
|
try:
|
||||||
# start the query operation
|
# start the query operation
|
||||||
await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=nonce)
|
await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce)
|
||||||
members = await asyncio.wait_for(future, timeout=5.0)
|
members = await asyncio.wait_for(future, timeout=30.0)
|
||||||
|
|
||||||
if cache:
|
if cache:
|
||||||
for member in members:
|
for member in members:
|
||||||
@ -376,29 +339,26 @@ class ConnectionState:
|
|||||||
|
|
||||||
async def _delay_ready(self):
|
async def _delay_ready(self):
|
||||||
try:
|
try:
|
||||||
launch = self._ready_state.launch
|
|
||||||
|
|
||||||
# only real bots wait for GUILD_CREATE streaming
|
# only real bots wait for GUILD_CREATE streaming
|
||||||
if self.is_bot:
|
if self.is_bot:
|
||||||
while True:
|
while True:
|
||||||
# this snippet of code is basically waiting N seconds
|
# this snippet of code is basically waiting N seconds
|
||||||
# until the last GUILD_CREATE was sent
|
# until the last GUILD_CREATE was sent
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout)
|
guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
launch.clear()
|
try:
|
||||||
|
if self._fetch_offline:
|
||||||
guilds = next(zip(*self._ready_state.guilds), [])
|
await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
|
||||||
if self._fetch_offline:
|
except asyncio.TimeoutError:
|
||||||
await self.request_offline_members(guilds)
|
log.info('Timed out waiting for chunks while launching ready event.')
|
||||||
|
finally:
|
||||||
for guild, unavailable in self._ready_state.guilds:
|
if guild.unavailable is False:
|
||||||
if unavailable is False:
|
self.dispatch('guild_available', guild)
|
||||||
self.dispatch('guild_available', guild)
|
else:
|
||||||
else:
|
self.dispatch('guild_join', guild)
|
||||||
self.dispatch('guild_join', guild)
|
|
||||||
|
|
||||||
# remove the state
|
# remove the state
|
||||||
try:
|
try:
|
||||||
@ -423,16 +383,13 @@ class ConnectionState:
|
|||||||
if self._ready_task is not None:
|
if self._ready_task is not None:
|
||||||
self._ready_task.cancel()
|
self._ready_task.cancel()
|
||||||
|
|
||||||
self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[])
|
self._ready_state = asyncio.Queue()
|
||||||
self.clear()
|
self.clear()
|
||||||
self.user = user = ClientUser(state=self, data=data['user'])
|
self.user = user = ClientUser(state=self, data=data['user'])
|
||||||
self._users[user.id] = user
|
self._users[user.id] = user
|
||||||
|
|
||||||
guilds = self._ready_state.guilds
|
|
||||||
for guild_data in data['guilds']:
|
for guild_data in data['guilds']:
|
||||||
guild = self._add_guild_from_data(guild_data)
|
self._add_guild_from_data(guild_data)
|
||||||
if (not self.is_bot and not guild.unavailable) or guild.large:
|
|
||||||
guilds.append((guild, guild.unavailable))
|
|
||||||
|
|
||||||
for relationship in data.get('relationships', []):
|
for relationship in data.get('relationships', []):
|
||||||
try:
|
try:
|
||||||
@ -766,14 +723,18 @@ class ConnectionState:
|
|||||||
|
|
||||||
return self._add_guild_from_data(data)
|
return self._add_guild_from_data(data)
|
||||||
|
|
||||||
|
async def chunk_guild(self, guild):
|
||||||
|
future = self.loop.create_future()
|
||||||
|
request = ChunkRequest(guild.id, future)
|
||||||
|
self._chunk_requests.append(request)
|
||||||
|
await self.chunker(guild.id, nonce=request.nonce)
|
||||||
|
await request.future
|
||||||
|
|
||||||
async def _chunk_and_dispatch(self, guild, unavailable):
|
async def _chunk_and_dispatch(self, guild, unavailable):
|
||||||
chunks = list(self.chunks_needed(guild))
|
try:
|
||||||
await self.chunker(guild.id)
|
await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
|
||||||
if chunks:
|
except asyncio.TimeoutError:
|
||||||
try:
|
log.info('Somehow timed out waiting for chunks.')
|
||||||
await utils.sane_wait_for(chunks, timeout=len(chunks))
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
log.info('Somehow timed out waiting for chunks.')
|
|
||||||
|
|
||||||
if unavailable is False:
|
if unavailable is False:
|
||||||
self.dispatch('guild_available', guild)
|
self.dispatch('guild_available', guild)
|
||||||
@ -788,25 +749,17 @@ class ConnectionState:
|
|||||||
|
|
||||||
guild = self._get_create_guild(data)
|
guild = self._get_create_guild(data)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Notify the on_ready state, if any, that this guild is complete.
|
||||||
|
self._ready_state.put_nowait(guild)
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# If we're waiting for the event, put the rest on hold
|
||||||
|
return
|
||||||
|
|
||||||
# check if it requires chunking
|
# check if it requires chunking
|
||||||
if guild.large:
|
if guild.large:
|
||||||
if unavailable is False:
|
|
||||||
# check if we're waiting for 'useful' READY
|
|
||||||
# and if we are, we don't want to dispatch any
|
|
||||||
# event such as guild_join or guild_available
|
|
||||||
# because we're still in the 'READY' phase. Or
|
|
||||||
# so we say.
|
|
||||||
try:
|
|
||||||
state = self._ready_state
|
|
||||||
state.launch.set()
|
|
||||||
state.guilds.append((guild, unavailable))
|
|
||||||
except AttributeError:
|
|
||||||
# the _ready_state attribute is only there during
|
|
||||||
# processing of useful READY.
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
|
|
||||||
# since we're not waiting for 'useful' READY we'll just
|
# since we're not waiting for 'useful' READY we'll just
|
||||||
# do the chunk request here if wanted
|
# do the chunk request here if wanted
|
||||||
if self._fetch_offline:
|
if self._fetch_offline:
|
||||||
@ -923,8 +876,8 @@ class ConnectionState:
|
|||||||
if existing is None or existing.joined_at is None:
|
if existing is None or existing.joined_at is None:
|
||||||
guild._add_member(member)
|
guild._add_member(member)
|
||||||
|
|
||||||
self.process_listeners(ListenerType.chunk, guild, len(members))
|
if data.get('chunk_index', 0) + 1 == data.get('chunk_count'):
|
||||||
self.process_listeners(ListenerType.query_members, (guild_id, data.get('nonce')), members)
|
self.process_chunk_requests(guild_id, data.get('nonce'), members)
|
||||||
|
|
||||||
def parse_guild_integrations_update(self, data):
|
def parse_guild_integrations_update(self, data):
|
||||||
guild = self._get_guild(int(data['guild_id']))
|
guild = self._get_guild(int(data['guild_id']))
|
||||||
@ -1048,21 +1001,6 @@ class ConnectionState:
|
|||||||
def create_message(self, *, channel, data):
|
def create_message(self, *, channel, data):
|
||||||
return Message(state=self, channel=channel, data=data)
|
return Message(state=self, channel=channel, data=data)
|
||||||
|
|
||||||
def receive_chunk(self, guild_id):
|
|
||||||
future = self.loop.create_future()
|
|
||||||
listener = Listener(ListenerType.chunk, future, lambda s: s.id == guild_id)
|
|
||||||
self._listeners.append(listener)
|
|
||||||
return future
|
|
||||||
|
|
||||||
def receive_member_query(self, guild_id, nonce):
|
|
||||||
def predicate(args, *, guild_id=guild_id, nonce=nonce):
|
|
||||||
return args == (guild_id, nonce)
|
|
||||||
|
|
||||||
future = self.loop.create_future()
|
|
||||||
listener = Listener(ListenerType.query_members, future, predicate)
|
|
||||||
self._listeners.append(listener)
|
|
||||||
return future
|
|
||||||
|
|
||||||
class AutoShardedConnectionState(ConnectionState):
|
class AutoShardedConnectionState(ConnectionState):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
@ -1085,51 +1023,31 @@ class AutoShardedConnectionState(ConnectionState):
|
|||||||
ws = self._get_websocket(guild_id, shard_id=shard_id)
|
ws = self._get_websocket(guild_id, shard_id=shard_id)
|
||||||
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
|
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
|
||||||
|
|
||||||
async def request_offline_members(self, guilds, *, shard_id):
|
|
||||||
# get all the chunks
|
|
||||||
chunks = []
|
|
||||||
for guild in guilds:
|
|
||||||
chunks.extend(self.chunks_needed(guild))
|
|
||||||
|
|
||||||
# we only want to request ~75 guilds per chunk request.
|
|
||||||
splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)]
|
|
||||||
for split in splits:
|
|
||||||
await self.chunker([g.id for g in split], shard_id=shard_id)
|
|
||||||
|
|
||||||
# wait for the chunks
|
|
||||||
if chunks:
|
|
||||||
try:
|
|
||||||
await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
log.info('Somehow timed out waiting for chunks.')
|
|
||||||
else:
|
|
||||||
log.info('Finished requesting guild member chunks for %d guilds.', len(guilds))
|
|
||||||
|
|
||||||
async def _delay_ready(self):
|
async def _delay_ready(self):
|
||||||
await self.shards_launched.wait()
|
await self.shards_launched.wait()
|
||||||
launch = self._ready_state.launch
|
processed = []
|
||||||
while True:
|
while True:
|
||||||
# this snippet of code is basically waiting N seconds
|
# this snippet of code is basically waiting N seconds
|
||||||
# until the last GUILD_CREATE was sent
|
# until the last GUILD_CREATE was sent
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout)
|
guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
launch.clear()
|
try:
|
||||||
|
if self._fetch_offline:
|
||||||
|
await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
log.info('Timed out waiting for chunks while launching ready event.')
|
||||||
|
finally:
|
||||||
|
processed.append(guild)
|
||||||
|
if guild.unavailable is False:
|
||||||
|
self.dispatch('guild_available', guild)
|
||||||
|
else:
|
||||||
|
self.dispatch('guild_join', guild)
|
||||||
|
|
||||||
guilds = sorted(self._ready_state.guilds, key=lambda g: g[0].shard_id)
|
guilds = sorted(processed, key=lambda g: g.shard_id)
|
||||||
|
for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id):
|
||||||
for shard_id, sub_guilds_info in itertools.groupby(guilds, key=lambda g: g[0].shard_id):
|
|
||||||
sub_guilds, sub_available = zip(*sub_guilds_info)
|
|
||||||
if self._fetch_offline:
|
|
||||||
await self.request_offline_members(sub_guilds, shard_id=shard_id)
|
|
||||||
|
|
||||||
for guild, unavailable in zip(sub_guilds, sub_available):
|
|
||||||
if unavailable is False:
|
|
||||||
self.dispatch('guild_available', guild)
|
|
||||||
else:
|
|
||||||
self.dispatch('guild_join', guild)
|
|
||||||
self.dispatch('shard_ready', shard_id)
|
self.dispatch('shard_ready', shard_id)
|
||||||
|
|
||||||
# remove the state
|
# remove the state
|
||||||
@ -1149,16 +1067,13 @@ class AutoShardedConnectionState(ConnectionState):
|
|||||||
|
|
||||||
def parse_ready(self, data):
|
def parse_ready(self, data):
|
||||||
if not hasattr(self, '_ready_state'):
|
if not hasattr(self, '_ready_state'):
|
||||||
self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[])
|
self._ready_state = asyncio.Queue()
|
||||||
|
|
||||||
self.user = user = ClientUser(state=self, data=data['user'])
|
self.user = user = ClientUser(state=self, data=data['user'])
|
||||||
self._users[user.id] = user
|
self._users[user.id] = user
|
||||||
|
|
||||||
guilds = self._ready_state.guilds
|
|
||||||
for guild_data in data['guilds']:
|
for guild_data in data['guilds']:
|
||||||
guild = self._add_guild_from_data(guild_data)
|
self._add_guild_from_data(guild_data)
|
||||||
if guild.large:
|
|
||||||
guilds.append((guild, guild.unavailable))
|
|
||||||
|
|
||||||
if self._messages:
|
if self._messages:
|
||||||
self._update_message_references()
|
self._update_message_references()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user