Drop support for Python 3.4 and make minimum version 3.5.2.

This commit is contained in:
Rapptz
2018-06-10 18:09:14 -04:00
parent 7eb918b19e
commit f25091efe1
35 changed files with 626 additions and 1069 deletions

View File

@ -71,7 +71,7 @@ class KeepAliveHandler(threading.Thread):
if self._last_ack + self.heartbeat_timeout < time.monotonic():
log.warn("Shard ID %s has stopped responding to the gateway. Closing and restarting." % self.shard_id)
coro = self.ws.close(4000)
f = compat.run_coroutine_threadsafe(coro, loop=self.ws.loop)
f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop)
try:
f.result()
@ -84,7 +84,7 @@ class KeepAliveHandler(threading.Thread):
data = self.get_payload()
log.debug(self.msg, data['d'])
coro = self.ws.send_as_json(data)
f = compat.run_coroutine_threadsafe(coro, loop=self.ws.loop)
f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop)
try:
# block until sending is complete
f.result()
@ -190,14 +190,13 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
self._buffer = bytearray()
@classmethod
@asyncio.coroutine
def from_client(cls, client, *, shard_id=None, session=None, sequence=None, resume=False):
async def from_client(cls, client, *, shard_id=None, session=None, sequence=None, resume=False):
"""Creates a main websocket for Discord from a :class:`Client`.
This is for internal use only.
"""
gateway = yield from client.http.get_gateway()
ws = yield from websockets.connect(gateway, loop=client.loop, klass=cls)
gateway = await client.http.get_gateway()
ws = await websockets.connect(gateway, loop=client.loop, klass=cls)
# dynamically add attributes needed
ws.token = client.http.token
@ -215,19 +214,19 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
log.info('Created websocket connected to %s', gateway)
# poll event for OP Hello
yield from ws.poll_event()
await ws.poll_event()
if not resume:
yield from ws.identify()
await ws.identify()
return ws
yield from ws.resume()
await ws.resume()
try:
yield from ws.ensure_open()
await ws.ensure_open()
except websockets.exceptions.ConnectionClosed:
# ws got closed so let's just do a regular IDENTIFY connect.
log.info('RESUME failed (the websocket decided to close) for Shard ID %s. Retrying.', shard_id)
return (yield from cls.from_client(client, shard_id=shard_id))
return (await cls.from_client(client, shard_id=shard_id))
else:
return ws
@ -251,13 +250,12 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
A future to wait for.
"""
future = compat.create_future(self.loop)
future = self.loop.create_future()
entry = EventListener(event=event, predicate=predicate, result=result, future=future)
self._dispatch_listeners.append(entry)
return future
@asyncio.coroutine
def identify(self):
async def identify(self):
"""Sends the IDENTIFY packet."""
payload = {
'op': self.IDENTIFY,
@ -291,11 +289,10 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
'afk': False
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
log.info('Shard ID %s has sent the IDENTIFY payload.', self.shard_id)
@asyncio.coroutine
def resume(self):
async def resume(self):
"""Sends the RESUME packet."""
payload = {
'op': self.RESUME,
@ -306,11 +303,10 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
}
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
log.info('Shard ID %s has sent the RESUME payload.', self.shard_id)
@asyncio.coroutine
def received_message(self, msg):
async def received_message(self, msg):
self._dispatch('socket_raw_receive', msg)
if isinstance(msg, bytes):
@ -342,7 +338,7 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
# so we terminate our connection and raise an
# internal exception signalling to reconnect.
log.info('Received RECONNECT opcode.')
yield from self.close()
await self.close()
raise ResumeWebSocket(self.shard_id)
if op == self.HEARTBEAT_ACK:
@ -351,27 +347,27 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
if op == self.HEARTBEAT:
beat = self._keep_alive.get_payload()
yield from self.send_as_json(beat)
await self.send_as_json(beat)
return
if op == self.HELLO:
interval = data['heartbeat_interval'] / 1000.0
self._keep_alive = KeepAliveHandler(ws=self, interval=interval, shard_id=self.shard_id)
# send a heartbeat immediately
yield from self.send_as_json(self._keep_alive.get_payload())
await self.send_as_json(self._keep_alive.get_payload())
self._keep_alive.start()
return
if op == self.INVALIDATE_SESSION:
if data == True:
yield from asyncio.sleep(5.0, loop=self.loop)
yield from self.close()
await asyncio.sleep(5.0, loop=self.loop)
await self.close()
raise ResumeWebSocket(self.shard_id)
self.sequence = None
self.session_id = None
log.info('Shard ID %s session has been invalidated.' % self.shard_id)
yield from self.identify()
await self.identify()
return
if op != self.DISPATCH:
@ -435,8 +431,7 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
def _can_handle_close(self, code):
return code not in (1000, 4004, 4010, 4011)
@asyncio.coroutine
def poll_event(self):
async def poll_event(self):
"""Polls for a DISPATCH event and handles the general gateway loop.
Raises
@ -445,8 +440,8 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
The websocket connection was terminated for unhandled reasons.
"""
try:
msg = yield from self.recv()
yield from self.received_message(msg)
msg = await self.recv()
await self.received_message(msg)
except websockets.exceptions.ConnectionClosed as e:
if self._can_handle_close(e.code):
log.info('Websocket closed with %s (%s), attempting a reconnect.', e.code, e.reason)
@ -455,21 +450,18 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
log.info('Websocket closed with %s (%s), cannot reconnect.', e.code, e.reason)
raise ConnectionClosed(e, shard_id=self.shard_id) from e
@asyncio.coroutine
def send(self, data):
async def send(self, data):
self._dispatch('socket_raw_send', data)
yield from super().send(data)
await super().send(data)
@asyncio.coroutine
def send_as_json(self, data):
async def send_as_json(self, data):
try:
yield from super().send(utils.to_json(data))
await super().send(utils.to_json(data))
except websockets.exceptions.ConnectionClosed as e:
if not self._can_handle_close(e.code):
raise ConnectionClosed(e, shard_id=self.shard_id) from e
@asyncio.coroutine
def change_presence(self, *, activity=None, status=None, afk=False, since=0.0):
async def change_presence(self, *, activity=None, status=None, afk=False, since=0.0):
if activity is not None:
if not isinstance(activity, _ActivityTag):
raise InvalidArgument('activity must be one of Game, Streaming, or Activity.')
@ -490,18 +482,16 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
sent = utils.to_json(payload)
log.debug('Sending "%s" to change status', sent)
yield from self.send(sent)
await self.send(sent)
@asyncio.coroutine
def request_sync(self, guild_ids):
async def request_sync(self, guild_ids):
payload = {
'op': self.GUILD_SYNC,
'd': list(guild_ids)
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
@asyncio.coroutine
def voice_state(self, guild_id, channel_id, self_mute=False, self_deaf=False):
async def voice_state(self, guild_id, channel_id, self_mute=False, self_deaf=False):
payload = {
'op': self.VOICE_STATE,
'd': {
@ -513,14 +503,13 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
}
log.debug('Updating our voice state to %s.', payload)
yield from self.send_as_json(payload)
await self.send_as_json(payload)
@asyncio.coroutine
def close_connection(self, *args, **kwargs):
async def close_connection(self, *args, **kwargs):
if self._keep_alive:
self._keep_alive.stop()
yield from super().close_connection(*args, **kwargs)
await super().close_connection(*args, **kwargs)
class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
"""Implements the websocket protocol for handling voice connections.
@ -565,13 +554,11 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
self.max_size = None
self._keep_alive = None
@asyncio.coroutine
def send_as_json(self, data):
async def send_as_json(self, data):
log.debug('Sending voice websocket frame: %s.', data)
yield from self.send(utils.to_json(data))
await self.send(utils.to_json(data))
@asyncio.coroutine
def resume(self):
async def resume(self):
state = self._connection
payload = {
'op': self.RESUME,
@ -581,10 +568,9 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
'session_id': state.session_id
}
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
@asyncio.coroutine
def identify(self):
async def identify(self):
state = self._connection
payload = {
'op': self.IDENTIFY,
@ -595,27 +581,25 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
'token': state.token
}
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
@classmethod
@asyncio.coroutine
def from_client(cls, client, *, resume=False):
async def from_client(cls, client, *, resume=False):
"""Creates a voice websocket for the :class:`VoiceClient`."""
gateway = 'wss://' + client.endpoint + '/?v=3'
ws = yield from websockets.connect(gateway, loop=client.loop, klass=cls)
ws = await websockets.connect(gateway, loop=client.loop, klass=cls)
ws.gateway = gateway
ws._connection = client
ws._max_heartbeat_timeout = 60.0
if resume:
yield from ws.resume()
await ws.resume()
else:
yield from ws.identify()
await ws.identify()
return ws
@asyncio.coroutine
def select_protocol(self, ip, port):
async def select_protocol(self, ip, port):
payload = {
'op': self.SELECT_PROTOCOL,
'd': {
@ -628,10 +612,9 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
}
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
@asyncio.coroutine
def speak(self, is_speaking=True):
async def speak(self, is_speaking=True):
payload = {
'op': self.SPEAKING,
'd': {
@ -640,10 +623,9 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
}
}
yield from self.send_as_json(payload)
await self.send_as_json(payload)
@asyncio.coroutine
def received_message(self, msg):
async def received_message(self, msg):
log.debug('Voice websocket frame received: %s', msg)
op = msg['op']
data = msg.get('d')
@ -652,17 +634,16 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
interval = data['heartbeat_interval'] / 1000.0
self._keep_alive = VoiceKeepAliveHandler(ws=self, interval=interval)
self._keep_alive.start()
yield from self.initial_connection(data)
await self.initial_connection(data)
elif op == self.HEARTBEAT_ACK:
self._keep_alive.ack()
elif op == self.INVALIDATE_SESSION:
log.info('Voice RESUME failed.')
yield from self.identify()
await self.identify()
elif op == self.SESSION_DESCRIPTION:
yield from self.load_secret_key(data)
await self.load_secret_key(data)
@asyncio.coroutine
def initial_connection(self, data):
async def initial_connection(self, data):
state = self._connection
state.ssrc = data['ssrc']
state.voice_port = data['port']
@ -670,7 +651,7 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
packet = bytearray(70)
struct.pack_into('>I', packet, 0, state.ssrc)
state.socket.sendto(packet, (state.endpoint_ip, state.voice_port))
recv = yield from self.loop.sock_recv(state.socket, 70)
recv = await self.loop.sock_recv(state.socket, 70)
log.debug('received packet in initial_connection: %s', recv)
# the ip is ascii starting at the 4th byte and ending at the first null
@ -683,28 +664,25 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
state.port = struct.unpack_from('<H', recv, len(recv) - 2)[0]
log.debug('detected ip: %s port: %s', state.ip, state.port)
yield from self.select_protocol(state.ip, state.port)
await self.select_protocol(state.ip, state.port)
log.info('selected the voice protocol for use')
@asyncio.coroutine
def load_secret_key(self, data):
async def load_secret_key(self, data):
log.info('received secret key for voice connection')
self._connection.secret_key = data.get('secret_key')
yield from self.speak()
await self.speak()
@asyncio.coroutine
def poll_event(self):
async def poll_event(self):
try:
msg = yield from asyncio.wait_for(self.recv(), timeout=30.0, loop=self.loop)
yield from self.received_message(json.loads(msg))
msg = await asyncio.wait_for(self.recv(), timeout=30.0, loop=self.loop)
await self.received_message(json.loads(msg))
except websockets.exceptions.ConnectionClosed as e:
raise ConnectionClosed(e, shard_id=None) from e
@asyncio.coroutine
def close_connection(self, *args, **kwargs):
async def close_connection(self, *args, **kwargs):
if self._keep_alive:
self._keep_alive.stop()
yield from super().close_connection(*args, **kwargs)
await super().close_connection(*args, **kwargs)