From f588834b0cc61666bf1b3a139b2ac7833e06808a Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 20 Dec 2019 23:10:46 -0500 Subject: [PATCH 01/20] Add support for guild intents --- discord/client.py | 4 + discord/flags.py | 317 ++++++++++++++++++++++++++++++++++++++++++++- discord/gateway.py | 3 + discord/state.py | 8 ++ 4 files changed, 331 insertions(+), 1 deletion(-) diff --git a/discord/client.py b/discord/client.py index 59be489a..61fec3b8 100644 --- a/discord/client.py +++ b/discord/client.py @@ -141,6 +141,10 @@ class Client: Integer starting at ``0`` and less than :attr:`.shard_count`. shard_count: Optional[:class:`int`] The total number of shards. + intents: :class:`Intents` + A list of intents that you want to enable for the session. This is a way of + disabling and enabling certain gateway events from triggering and being sent. + Currently, if no intents are passed then you will receive all data. fetch_offline_members: :class:`bool` Indicates if :func:`.on_ready` should be delayed to fetch all offline members from the guilds the client belongs to. If this is ``False``\, then diff --git a/discord/flags.py b/discord/flags.py index 448bced2..4f5f197e 100644 --- a/discord/flags.py +++ b/discord/flags.py @@ -29,7 +29,8 @@ from .enums import UserFlags __all__ = ( 'SystemChannelFlags', 'MessageFlags', - 'PublicUserFlags' + 'PublicUserFlags', + 'Intents', ) class flag_value: @@ -327,3 +328,317 @@ class PublicUserFlags(BaseFlags): def all(self): """List[:class:`UserFlags`]: Returns all public flags the user has.""" return [public_flag for public_flag in UserFlags if self._has_flag(public_flag.value)] + + +@fill_with_flags() +class Intents(BaseFlags): + r"""Wraps up a Discord gateway intent flag. + + Similar to :class:`Permissions`\, the properties provided are two way. + You can set and retrieve individual bits using the properties as if they + were regular bools. + + To construct an object you can pass keyword arguments denoting the flags + to enable or disable. 
+ + This is used to disable certain gateway features that are unnecessary to + run your bot. To make use of this, it is passed to the ``intents`` keyword + argument of :class:`Client`. + + A default instance of this class has everything enabled except :attr:`presences`. + + .. container:: operations + + .. describe:: x == y + + Checks if two flags are equal. + .. describe:: x != y + + Checks if two flags are not equal. + .. describe:: hash(x) + + Return the flag's hash. + .. describe:: iter(x) + + Returns an iterator of ``(name, value)`` pairs. This allows it + to be, for example, constructed as a dict or a list of pairs. + + Attributes + ----------- + value: :class:`int` + The raw value. You should query flags via the properties + rather than using this raw value. + """ + + __slots__ = () + + def __init__(self, **kwargs): + # Change the default value to everything being enabled + # except presences + bits = max(self.VALID_FLAGS.values()).bit_length() + self.value = (1 << bits) - 1 + self.presences = False + for key, value in kwargs.items(): + if key not in self.VALID_FLAGS: + raise TypeError('%r is not a valid flag name.' % key) + setattr(self, key, value) + + @classmethod + def all(cls): + """A factory method that creates a :class:`Intents` with everything enabled.""" + bits = max(cls.VALID_FLAGS.values()).bit_length() + value = (1 << bits) - 1 + self = cls.__new__(cls) + self.value = value + return self + + @classmethod + def none(cls): + """A factory method that creates a :class:`Intents` with everything disabled.""" + self = cls.__new__(cls) + self.value = self.DEFAULT_VALUE + return self + + @flag_value + def guilds(self): + """:class:`bool`: Whether guild related events are enabled. 
+ + This corresponds to the following events: + + - :func:`on_guild_join` + - :func:`on_guild_remove` + - :func:`on_guild_available` + - :func:`on_guild_unavailable` + - :func:`on_guild_channel_update` + - :func:`on_guild_channel_create` + - :func:`on_guild_channel_delete` + - :func:`on_guild_channel_pins_update` + """ + return 1 << 0 + + @flag_value + def members(self): + """:class:`bool`: Whether guild member related events are enabled. + + This corresponds to the following events: + + - :func:`on_member_join` + - :func:`on_member_remove` + - :func:`on_member_update` (nickname, roles) + """ + return 1 << 1 + + @flag_value + def bans(self): + """:class:`bool`: Whether guild ban related events are enabled. + + This corresponds to the following events: + + - :func:`on_member_ban` + - :func:`on_member_unban` + """ + return 1 << 2 + + @flag_value + def emojis(self): + """:class:`bool`: Whether guild emoji related events are enabled. + + This corresponds to the following events: + + - :func:`on_guild_emojis_update` + """ + return 1 << 3 + + @flag_value + def integrations(self): + """:class:`bool`: Whether guild integration related events are enabled. + + This corresponds to the following events: + + - :func:`on_guild_integrations_update` + """ + return 1 << 4 + + @flag_value + def webhooks(self): + """:class:`bool`: Whether guild webhook related events are enabled. + + This corresponds to the following events: + + - :func:`on_webhooks_update` + """ + return 1 << 5 + + @flag_value + def invites(self): + """:class:`bool`: Whether guild invite related events are enabled. + + This corresponds to the following events: + + - :func:`on_invite_create` + - :func:`on_invite_delete` + """ + return 1 << 6 + + @flag_value + def voice_states(self): + """:class:`bool`: Whether guild voice state related events are enabled. 
+ + This corresponds to the following events: + + - :func:`on_voice_state_update` + """ + return 1 << 7 + + @flag_value + def presences(self): + """:class:`bool`: Whether guild presence related events are enabled. + + This corresponds to the following events: + + - :func:`on_member_update` (activities, status) + - :func:`on_user_update` + + .. note:: + + Currently, this requires opting in explicitly via the dev portal as well. + Bots in over 100 guilds will need to apply to Discord for approval. + """ + return 1 << 8 + + @flag_value + def messages(self): + """:class:`bool`: Whether guild and direct message related events are enabled. + + This is a shortcut to set or get both :attr:`guild_messages` and :attr:`dm_messages`. + + This corresponds to the following events: + + - :func:`on_message` (both guilds and DMs) + - :func:`on_message_update` (both guilds and DMs) + - :func:`on_message_delete` (both guilds and DMs) + - :func:`on_raw_message_delete` (both guilds and DMs) + - :func:`on_raw_message_update` (both guilds and DMs) + - :func:`on_private_channel_create` + """ + return (1 << 9) | (1 << 12) + + @flag_value + def guild_messages(self): + """:class:`bool`: Whether guild message related events are enabled. + + See also :attr:`dm_messages` for DMs or :attr:`messages` for both. + + This corresponds to the following events: + + - :func:`on_message` (only for guilds) + - :func:`on_message_update` (only for guilds) + - :func:`on_message_delete` (only for guilds) + - :func:`on_raw_message_delete` (only for guilds) + - :func:`on_raw_message_update` (only for guilds) + """ + return 1 << 9 + + @flag_value + def dm_messages(self): + """:class:`bool`: Whether direct message related events are enabled. + + See also :attr:`guild_messages` for guilds or :attr:`messages` for both. 
+ + This corresponds to the following events: + + - :func:`on_message` (only for DMs) + - :func:`on_message_update` (only for DMs) + - :func:`on_message_delete` (only for DMs) + - :func:`on_raw_message_delete` (only for DMs) + - :func:`on_raw_message_update` (only for DMs) + - :func:`on_private_channel_create` + """ + return 1 << 12 + + @flag_value + def reactions(self): + """:class:`bool`: Whether guild and direct message reaction related events are enabled. + + This is a shortcut to set or get both :attr:`guild_reactions` and :attr:`dm_reactions`. + + This corresponds to the following events: + + - :func:`on_reaction_add` (both guilds and DMs) + - :func:`on_reaction_remove` (both guilds and DMs) + - :func:`on_reaction_clear` (both guilds and DMs) + - :func:`on_raw_reaction_add` (both guilds and DMs) + - :func:`on_raw_reaction_remove` (both guilds and DMs) + - :func:`on_raw_reaction_clear` (both guilds and DMs) + """ + return (1 << 10) | (1 << 13) + + @flag_value + def guild_reactions(self): + """:class:`bool`: Whether guild message reaction related events are enabled. + + See also :attr:`dm_reactions` for DMs or :attr:`reactions` for both. + + This corresponds to the following events: + + - :func:`on_reaction_add` (only for guilds) + - :func:`on_reaction_remove` (only for guilds) + - :func:`on_reaction_clear` (only for guilds) + - :func:`on_raw_reaction_add` (only for guilds) + - :func:`on_raw_reaction_remove` (only for guilds) + - :func:`on_raw_reaction_clear` (only for guilds) + """ + return 1 << 10 + + @flag_value + def dm_reactions(self): + """:class:`bool`: Whether direct message reaction related events are enabled. + + See also :attr:`guild_reactions` for guilds or :attr:`reactions` for both. 
+ + This corresponds to the following events: + + - :func:`on_reaction_add` (only for DMs) + - :func:`on_reaction_remove` (only for DMs) + - :func:`on_reaction_clear` (only for DMs) + - :func:`on_raw_reaction_add` (only for DMs) + - :func:`on_raw_reaction_remove` (only for DMs) + - :func:`on_raw_reaction_clear` (only for DMs) + """ + return 1 << 13 + + @flag_value + def typing(self): + """:class:`bool`: Whether guild and direct message typing related events are enabled. + + This is a shortcut to set or get both :attr:`guild_typing` and :attr:`dm_typing`. + + This corresponds to the following events: + + - :func:`on_typing` (both guilds and DMs) + """ + return (1 << 11) | (1 << 14) + + @flag_value + def guild_typing(self): + """:class:`bool`: Whether guild typing related events are enabled. + + See also :attr:`dm_typing` for DMs or :attr:`typing` for both. + + This corresponds to the following events: + + - :func:`on_typing` (only for guilds) + """ + return 1 << 11 + + @flag_value + def dm_typing(self): + """:class:`bool`: Whether direct message typing related events are enabled. + + See also :attr:`guild_typing` for guilds or :attr:`typing` for both. 
+ + This corresponds to the following events: + + - :func:`on_typing` (only for DMs) + """ + return 1 << 14 diff --git a/discord/gateway.py b/discord/gateway.py index 81ff69b8..4d7b8f66 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -343,6 +343,9 @@ class DiscordWebSocket: 'afk': False } + if state._intents is not None: + payload['d']['intents'] = state._intents + await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify) await self.send_as_json(payload) log.info('Shard ID %s has sent the IDENTIFY payload.', self.shard_id) diff --git a/discord/state.py b/discord/state.py index f0e93d35..3f3c3ad7 100644 --- a/discord/state.py +++ b/discord/state.py @@ -51,6 +51,7 @@ from .member import Member from .role import Role from .enums import ChannelType, try_enum, Status, Enum from . import utils +from .flags import Intents from .embeds import Embed from .object import Object from .invite import Invite @@ -109,8 +110,15 @@ class ConnectionState: else: status = str(status) + intents = options.get('intents', None) + if intents is not None: + if not isinstance(intents, Intents): + raise TypeError('intents parameter must be Intent not %r' % type(intents)) + intents = intents.value + self._activity = activity self._status = status + self._intents = intents self.parsers = parsers = {} for attr, func in inspect.getmembers(self): From 63c454eaa0d162395e1cfabc6e25f7d7f04734b5 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Thu, 27 Aug 2020 20:00:28 -0400 Subject: [PATCH 02/20] Handle gateway rate limits by using a rate limiter. With the new chunking changes this will become necessary and we don't want to disconnect from having too many outwards requests. 
--- discord/gateway.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/discord/gateway.py b/discord/gateway.py index 4d7b8f66..3d52a9e1 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -66,6 +66,31 @@ class WebSocketClosure(Exception): EventListener = namedtuple('EventListener', 'predicate event result future') +class GatewayRatelimiter: + def __init__(self, count=120, per=60.0): + self.max = count + self.remaining = count + self.window = 0.0 + self.per = per + + def get_delay(self): + current = time.time() + + if current > self.window + self.per: + self.remaining = self.max + + if self.remaining == self.max: + self.window = current + + if self.remaining == 0: + return self.per - (current - self.window) + + self.remaining -= 1 + if self.remaining == 0: + self.window = current + + return 0.0 + class KeepAliveHandler(threading.Thread): def __init__(self, *args, **kwargs): ws = kwargs.pop('ws', None) @@ -240,6 +265,7 @@ class DiscordWebSocket: self._zlib = zlib.decompressobj() self._buffer = bytearray() self._close_code = None + self._rate_limiter = GatewayRatelimiter() @property def open(self): @@ -532,6 +558,11 @@ class DiscordWebSocket: raise ConnectionClosed(self.socket, shard_id=self.shard_id, code=code) from None async def send(self, data): + delay = self._rate_limiter.get_delay() + if delay: + log.warning('WebSocket is ratelimited, waiting %.2f seconds', delay) + await asyncio.sleep(delay) + self._dispatch('socket_raw_send', data) await self.socket.send_str(data) From 50a951e3ec0191232212f2e54f0a4d09edb633a6 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 4 Sep 2020 02:09:10 -0400 Subject: [PATCH 03/20] Change unknown cache log warnings from WARNING -> DEBUG --- discord/state.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/discord/state.py b/discord/state.py index 3f3c3ad7..4ab2de61 100644 --- a/discord/state.py +++ b/discord/state.py @@ -569,7 +569,7 @@ 
class ConnectionState: guild_id = utils._get_as_snowflake(data, 'guild_id') guild = self._get_guild(guild_id) if guild is None: - log.warning('PRESENCE_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id) + log.debug('PRESENCE_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id) return user = data['user'] @@ -637,14 +637,14 @@ class ConnectionState: channel._update(guild, data) self.dispatch('guild_channel_update', old_channel, channel) else: - log.warning('CHANNEL_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id) + log.debug('CHANNEL_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id) else: - log.warning('CHANNEL_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id) + log.debug('CHANNEL_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id) def parse_channel_create(self, data): factory, ch_type = _channel_factory(data['type']) if factory is None: - log.warning('CHANNEL_CREATE referencing an unknown channel type %s. Discarding.', data['type']) + log.debug('CHANNEL_CREATE referencing an unknown channel type %s. Discarding.', data['type']) return channel = None @@ -663,14 +663,14 @@ class ConnectionState: guild._add_channel(channel) self.dispatch('guild_channel_create', channel) else: - log.warning('CHANNEL_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id) + log.debug('CHANNEL_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id) return def parse_channel_pins_update(self, data): channel_id = int(data['channel_id']) channel = self.get_channel(channel_id) if channel is None: - log.warning('CHANNEL_PINS_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id) + log.debug('CHANNEL_PINS_UPDATE referencing an unknown channel ID: %s. 
Discarding.', channel_id) return last_pin = utils.parse_time(data['last_pin_timestamp']) if data['last_pin_timestamp'] else None @@ -704,7 +704,7 @@ class ConnectionState: def parse_guild_member_add(self, data): guild = self._get_guild(int(data['guild_id'])) if guild is None: - log.warning('GUILD_MEMBER_ADD referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_MEMBER_ADD referencing an unknown guild ID: %s. Discarding.', data['guild_id']) return member = Member(guild=guild, data=data, state=self) @@ -723,14 +723,14 @@ class ConnectionState: guild._remove_member(member) self.dispatch('member_remove', member) else: - log.warning('GUILD_MEMBER_REMOVE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_MEMBER_REMOVE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) def parse_guild_member_update(self, data): guild = self._get_guild(int(data['guild_id'])) user = data['user'] user_id = int(user['id']) if guild is None: - log.warning('GUILD_MEMBER_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_MEMBER_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) return member = guild.get_member(user_id) @@ -739,12 +739,12 @@ class ConnectionState: member._update(data) self.dispatch('member_update', old_member, member) else: - log.warning('GUILD_MEMBER_UPDATE referencing an unknown member ID: %s. Discarding.', user_id) + log.debug('GUILD_MEMBER_UPDATE referencing an unknown member ID: %s. Discarding.', user_id) def parse_guild_emojis_update(self, data): guild = self._get_guild(int(data['guild_id'])) if guild is None: - log.warning('GUILD_EMOJIS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_EMOJIS_UPDATE referencing an unknown guild ID: %s. 
Discarding.', data['guild_id']) return before_emojis = guild.emojis @@ -830,12 +830,12 @@ class ConnectionState: guild._from_data(data) self.dispatch('guild_update', old_guild, guild) else: - log.warning('GUILD_UPDATE referencing an unknown guild ID: %s. Discarding.', data['id']) + log.debug('GUILD_UPDATE referencing an unknown guild ID: %s. Discarding.', data['id']) def parse_guild_delete(self, data): guild = self._get_guild(int(data['id'])) if guild is None: - log.warning('GUILD_DELETE referencing an unknown guild ID: %s. Discarding.', data['id']) + log.debug('GUILD_DELETE referencing an unknown guild ID: %s. Discarding.', data['id']) return if data.get('unavailable', False) and guild is not None: @@ -878,7 +878,7 @@ class ConnectionState: def parse_guild_role_create(self, data): guild = self._get_guild(int(data['guild_id'])) if guild is None: - log.warning('GUILD_ROLE_CREATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_ROLE_CREATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) return role_data = data['role'] @@ -897,7 +897,7 @@ class ConnectionState: else: self.dispatch('guild_role_delete', role) else: - log.warning('GUILD_ROLE_DELETE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_ROLE_DELETE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) def parse_guild_role_update(self, data): guild = self._get_guild(int(data['guild_id'])) @@ -910,7 +910,7 @@ class ConnectionState: role._update(role_data) self.dispatch('guild_role_update', old_role, role) else: - log.warning('GUILD_ROLE_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_ROLE_UPDATE referencing an unknown guild ID: %s. 
Discarding.', data['guild_id']) def parse_guild_members_chunk(self, data): guild_id = int(data['guild_id']) @@ -931,14 +931,14 @@ class ConnectionState: if guild is not None: self.dispatch('guild_integrations_update', guild) else: - log.warning('GUILD_INTEGRATIONS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) + log.debug('GUILD_INTEGRATIONS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']) def parse_webhooks_update(self, data): channel = self.get_channel(int(data['channel_id'])) if channel is not None: self.dispatch('webhooks_update', channel) else: - log.warning('WEBHOOKS_UPDATE referencing an unknown channel ID: %s. Discarding.', data['channel_id']) + log.debug('WEBHOOKS_UPDATE referencing an unknown channel ID: %s. Discarding.', data['channel_id']) def parse_voice_state_update(self, data): guild = self._get_guild(utils._get_as_snowflake(data, 'guild_id')) @@ -955,7 +955,7 @@ class ConnectionState: if member is not None: self.dispatch('voice_state_update', member, before, after) else: - log.warning('VOICE_STATE_UPDATE referencing an unknown member ID: %s. Discarding.', data['user_id']) + log.debug('VOICE_STATE_UPDATE referencing an unknown member ID: %s. Discarding.', data['user_id']) else: # in here we're either at private or group calls call = self._calls.get(channel_id) From 51704b10cb622926785efa484f71a32d9b34c7b4 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 4 Sep 2020 08:02:52 -0400 Subject: [PATCH 04/20] Add more close codes that can't be handled for reconnecting. 
--- discord/gateway.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discord/gateway.py b/discord/gateway.py index 3d52a9e1..719f5a99 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -517,7 +517,7 @@ class DiscordWebSocket: def _can_handle_close(self): code = self._close_code or self.socket.close_code - return code not in (1000, 4004, 4010, 4011) + return code not in (1000, 4004, 4010, 4011, 4012, 4013, 4014) async def poll_event(self): """Polls for a DISPATCH event and handles the general gateway loop. From eb641569f7eb96abdbb604147edbf3faf830b2f2 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 4 Sep 2020 08:09:41 -0400 Subject: [PATCH 05/20] Rewrite chunking to work with intents. This slows down chunking significantly for bots in a large number of guilds since it goes down from 75 guilds/request to 1 guild/request. However the logic was rewritten to fire the chunking request immediately after receiving the GUILD_CREATE rather than waiting for all the guilds in the ready stream before doing it. --- discord/gateway.py | 2 +- discord/guild.py | 24 ++--- discord/state.py | 239 +++++++++++++++------------------------------ 3 files changed, 91 insertions(+), 174 deletions(-) diff --git a/discord/gateway.py b/discord/gateway.py index 719f5a99..889efc4d 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -370,7 +370,7 @@ class DiscordWebSocket: } if state._intents is not None: - payload['d']['intents'] = state._intents + payload['d']['intents'] = state._intents.value await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify) await self.send_as_json(payload) diff --git a/discord/guild.py b/discord/guild.py index 4c6013a3..d10127f2 100644 --- a/discord/guild.py +++ b/discord/guild.py @@ -2045,11 +2045,6 @@ class Guild(Hashable): This is a websocket operation and can be slow. - .. warning:: - - Most bots do not need to use this. It's mainly a helper - for bots who have disabled ``guild_subscriptions``. 
- .. versionadded:: 1.3 Parameters @@ -2059,7 +2054,7 @@ class Guild(Hashable): requests all members. limit: :class:`int` The maximum number of members to send back. This must be - a number between 1 and 1000. + a number between 1 and 100. cache: :class:`bool` Whether to cache the members internally. This makes operations such as :meth:`get_member` work for those that matched. @@ -2073,19 +2068,26 @@ class Guild(Hashable): ------- asyncio.TimeoutError The query timed out waiting for the members. + ValueError + Invalid parameters were passed to the function Returns -------- List[:class:`Member`] The list of members that have matched the query. """ + + if query is None: + if query == '': + raise ValueError('Cannot pass empty query string.') + + if user_ids is None: + raise ValueError('Must pass either query or user_ids') + if user_ids is not None and query is not None: - raise TypeError('Cannot pass both query and user_ids') + raise ValueError('Cannot pass both query and user_ids') - if user_ids is None and query is None: - raise TypeError('Must pass either query or user_ids') - - limit = limit or 5 + limit = min(100, limit or 5) return await self._state.query_members(self, query=query, limit=limit, user_ids=user_ids, cache=cache) async def change_voice_state(self, *, channel, self_mute=False, self_deaf=False): diff --git a/discord/state.py b/discord/state.py index 4ab2de61..85a28a5a 100644 --- a/discord/state.py +++ b/discord/state.py @@ -25,7 +25,7 @@ DEALINGS IN THE SOFTWARE. """ import asyncio -from collections import deque, namedtuple, OrderedDict +from collections import deque, OrderedDict import copy import datetime import itertools @@ -49,20 +49,22 @@ from .channel import * from .raw_models import * from .member import Member from .role import Role -from .enums import ChannelType, try_enum, Status, Enum +from .enums import ChannelType, try_enum, Status from . 
import utils from .flags import Intents from .embeds import Embed from .object import Object from .invite import Invite -class ListenerType(Enum): - chunk = 0 - query_members = 1 +class ChunkRequest: + __slots__ = ('guild_id', 'nonce', 'future') + + def __init__(self, guild_id, future): + self.guild_id = guild_id + self.nonce = os.urandom(16).hex() + self.future = future -Listener = namedtuple('Listener', ('type', 'future', 'predicate')) log = logging.getLogger(__name__) -ReadyState = namedtuple('ReadyState', ('launch', 'guilds')) class ConnectionState: def __init__(self, *, dispatch, handlers, hooks, syncer, http, loop, **options): @@ -94,7 +96,7 @@ class ConnectionState: self.allowed_mentions = allowed_mentions # Only disable cache if both fetch_offline and guild_subscriptions are off. self._cache_members = (self._fetch_offline or self.guild_subscriptions) - self._listeners = [] + self._chunk_requests = [] activity = options.get('activity', None) if activity: @@ -114,7 +116,9 @@ class ConnectionState: if intents is not None: if not isinstance(intents, Intents): raise TypeError('intents parameter must be Intent not %r' % type(intents)) - intents = intents.value + + if not intents.members and self._fetch_offline: + raise ValueError('Intents.members has be enabled to fetch offline members.') self._activity = activity self._status = status @@ -146,34 +150,20 @@ class ConnectionState: # to reconnect loops which cause mass allocations and deallocations. 
gc.collect() - def get_nonce(self): - return os.urandom(16).hex() - - def process_listeners(self, listener_type, argument, result): + def process_chunk_requests(self, guild_id, nonce, members): removed = [] - for i, listener in enumerate(self._listeners): - if listener.type != listener_type: - continue - - future = listener.future + for i, request in enumerate(self._chunk_requests): + future = request.future if future.cancelled(): removed.append(i) continue - try: - passed = listener.predicate(argument) - except Exception as exc: - future.set_exception(exc) + if request.guild_id == guild_id and request.nonce == nonce: + future.set_result(members) removed.append(i) - else: - if passed: - future.set_result(result) - removed.append(i) - if listener.type == ListenerType.chunk: - break for index in reversed(removed): - del self._listeners[index] + del self._chunk_requests[index] def call_handlers(self, key, *args, **kwargs): try: @@ -307,10 +297,6 @@ class ConnectionState: self._add_guild(guild) return guild - def chunks_needed(self, guild): - for _ in range(math.ceil(guild._member_count / 1000)): - yield self.receive_chunk(guild.id) - def _get_guild_channel(self, data): channel_id = int(data['channel_id']) try: @@ -327,43 +313,20 @@ class ConnectionState: ws = self._get_websocket(guild_id) # This is ignored upstream await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce) - async def request_offline_members(self, guilds): - # get all the chunks - chunks = [] - for guild in guilds: - chunks.extend(self.chunks_needed(guild)) - - # we only want to request ~75 guilds per chunk request. 
- splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)] - for split in splits: - await self.chunker([g.id for g in split]) - - # wait for the chunks - if chunks: - try: - await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0) - except asyncio.TimeoutError: - log.warning('Somehow timed out waiting for chunks.') - else: - log.info('Finished requesting guild member chunks for %d guilds.', len(guilds)) - async def query_members(self, guild, query, limit, user_ids, cache): guild_id = guild.id ws = self._get_websocket(guild_id) if ws is None: raise RuntimeError('Somehow do not have a websocket for this guild_id') - # Limits over 1000 cannot be supported since - # the main use case for this is guild_subscriptions being disabled - # and they don't receive GUILD_MEMBER events which make computing - # member_count impossible. The only way to fix it is by limiting - # the limit parameter to 1 to 1000. - nonce = self.get_nonce() - future = self.receive_member_query(guild_id, nonce) + future = self.loop.create_future() + request = ChunkRequest(guild.id, future) + self._chunk_requests.append(request) + try: # start the query operation - await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=nonce) - members = await asyncio.wait_for(future, timeout=5.0) + await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce) + members = await asyncio.wait_for(future, timeout=30.0) if cache: for member in members: @@ -376,29 +339,26 @@ class ConnectionState: async def _delay_ready(self): try: - launch = self._ready_state.launch - # only real bots wait for GUILD_CREATE streaming if self.is_bot: while True: # this snippet of code is basically waiting N seconds # until the last GUILD_CREATE was sent try: - await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout) + guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout) except asyncio.TimeoutError: break else: - 
launch.clear() - - guilds = next(zip(*self._ready_state.guilds), []) - if self._fetch_offline: - await self.request_offline_members(guilds) - - for guild, unavailable in self._ready_state.guilds: - if unavailable is False: - self.dispatch('guild_available', guild) - else: - self.dispatch('guild_join', guild) + try: + if self._fetch_offline: + await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) + except asyncio.TimeoutError: + log.info('Timed out waiting for chunks while launching ready event.') + finally: + if guild.unavailable is False: + self.dispatch('guild_available', guild) + else: + self.dispatch('guild_join', guild) # remove the state try: @@ -423,16 +383,13 @@ class ConnectionState: if self._ready_task is not None: self._ready_task.cancel() - self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[]) + self._ready_state = asyncio.Queue() self.clear() self.user = user = ClientUser(state=self, data=data['user']) self._users[user.id] = user - guilds = self._ready_state.guilds for guild_data in data['guilds']: - guild = self._add_guild_from_data(guild_data) - if (not self.is_bot and not guild.unavailable) or guild.large: - guilds.append((guild, guild.unavailable)) + self._add_guild_from_data(guild_data) for relationship in data.get('relationships', []): try: @@ -766,14 +723,18 @@ class ConnectionState: return self._add_guild_from_data(data) + async def chunk_guild(self, guild): + future = self.loop.create_future() + request = ChunkRequest(guild.id, future) + self._chunk_requests.append(request) + await self.chunker(guild.id, nonce=request.nonce) + await request.future + async def _chunk_and_dispatch(self, guild, unavailable): - chunks = list(self.chunks_needed(guild)) - await self.chunker(guild.id) - if chunks: - try: - await utils.sane_wait_for(chunks, timeout=len(chunks)) - except asyncio.TimeoutError: - log.info('Somehow timed out waiting for chunks.') + try: + await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) + except 
asyncio.TimeoutError: + log.info('Somehow timed out waiting for chunks.') if unavailable is False: self.dispatch('guild_available', guild) @@ -788,25 +749,17 @@ class ConnectionState: guild = self._get_create_guild(data) + try: + # Notify the on_ready state, if any, that this guild is complete. + self._ready_state.put_nowait(guild) + except AttributeError: + pass + else: + # If we're waiting for the event, put the rest on hold + return + # check if it requires chunking if guild.large: - if unavailable is False: - # check if we're waiting for 'useful' READY - # and if we are, we don't want to dispatch any - # event such as guild_join or guild_available - # because we're still in the 'READY' phase. Or - # so we say. - try: - state = self._ready_state - state.launch.set() - state.guilds.append((guild, unavailable)) - except AttributeError: - # the _ready_state attribute is only there during - # processing of useful READY. - pass - else: - return - # since we're not waiting for 'useful' READY we'll just # do the chunk request here if wanted if self._fetch_offline: @@ -923,8 +876,8 @@ class ConnectionState: if existing is None or existing.joined_at is None: guild._add_member(member) - self.process_listeners(ListenerType.chunk, guild, len(members)) - self.process_listeners(ListenerType.query_members, (guild_id, data.get('nonce')), members) + if data.get('chunk_index', 0) + 1 == data.get('chunk_count'): + self.process_chunk_requests(guild_id, data.get('nonce'), members) def parse_guild_integrations_update(self, data): guild = self._get_guild(int(data['guild_id'])) @@ -1048,21 +1001,6 @@ class ConnectionState: def create_message(self, *, channel, data): return Message(state=self, channel=channel, data=data) - def receive_chunk(self, guild_id): - future = self.loop.create_future() - listener = Listener(ListenerType.chunk, future, lambda s: s.id == guild_id) - self._listeners.append(listener) - return future - - def receive_member_query(self, guild_id, nonce): - def 
predicate(args, *, guild_id=guild_id, nonce=nonce): - return args == (guild_id, nonce) - - future = self.loop.create_future() - listener = Listener(ListenerType.query_members, future, predicate) - self._listeners.append(listener) - return future - class AutoShardedConnectionState(ConnectionState): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1085,51 +1023,31 @@ class AutoShardedConnectionState(ConnectionState): ws = self._get_websocket(guild_id, shard_id=shard_id) await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce) - async def request_offline_members(self, guilds, *, shard_id): - # get all the chunks - chunks = [] - for guild in guilds: - chunks.extend(self.chunks_needed(guild)) - - # we only want to request ~75 guilds per chunk request. - splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)] - for split in splits: - await self.chunker([g.id for g in split], shard_id=shard_id) - - # wait for the chunks - if chunks: - try: - await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0) - except asyncio.TimeoutError: - log.info('Somehow timed out waiting for chunks.') - else: - log.info('Finished requesting guild member chunks for %d guilds.', len(guilds)) - async def _delay_ready(self): await self.shards_launched.wait() - launch = self._ready_state.launch + processed = [] while True: # this snippet of code is basically waiting N seconds # until the last GUILD_CREATE was sent try: - await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout) + guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout) except asyncio.TimeoutError: break else: - launch.clear() + try: + if self._fetch_offline: + await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) + except asyncio.TimeoutError: + log.info('Timed out waiting for chunks while launching ready event.') + finally: + processed.append(guild) + if guild.unavailable is False: + self.dispatch('guild_available', guild) + 
else: + self.dispatch('guild_join', guild) - guilds = sorted(self._ready_state.guilds, key=lambda g: g[0].shard_id) - - for shard_id, sub_guilds_info in itertools.groupby(guilds, key=lambda g: g[0].shard_id): - sub_guilds, sub_available = zip(*sub_guilds_info) - if self._fetch_offline: - await self.request_offline_members(sub_guilds, shard_id=shard_id) - - for guild, unavailable in zip(sub_guilds, sub_available): - if unavailable is False: - self.dispatch('guild_available', guild) - else: - self.dispatch('guild_join', guild) + guilds = sorted(processed, key=lambda g: g.shard_id) + for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id): self.dispatch('shard_ready', shard_id) # remove the state @@ -1149,16 +1067,13 @@ class AutoShardedConnectionState(ConnectionState): def parse_ready(self, data): if not hasattr(self, '_ready_state'): - self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[]) + self._ready_state = asyncio.Queue() self.user = user = ClientUser(state=self, data=data['user']) self._users[user.id] = user - guilds = self._ready_state.guilds for guild_data in data['guilds']: - guild = self._add_guild_from_data(guild_data) - if guild.large: - guilds.append((guild, guild.unavailable)) + self._add_guild_from_data(guild_data) if self._messages: self._update_message_references() From fd5faac42b534888f63fa7eca39bd0972acec2f9 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 4 Sep 2020 08:32:02 -0400 Subject: [PATCH 06/20] Handle user updates within GUILD_MEMBER_UPDATE --- discord/member.py | 23 +++++++++++++---------- discord/state.py | 6 +++++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/discord/member.py b/discord/member.py index 1fb11e63..0b89d59f 100644 --- a/discord/member.py +++ b/discord/member.py @@ -266,17 +266,20 @@ class Member(discord.abc.Messageable, _BaseUser): self._client_status[None] = data['status'] if len(user) > 1: - u = self._user - original = (u.name, u.avatar, u.discriminator) - # These keys seem 
to always be available - modified = (user['username'], user['avatar'], user['discriminator']) - if original != modified: - to_return = User._copy(self._user) - u.name, u.avatar, u.discriminator = modified - # Signal to dispatch on_user_update - return to_return, u + return self._update_inner_user(user) return False + def _update_inner_user(self, user): + u = self._user + original = (u.name, u.avatar, u.discriminator) + # These keys seem to always be available + modified = (user['username'], user['avatar'], user['discriminator']) + if original != modified: + to_return = User._copy(self._user) + u.name, u.avatar, u.discriminator = modified + # Signal to dispatch on_user_update + return to_return, u + @property def status(self): """:class:`Status`: The member's overall status. If the value is unknown, then it will be a :class:`str` instead.""" @@ -433,7 +436,7 @@ class Member(discord.abc.Messageable, _BaseUser): guild = self.guild if len(self._roles) == 0: return guild.default_role - + return max(guild.get_role(rid) or guild.default_role for rid in self._roles) @property diff --git a/discord/state.py b/discord/state.py index 85a28a5a..4e89a501 100644 --- a/discord/state.py +++ b/discord/state.py @@ -692,8 +692,12 @@ class ConnectionState: member = guild.get_member(user_id) if member is not None: - old_member = copy.copy(member) + old_member = Member._copy(member) member._update(data) + user_update = member._update_inner_user(user) + if user_update: + self.dispatch('user_update', user_update[0], user_update[1]) + self.dispatch('member_update', old_member, member) else: log.debug('GUILD_MEMBER_UPDATE referencing an unknown member ID: %s. 
Discarding.', user_id) From 37760e16dda778f0924ca77777b008931dc5fffa Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 4 Sep 2020 23:39:54 -0400 Subject: [PATCH 07/20] All guilds require chunking if opting into it --- discord/state.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/discord/state.py b/discord/state.py index 4e89a501..94427c48 100644 --- a/discord/state.py +++ b/discord/state.py @@ -763,12 +763,9 @@ class ConnectionState: return # check if it requires chunking - if guild.large: - # since we're not waiting for 'useful' READY we'll just - # do the chunk request here if wanted - if self._fetch_offline: - asyncio.ensure_future(self._chunk_and_dispatch(guild, unavailable), loop=self.loop) - return + if self._fetch_offline: + asyncio.ensure_future(self._chunk_and_dispatch(guild, unavailable), loop=self.loop) + return # Dispatch available if newly available if unavailable is False: From e6fddbdbe7e83e751d05c3cb69636419588a755c Mon Sep 17 00:00:00 2001 From: Rapptz Date: Sat, 5 Sep 2020 00:28:05 -0400 Subject: [PATCH 08/20] Heartbeats bypass the rate limits for gateway --- discord/gateway.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/discord/gateway.py b/discord/gateway.py index 889efc4d..28604f3c 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -67,7 +67,8 @@ class WebSocketClosure(Exception): EventListener = namedtuple('EventListener', 'predicate event result future') class GatewayRatelimiter: - def __init__(self, count=120, per=60.0): + def __init__(self, count=110, per=60.0): + # The default is 110 to give room for at least 10 heartbeats per minute self.max = count self.remaining = count self.window = 0.0 @@ -128,7 +129,7 @@ class KeepAliveHandler(threading.Thread): data = self.get_payload() log.debug(self.msg, self.shard_id, data['d']) - coro = self.ws.send_as_json(data) + coro = self.ws.send_heartbeat(data) f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop) try: # 
block until sending is complete @@ -560,7 +561,7 @@ class DiscordWebSocket: async def send(self, data): delay = self._rate_limiter.get_delay() if delay: - log.warning('WebSocket is ratelimited, waiting %.2f seconds', delay) + log.warning('WebSocket in shard ID %s is ratelimited, waiting %.2f seconds', self.shard_id, delay) await asyncio.sleep(delay) self._dispatch('socket_raw_send', data) @@ -573,6 +574,14 @@ class DiscordWebSocket: if not self._can_handle_close(): raise ConnectionClosed(self.socket, shard_id=self.shard_id) from exc + async def send_heartbeat(self, data): + # This bypasses the rate limit handling code since it has a higher priority + try: + await self.socket.send_str(utils.to_json(data)) + except RuntimeError as exc: + if not self._can_handle_close(): + raise ConnectionClosed(self.socket, shard_id=self.shard_id) from exc + async def change_presence(self, *, activity=None, status=None, afk=False, since=0.0): if activity is not None: if not isinstance(activity, BaseActivity): @@ -700,6 +709,8 @@ class DiscordVoiceWebSocket: log.debug('Sending voice websocket frame: %s.', data) await self.ws.send_str(utils.to_json(data)) + send_heartbeat = send_as_json + async def resume(self): state = self._connection payload = { From 5837ad08046202490587868e9cd3bd368dc7746e Mon Sep 17 00:00:00 2001 From: Rapptz Date: Sat, 5 Sep 2020 12:20:58 -0400 Subject: [PATCH 09/20] Use a lock for the gateway rate limiter. This will allow for higher concurrency in AutoSharded situations where I can mostly "fire and forget" the chunk requests. 
--- discord/gateway.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/discord/gateway.py b/discord/gateway.py index 28604f3c..303d0305 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -73,6 +73,8 @@ class GatewayRatelimiter: self.remaining = count self.window = 0.0 self.per = per + self.lock = asyncio.Lock() + self.shard_id = None def get_delay(self): current = time.time() @@ -92,6 +94,14 @@ class GatewayRatelimiter: return 0.0 + async def block(self): + async with self.lock: + delta = self.get_delay() + if delta: + log.warning('WebSocket in shard ID %s is ratelimited, waiting %.2f seconds', self.shard_id, delta) + await asyncio.sleep(delta) + + class KeepAliveHandler(threading.Thread): def __init__(self, *args, **kwargs): ws = kwargs.pop('ws', None) @@ -291,6 +301,7 @@ class DiscordWebSocket: ws.call_hooks = client._connection.call_hooks ws._initial_identify = initial ws.shard_id = shard_id + ws._rate_limiter.shard_id = shard_id ws.shard_count = client._connection.shard_count ws.session_id = session ws.sequence = sequence @@ -559,11 +570,7 @@ class DiscordWebSocket: raise ConnectionClosed(self.socket, shard_id=self.shard_id, code=code) from None async def send(self, data): - delay = self._rate_limiter.get_delay() - if delay: - log.warning('WebSocket in shard ID %s is ratelimited, waiting %.2f seconds', self.shard_id, delay) - await asyncio.sleep(delay) - + await self._rate_limiter.block() self._dispatch('socket_raw_send', data) await self.socket.send_str(data) From fdbe0c4f57b0ece79aef8d40bc28ebaee47a6fc7 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Sat, 5 Sep 2020 13:48:02 -0400 Subject: [PATCH 10/20] Maximize concurrency when chunking on AutoSharded clients --- discord/state.py | 66 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/discord/state.py b/discord/state.py index 94427c48..6a0ce92a 100644 --- a/discord/state.py +++ b/discord/state.py @@ -341,6 
+341,7 @@ class ConnectionState: try: # only real bots wait for GUILD_CREATE streaming if self.is_bot: + states = [] while True: # this snippet of code is basically waiting N seconds # until the last GUILD_CREATE was sent @@ -349,17 +350,26 @@ class ConnectionState: except asyncio.TimeoutError: break else: - try: - if self._fetch_offline: - await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) - except asyncio.TimeoutError: - log.info('Timed out waiting for chunks while launching ready event.') - finally: + if self._fetch_offline: + future = await self.chunk_guild(guild, wait=False) + states.append((guild, future)) + else: if guild.unavailable is False: self.dispatch('guild_available', guild) else: self.dispatch('guild_join', guild) + for guild, future in states: + try: + await asyncio.wait_for(future, timeout=5.0) + except asyncio.TimeoutError: + log.warning('Shard ID %s timed out waiting for chunks for guild_id %s.', guild.shard_id, guild.id) + + if guild.unavailable is False: + self.dispatch('guild_available', guild) + else: + self.dispatch('guild_join', guild) + # remove the state try: del self._ready_state @@ -727,12 +737,14 @@ class ConnectionState: return self._add_guild_from_data(data) - async def chunk_guild(self, guild): + async def chunk_guild(self, guild, *, wait=True): future = self.loop.create_future() request = ChunkRequest(guild.id, future) self._chunk_requests.append(request) await self.chunker(guild.id, nonce=request.nonce) - await request.future + if wait: + await request.future + return request.future async def _chunk_and_dispatch(self, guild, unavailable): try: @@ -1035,20 +1047,32 @@ class AutoShardedConnectionState(ConnectionState): except asyncio.TimeoutError: break else: - try: - if self._fetch_offline: - await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) - except asyncio.TimeoutError: - log.info('Timed out waiting for chunks while launching ready event.') - finally: - processed.append(guild) - if guild.unavailable is 
False: - self.dispatch('guild_available', guild) - else: - self.dispatch('guild_join', guild) + if self._fetch_offline: + # Chunk the guild in the background while we wait for GUILD_CREATE streaming + future = asyncio.ensure_future(self.chunk_guild(guild)) + else: + future = self.loop.create_future() + future.set_result(True) + + processed.append((guild, future)) + + guilds = sorted(processed, key=lambda g: g[0].shard_id) + for shard_id, info in itertools.groupby(guilds, key=lambda g: g[0].shard_id): + children, futures = zip(*info) + # 110 reqs/minute w/ 1 req/guild plus some buffer + timeout = 61 * (len(children) / 110) + try: + await utils.sane_wait_for(futures, timeout=timeout) + except asyncio.TimeoutError: + log.warning('Shard ID %s failed to wait for chunks (timeout=%.2f) for %d guilds', self.shard_id, + timeout, + len(guilds)) + for guild in children: + if guild.unavailable is False: + self.dispatch('guild_available', guild) + else: + self.dispatch('guild_join', guild) - guilds = sorted(processed, key=lambda g: g.shard_id) - for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id): self.dispatch('shard_ready', shard_id) # remove the state From 82fa967f3c32ec2a2dc377a49490faff74d5f629 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Sat, 5 Sep 2020 13:57:50 -0400 Subject: [PATCH 11/20] Speed up chunking for guilds with presence intent enabled --- discord/state.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/discord/state.py b/discord/state.py index 6a0ce92a..182d93a8 100644 --- a/discord/state.py +++ b/discord/state.py @@ -297,6 +297,10 @@ class ConnectionState: self._add_guild(guild) return guild + def _guild_needs_chunking(self, guild): + # If presences are enabled then we get back the old guild.large behaviour + return self._fetch_offline and not guild.chunked and not (self._intents.presences and not guild.large) + def _get_guild_channel(self, data): channel_id = int(data['channel_id']) try: @@ -350,7 +354,8 @@ 
class ConnectionState: except asyncio.TimeoutError: break else: - if self._fetch_offline: + + if self._guild_needs_chunking(guild): future = await self.chunk_guild(guild, wait=False) states.append((guild, future)) else: @@ -775,7 +780,7 @@ class ConnectionState: return # check if it requires chunking - if self._fetch_offline: + if self._guild_needs_chunking(guild): asyncio.ensure_future(self._chunk_and_dispatch(guild, unavailable), loop=self.loop) return @@ -1047,7 +1052,7 @@ class AutoShardedConnectionState(ConnectionState): except asyncio.TimeoutError: break else: - if self._fetch_offline: + if self._guild_needs_chunking(guild): # Chunk the guild in the background while we wait for GUILD_CREATE streaming future = asyncio.ensure_future(self.chunk_guild(guild)) else: From 2129ae29be6478a2edc5e63b29da30d84946fd02 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Sat, 5 Sep 2020 21:28:33 -0400 Subject: [PATCH 12/20] Check for zombie connections through last received payload The previous code would check zombie connections depending on whether HEARTBEAT_ACK was received. Unfortunately when there's excessive backpressure the connection can terminate since the HEARTBEAT_ACK is buffered very far away despite it being there, just not received yet. 
--- discord/gateway.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/discord/gateway.py b/discord/gateway.py index 303d0305..9db98301 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -119,12 +119,13 @@ class KeepAliveHandler(threading.Thread): self._stop_ev = threading.Event() self._last_ack = time.perf_counter() self._last_send = time.perf_counter() + self._last_recv = time.perf_counter() self.latency = float('inf') self.heartbeat_timeout = ws._max_heartbeat_timeout def run(self): while not self._stop_ev.wait(self.interval): - if self._last_ack + self.heartbeat_timeout < time.perf_counter(): + if self._last_recv + self.heartbeat_timeout < time.perf_counter(): log.warning("Shard ID %s has stopped responding to the gateway. Closing and restarting.", self.shard_id) coro = self.ws.close(4000) f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop) @@ -173,6 +174,9 @@ class KeepAliveHandler(threading.Thread): def stop(self): self._stop_ev.set() + def tick(self): + self._last_recv = time.perf_counter() + def ack(self): ack_time = time.perf_counter() self._last_ack = ack_time @@ -197,6 +201,7 @@ class VoiceKeepAliveHandler(KeepAliveHandler): def ack(self): ack_time = time.perf_counter() self._last_ack = ack_time + self._last_recv = ack_time self.latency = ack_time - self._last_send self.recent_ack_latencies.append(self.latency) @@ -429,6 +434,9 @@ class DiscordWebSocket: if seq is not None: self.sequence = seq + if self._keep_alive: + self._keep_alive.tick() + if op != self.DISPATCH: if op == self.RECONNECT: # "reconnect" can only be handled by the Client From 81bfdea9dfcb96021b131eeff302d3147da670ae Mon Sep 17 00:00:00 2001 From: Rapptz Date: Sat, 5 Sep 2020 21:33:45 -0400 Subject: [PATCH 13/20] Maximize the amount of concurrency while chunking. In order to reduce our amount of backpressure we need to limit the amount of concurrent chunk requests we can have so the gateway buffer has some time to breathe. 
--- discord/state.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/discord/state.py b/discord/state.py index 182d93a8..220f3d82 100644 --- a/discord/state.py +++ b/discord/state.py @@ -354,7 +354,6 @@ class ConnectionState: except asyncio.TimeoutError: break else: - if self._guild_needs_chunking(guild): future = await self.chunk_guild(guild, wait=False) states.append((guild, future)) @@ -1044,6 +1043,8 @@ class AutoShardedConnectionState(ConnectionState): async def _delay_ready(self): await self.shards_launched.wait() processed = [] + max_concurrency = len(self.shard_ids) * 2 + current_bucket = [] while True: # this snippet of code is basically waiting N seconds # until the last GUILD_CREATE was sent @@ -1053,8 +1054,19 @@ class AutoShardedConnectionState(ConnectionState): break else: if self._guild_needs_chunking(guild): + log.debug('Guild ID %d requires chunking, will be done in the background.', guild.id) + if len(current_bucket) >= max_concurrency: + try: + await utils.sane_wait_for(current_bucket, timeout=max_concurrency * 10) + except asyncio.TimeoutError: + fmt = 'Shard ID %s failed to wait for chunks from a sub-bucket with length %d' + log.warning(fmt, self.shard_id, len(current_bucket)) + finally: + current_bucket = [] + # Chunk the guild in the background while we wait for GUILD_CREATE streaming future = asyncio.ensure_future(self.chunk_guild(guild)) + current_bucket.append(future) else: future = self.loop.create_future() future.set_result(True) From 65f591705dba7eb36c3096d74f83da7f209819a5 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Mon, 7 Sep 2020 21:50:00 -0400 Subject: [PATCH 14/20] Fix timeouts due to hitting the gateway rate limit --- discord/state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/discord/state.py b/discord/state.py index 220f3d82..8b51b135 100644 --- a/discord/state.py +++ b/discord/state.py @@ -747,7 +747,7 @@ class ConnectionState: self._chunk_requests.append(request) 
await self.chunker(guild.id, nonce=request.nonce) if wait: - await request.future + return await request.future return request.future async def _chunk_and_dispatch(self, guild, unavailable): @@ -1057,7 +1057,7 @@ class AutoShardedConnectionState(ConnectionState): log.debug('Guild ID %d requires chunking, will be done in the background.', guild.id) if len(current_bucket) >= max_concurrency: try: - await utils.sane_wait_for(current_bucket, timeout=max_concurrency * 10) + await utils.sane_wait_for(current_bucket, timeout=max_concurrency * 70.0) except asyncio.TimeoutError: fmt = 'Shard ID %s failed to wait for chunks from a sub-bucket with length %d' log.warning(fmt, self.shard_id, len(current_bucket)) From 41fd2740cb955babc38fa16752ec28c3b689fecd Mon Sep 17 00:00:00 2001 From: Rapptz Date: Mon, 7 Sep 2020 22:14:54 -0400 Subject: [PATCH 15/20] Explicitly disable the members presence by default --- discord/flags.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/discord/flags.py b/discord/flags.py index 4f5f197e..ccb75d68 100644 --- a/discord/flags.py +++ b/discord/flags.py @@ -345,7 +345,8 @@ class Intents(BaseFlags): run your bot. To make use of this, it is passed to the ``intents`` keyword argument of :class:`Client`. - A default instance of this class has everything enabled except :attr:`presences`. + A default instance of this class has everything enabled except :attr:`presences` + and :attr:`members`. .. container:: operations @@ -374,10 +375,11 @@ class Intents(BaseFlags): def __init__(self, **kwargs): # Change the default value to everything being enabled - # except presences + # except presences and members bits = max(self.VALID_FLAGS.values()).bit_length() self.value = (1 << bits) - 1 self.presences = False + self.members = False for key, value in kwargs.items(): if key not in self.VALID_FLAGS: raise TypeError('%r is not a valid flag name.' 
% key) @@ -425,6 +427,12 @@ class Intents(BaseFlags): - :func:`on_member_join` - :func:`on_member_remove` - :func:`on_member_update` (nickname, roles) + - :func:`on_user_update` + + .. note:: + + Currently, this requires opting in explicitly via the dev portal as well. + Bots in over 100 guilds will need to apply to Discord for verification. """ return 1 << 1 @@ -497,12 +505,11 @@ class Intents(BaseFlags): This corresponds to the following events: - :func:`on_member_update` (activities, status) - - :func:`on_user_update` .. note:: Currently, this requires opting in explicitly via the dev portal as well. - Bots in over 100 guilds will need to apply to Discord for approval. + Bots in over 100 guilds will need to apply to Discord for verification. """ return 1 << 8 From a293d87c7776e6e4927500e3dbe5b00a57e0c51f Mon Sep 17 00:00:00 2001 From: Rapptz Date: Mon, 7 Sep 2020 22:15:24 -0400 Subject: [PATCH 16/20] Add versionadded for intents enum --- discord/flags.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/discord/flags.py b/discord/flags.py index ccb75d68..bc2a52ed 100644 --- a/discord/flags.py +++ b/discord/flags.py @@ -348,6 +348,8 @@ class Intents(BaseFlags): A default instance of this class has everything enabled except :attr:`presences` and :attr:`members`. + .. versionadded:: 1.5 + .. container:: operations .. describe:: x == y From cb211c36bd7abc8e2c85c9be9b368a3d35319dfd Mon Sep 17 00:00:00 2001 From: Rapptz Date: Thu, 10 Sep 2020 05:26:35 -0400 Subject: [PATCH 17/20] Fix Client.request_offline_members no longer working --- discord/client.py | 13 +++++++++---- discord/shard.py | 12 ++++++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/discord/client.py b/discord/client.py index 61fec3b8..f496d865 100644 --- a/discord/client.py +++ b/discord/client.py @@ -392,6 +392,10 @@ class Client: in the guild is larger than 250. You can check if a guild is large if :attr:`.Guild.large` is ``True``. + .. warning:: + + This method is deprecated. 
+ Parameters ----------- \*guilds: :class:`.Guild` @@ -400,12 +404,13 @@ class Client: Raises ------- :exc:`.InvalidArgument` - If any guild is unavailable or not large in the collection. + If any guild is unavailable in the collection. """ - if any(not g.large or g.unavailable for g in guilds): - raise InvalidArgument('An unavailable or non-large guild was passed.') + if any(g.unavailable for g in guilds): + raise InvalidArgument('An unavailable guild was passed.') - await self._connection.request_offline_members(guilds) + for guild in guilds: + await self._connection.chunk_guild(guild) # hooks diff --git a/discord/shard.py b/discord/shard.py index f6320678..50a39379 100644 --- a/discord/shard.py +++ b/discord/shard.py @@ -346,6 +346,10 @@ class AutoShardedClient(Client): in the guild is larger than 250. You can check if a guild is large if :attr:`Guild.large` is ``True``. + .. warning:: + + This method is deprecated. + Parameters ----------- \*guilds: :class:`Guild` @@ -354,15 +358,15 @@ class AutoShardedClient(Client): Raises ------- InvalidArgument - If any guild is unavailable or not large in the collection. + If any guild is unavailable in the collection. 
""" - if any(not g.large or g.unavailable for g in guilds): + if any(g.unavailable for g in guilds): raise InvalidArgument('An unavailable or non-large guild was passed.') _guilds = sorted(guilds, key=lambda g: g.shard_id) for shard_id, sub_guilds in itertools.groupby(_guilds, key=lambda g: g.shard_id): - sub_guilds = list(sub_guilds) - await self._connection.request_offline_members(sub_guilds, shard_id=shard_id) + for guild in sub_guilds: + await self._connection.chunk_guild(guild) async def launch_shard(self, gateway, shard_id, *, initial=False): try: From 009a961006ff517dc6f652baea188ebc5d4194fd Mon Sep 17 00:00:00 2001 From: Rapptz Date: Thu, 10 Sep 2020 05:56:48 -0400 Subject: [PATCH 18/20] Add Guild.chunk and deprecated Client.request_offline_members --- discord/client.py | 3 ++- discord/guild.py | 26 ++++++++++++++++----- discord/shard.py | 3 ++- discord/state.py | 57 ++++++++++++++++++++++++++--------------------- 4 files changed, 57 insertions(+), 32 deletions(-) diff --git a/discord/client.py b/discord/client.py index f496d865..23809424 100644 --- a/discord/client.py +++ b/discord/client.py @@ -379,6 +379,7 @@ class Client: print('Ignoring exception in {}'.format(event_method), file=sys.stderr) traceback.print_exc() + @utils.deprecated('Guild.chunk') async def request_offline_members(self, *guilds): r"""|coro| @@ -394,7 +395,7 @@ class Client: .. warning:: - This method is deprecated. + This method is deprecated. Use :meth:`Guild.chunk` instead. Parameters ----------- diff --git a/discord/guild.py b/discord/guild.py index d10127f2..b1f85c76 100644 --- a/discord/guild.py +++ b/discord/guild.py @@ -2037,6 +2037,23 @@ class Guild(Hashable): return Widget(state=self._state, data=data) + async def chunk(self, *, cache=True): + """|coro| + + Requests all members that belong to this guild. In order to use this, + :attr:`Intents.members` must be enabled. + + This is a websocket operation and can be slow. + + ..
versionadded:: 1.5 + + Parameters + ----------- + cache: :class:`bool` + Whether to cache the members as well. + """ + return await self._state.chunk_guild(self, cache=cache) + async def query_members(self, query=None, *, limit=5, user_ids=None, cache=True): """|coro| @@ -2049,16 +2066,15 @@ class Guild(Hashable): Parameters ----------- - query: :class:`str` - The string that the username's start with. An empty string - requests all members. + query: Optional[:class:`str`] + The string that the usernames start with. limit: :class:`int` The maximum number of members to send back. This must be - a number between 1 and 100. + a number between 5 and 100. cache: :class:`bool` Whether to cache the members internally. This makes operations such as :meth:`get_member` work for those that matched. - user_ids: List[:class:`int`] + user_ids: Optional[List[:class:`int`]] List of user IDs to search for. If the user ID is not in the guild then it won't be returned. .. versionadded:: 1.4 diff --git a/discord/shard.py b/discord/shard.py index 50a39379..2ed7724d 100644 --- a/discord/shard.py +++ b/discord/shard.py @@ -333,6 +333,7 @@ class AutoShardedClient(Client): """Mapping[int, :class:`ShardInfo`]: Returns a mapping of shard IDs to their respective info object.""" return { shard_id: ShardInfo(parent, self.shard_count) for shard_id, parent in self.__shards.items() } + @utils.deprecated('Guild.chunk') async def request_offline_members(self, *guilds): r"""|coro| @@ -348,7 +349,7 @@ class AutoShardedClient(Client): .. warning:: - This method is deprecated. + This method is deprecated. Use :meth:`Guild.chunk` instead. 
Parameters ----------- diff --git a/discord/state.py b/discord/state.py index 8b51b135..23200da3 100644 --- a/discord/state.py +++ b/discord/state.py @@ -57,12 +57,28 @@ from .object import Object from .invite import Invite class ChunkRequest: - __slots__ = ('guild_id', 'nonce', 'future') - - def __init__(self, guild_id, future): + def __init__(self, guild_id, future, resolver, *, cache=True): self.guild_id = guild_id + self.resolver = resolver + self.cache = cache self.nonce = os.urandom(16).hex() self.future = future + self.buffer = [] # List[Member] + + def add_members(self, members): + self.buffer.extend(members) + if self.cache: + guild = self.resolver(self.guild_id) + if guild is None: + return + + for member in members: + existing = guild.get_member(member.id) + if existing is None or existing.joined_at is None: + guild._add_member(member) + + def done(self): + self.future.set_result(self.buffer) log = logging.getLogger(__name__) @@ -150,7 +166,7 @@ class ConnectionState: # to reconnect loops which cause mass allocations and deallocations. 
gc.collect() - def process_chunk_requests(self, guild_id, nonce, members): + def process_chunk_requests(self, guild_id, nonce, members, complete): removed = [] for i, request in enumerate(self._chunk_requests): future = request.future @@ -159,8 +175,10 @@ class ConnectionState: continue if request.guild_id == guild_id and request.nonce == nonce: - future.set_result(members) - removed.append(i) + request.add_members(members) + if complete: + request.done() + removed.append(i) for index in reversed(removed): del self._chunk_requests[index] @@ -324,19 +342,13 @@ class ConnectionState: raise RuntimeError('Somehow do not have a websocket for this guild_id') future = self.loop.create_future() - request = ChunkRequest(guild.id, future) + request = ChunkRequest(guild.id, future, self._get_guild, cache=cache) self._chunk_requests.append(request) try: # start the query operation await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce) - members = await asyncio.wait_for(future, timeout=30.0) - - if cache: - for member in members: - guild._add_member(member) - - return members + return await asyncio.wait_for(future, timeout=30.0) except asyncio.TimeoutError: log.warning('Timed out waiting for chunks with query %r and limit %d for guild_id %d', query, limit, guild_id) raise @@ -741,9 +753,10 @@ class ConnectionState: return self._add_guild_from_data(data) - async def chunk_guild(self, guild, *, wait=True): + async def chunk_guild(self, guild, *, wait=True, cache=None): + cache = cache or self._cache_members future = self.loop.create_future() - request = ChunkRequest(guild.id, future) + request = ChunkRequest(guild.id, future, self._get_guild, cache=cache) self._chunk_requests.append(request) await self.chunker(guild.id, nonce=request.nonce) if wait: @@ -887,14 +900,8 @@ class ConnectionState: guild = self._get_guild(guild_id) members = [Member(guild=guild, data=member, state=self) for member in data.get('members', [])] 
log.debug('Processed a chunk for %s members in guild ID %s.', len(members), guild_id) - if self._cache_members: - for member in members: - existing = guild.get_member(member.id) - if existing is None or existing.joined_at is None: - guild._add_member(member) - - if data.get('chunk_index', 0) + 1 == data.get('chunk_count'): - self.process_chunk_requests(guild_id, data.get('nonce'), members) + complete = data.get('chunk_index', 0) + 1 == data.get('chunk_count') + self.process_chunk_requests(guild_id, data.get('nonce'), members, complete) def parse_guild_integrations_update(self, data): guild = self._get_guild(int(data['guild_id'])) @@ -1069,7 +1076,7 @@ class AutoShardedConnectionState(ConnectionState): current_bucket.append(future) else: future = self.loop.create_future() - future.set_result(True) + future.set_result([]) processed.append((guild, future)) From 61ec62da113fff0f57f37260b1f34bb39d67907c Mon Sep 17 00:00:00 2001 From: Rapptz Date: Thu, 10 Sep 2020 05:58:24 -0400 Subject: [PATCH 19/20] Don't cache members during guild start up if cache is disabled. This is mainly a half-implemented commit. There are a few more places where cache consistency is necessary. In the future there will probably be a member cache policy enum that will be used and cache consistency will be tackled as part of that larger refactoring. 
--- discord/guild.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/discord/guild.py b/discord/guild.py index b1f85c76..7443d976 100644 --- a/discord/guild.py +++ b/discord/guild.py @@ -305,9 +305,12 @@ class Guild(Hashable): self._rules_channel_id = utils._get_as_snowflake(guild, 'rules_channel_id') self._public_updates_channel_id = utils._get_as_snowflake(guild, 'public_updates_channel_id') + cache_members = self._state._cache_members + self_id = self._state.self_id for mdata in guild.get('members', []): member = Member(data=mdata, guild=self, state=state) - self._add_member(member) + if cache_members or member.id == self_id: + self._add_member(member) self._sync(guild) self._large = None if member_count is None else self._member_count >= 250 From 77b0ddca7c0c0d2836c4b32b00fee59ee9b7f8f7 Mon Sep 17 00:00:00 2001 From: Rapptz Date: Thu, 10 Sep 2020 06:03:26 -0400 Subject: [PATCH 20/20] Raise if member intent is not enabled --- discord/guild.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/discord/guild.py b/discord/guild.py index 7443d976..3cdbefa1 100644 --- a/discord/guild.py +++ b/discord/guild.py @@ -2054,7 +2054,16 @@ class Guild(Hashable): ----------- cache: :class:`bool` Whether to cache the members as well. + + Raises + ------- + ClientException + The members intent is not enabled. """ + + if not self._state._intents.members: + raise ClientException('Intents.members must be enabled to use this.') + return await self._state.chunk_guild(self, cache=cache) async def query_members(self, query=None, *, limit=5, user_ids=None, cache=True):