Use a proper type for the event queue
This commit is contained in:
parent
b8154e365f
commit
988505a97f
@ -42,6 +42,27 @@ class EventType:
|
|||||||
resume = 1
|
resume = 1
|
||||||
identify = 2
|
identify = 2
|
||||||
|
|
||||||
|
class EventItem:
|
||||||
|
__slots__ = ('type', 'shard', 'error')
|
||||||
|
|
||||||
|
def __init__(self, etype, shard, error):
|
||||||
|
self.type = etype
|
||||||
|
self.shard = shard
|
||||||
|
self.error = error
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
if not isinstance(other, EventItem):
|
||||||
|
return NotImplemented
|
||||||
|
return self.type < other.type
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if not isinstance(other, EventItem):
|
||||||
|
return NotImplemented
|
||||||
|
return self.type == other.type
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return hash(self.type)
|
||||||
|
|
||||||
class Shard:
|
class Shard:
|
||||||
def __init__(self, ws, client):
|
def __init__(self, ws, client):
|
||||||
self.ws = ws
|
self.ws = ws
|
||||||
@ -64,10 +85,10 @@ class Shard:
|
|||||||
await self.ws.poll_event()
|
await self.ws.poll_event()
|
||||||
except ReconnectWebSocket as e:
|
except ReconnectWebSocket as e:
|
||||||
etype = EventType.resume if e.resume else EventType.identify
|
etype = EventType.resume if e.resume else EventType.identify
|
||||||
self._queue.put_nowait((etype, self, e))
|
self._queue.put_nowait(EventItem(etype, self, e))
|
||||||
break
|
break
|
||||||
except ConnectionClosed as e:
|
except ConnectionClosed as e:
|
||||||
self._queue.put_nowait((EventType.close, self, e))
|
self._queue.put_nowait(EventItem(EventType.close, self, e))
|
||||||
break
|
break
|
||||||
|
|
||||||
async def reconnect(self, exc):
|
async def reconnect(self, exc):
|
||||||
@ -220,23 +241,15 @@ class AutoShardedClient(Client):
|
|||||||
if shard_id != last_shard_id:
|
if shard_id != last_shard_id:
|
||||||
await asyncio.sleep(5.0)
|
await asyncio.sleep(5.0)
|
||||||
|
|
||||||
# shards_to_wait_for = []
|
|
||||||
# for shard in self.shards.values():
|
|
||||||
# shard.complete_pending_reads()
|
|
||||||
# shards_to_wait_for.append(shard.wait())
|
|
||||||
|
|
||||||
# # wait for all pending tasks to finish
|
|
||||||
# await utils.sane_wait_for(shards_to_wait_for, timeout=300.0)
|
|
||||||
|
|
||||||
async def _connect(self):
|
async def _connect(self):
|
||||||
await self.launch_shards()
|
await self.launch_shards()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
etype, shard, exc = await self._queue.get()
|
item = await self._queue.get()
|
||||||
if etype == EventType.close:
|
if item.type == EventType.close:
|
||||||
raise exc
|
raise item.error
|
||||||
elif etype in (EventType.identify, EventType.resume):
|
elif item.type in (EventType.identify, EventType.resume):
|
||||||
await shard.reconnect(exc)
|
await item.shard.reconnect(item.error)
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
"""|coro|
|
"""|coro|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user