Maximize concurrency when chunking on AutoSharded clients
This commit is contained in:
		| @@ -347,6 +347,7 @@ class ConnectionState: | ||||
|         try: | ||||
|             # only real bots wait for GUILD_CREATE streaming | ||||
|             if self.is_bot: | ||||
|                 states = [] | ||||
|                 while True: | ||||
|                     # this snippet of code is basically waiting N seconds | ||||
|                     # until the last GUILD_CREATE was sent | ||||
| @@ -355,17 +356,26 @@ class ConnectionState: | ||||
|                     except asyncio.TimeoutError: | ||||
|                         break | ||||
|                     else: | ||||
|                         try: | ||||
|                             if self._fetch_offline: | ||||
|                                 await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) | ||||
|                         except asyncio.TimeoutError: | ||||
|                             log.info('Timed out waiting for chunks while launching ready event.') | ||||
|                         finally: | ||||
|                         if self._fetch_offline: | ||||
|                             future = await self.chunk_guild(guild, wait=False) | ||||
|                             states.append((guild, future)) | ||||
|                         else: | ||||
|                             if guild.unavailable is False: | ||||
|                                 self.dispatch('guild_available', guild) | ||||
|                             else: | ||||
|                                 self.dispatch('guild_join', guild) | ||||
|  | ||||
|                 for guild, future in states: | ||||
|                     try: | ||||
|                         await asyncio.wait_for(future, timeout=5.0) | ||||
|                     except asyncio.TimeoutError: | ||||
|                         log.warning('Shard ID %s timed out waiting for chunks for guild_id %s.', guild.shard_id, guild.id) | ||||
|  | ||||
|                     if guild.unavailable is False: | ||||
|                         self.dispatch('guild_available', guild) | ||||
|                     else: | ||||
|                         self.dispatch('guild_join', guild) | ||||
|  | ||||
|             # remove the state | ||||
|             try: | ||||
|                 del self._ready_state | ||||
| @@ -733,12 +743,14 @@ class ConnectionState: | ||||
|  | ||||
|         return self._add_guild_from_data(data) | ||||
|  | ||||
|     async def chunk_guild(self, guild): | ||||
|     async def chunk_guild(self, guild, *, wait=True): | ||||
|         future = self.loop.create_future() | ||||
|         request = ChunkRequest(guild.id, future) | ||||
|         self._chunk_requests.append(request) | ||||
|         await self.chunker(guild.id, nonce=request.nonce) | ||||
|         await request.future | ||||
|         if wait: | ||||
|             await request.future | ||||
|         return request.future | ||||
|  | ||||
|     async def _chunk_and_dispatch(self, guild, unavailable): | ||||
|         try: | ||||
| @@ -1041,20 +1053,32 @@ class AutoShardedConnectionState(ConnectionState): | ||||
|             except asyncio.TimeoutError: | ||||
|                 break | ||||
|             else: | ||||
|                 try: | ||||
|                     if self._fetch_offline: | ||||
|                         await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) | ||||
|                 except asyncio.TimeoutError: | ||||
|                     log.info('Timed out waiting for chunks while launching ready event.') | ||||
|                 finally: | ||||
|                     processed.append(guild) | ||||
|                     if guild.unavailable is False: | ||||
|                         self.dispatch('guild_available', guild) | ||||
|                     else: | ||||
|                         self.dispatch('guild_join', guild) | ||||
|                 if self._fetch_offline: | ||||
|                     # Chunk the guild in the background while we wait for GUILD_CREATE streaming | ||||
|                     future = asyncio.ensure_future(self.chunk_guild(guild)) | ||||
|                 else: | ||||
|                     future = self.loop.create_future() | ||||
|                     future.set_result(True) | ||||
|  | ||||
|                 processed.append((guild, future)) | ||||
|  | ||||
|         guilds = sorted(processed, key=lambda g: g[0].shard_id) | ||||
|         for shard_id, info in itertools.groupby(guilds, key=lambda g: g[0].shard_id): | ||||
|             children, futures = zip(*info) | ||||
|             # 110 reqs/minute w/ 1 req/guild plus some buffer | ||||
|             timeout = 61 * (len(children) / 110) | ||||
|             try: | ||||
|                 await utils.sane_wait_for(futures, timeout=timeout) | ||||
|             except asyncio.TimeoutError: | ||||
|                 log.warning('Shard ID %s failed to wait for chunks (timeout=%.2f) for %d guilds', self.shard_id, | ||||
|                                                                                                   timeout, | ||||
|                                                                                                   len(guilds)) | ||||
|             for guild in children: | ||||
|                 if guild.unavailable is False: | ||||
|                     self.dispatch('guild_available', guild) | ||||
|                 else: | ||||
|                     self.dispatch('guild_join', guild) | ||||
|  | ||||
|         guilds = sorted(processed, key=lambda g: g.shard_id) | ||||
|         for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id): | ||||
|             self.dispatch('shard_ready', shard_id) | ||||
|  | ||||
|         # remove the state | ||||
|   | ||||
		Reference in New Issue
	
	Block a user