1 Commits

Author SHA1 Message Date
e649f356be Turn KeepAliveHandler into an asyncio Task 2021-10-07 17:22:32 +01:00
4 changed files with 37 additions and 90 deletions

View File

@ -465,6 +465,7 @@ class Context(discord.abc.Messageable, Generic[BotT]):
kwargs.pop("nonce", None) kwargs.pop("nonce", None)
kwargs.pop("stickers", None) kwargs.pop("stickers", None)
kwargs.pop("reference", None) kwargs.pop("reference", None)
kwargs.pop("delete_after", None)
kwargs.pop("mention_author", None) kwargs.pop("mention_author", None)
if not ( if not (

View File

@ -153,38 +153,38 @@ class GatewayRatelimiter:
await asyncio.sleep(delta) await asyncio.sleep(delta)
class KeepAliveHandler(threading.Thread): class KeepAliveHandler:
def __init__(self, *args: Any, **kwargs: Any) -> None: def __init__(self, *, ws: DiscordWebSocket, shard_id: int = None, interval: float = None) -> None:
ws = kwargs.pop("ws")
interval = kwargs.pop("interval", None)
shard_id = kwargs.pop("shard_id", None)
threading.Thread.__init__(self, *args, **kwargs)
self.ws: DiscordWebSocket = ws self.ws: DiscordWebSocket = ws
self._main_thread_id: int = ws.thread_id
self.interval: Optional[float] = interval
self.daemon: bool = True
self.shard_id: Optional[int] = shard_id self.shard_id: Optional[int] = shard_id
self.interval: Optional[float] = interval
self.heartbeat_timeout: float = self.ws._max_heartbeat_timeout
self.msg: str = "Keeping shard ID %s websocket alive with sequence %s." self.msg: str = "Keeping shard ID %s websocket alive with sequence %s."
self.block_msg: str = "Shard ID %s heartbeat blocked for more than %s seconds." self.block_msg: str = "Shard ID %s heartbeat blocked for more than %s seconds."
self.behind_msg: str = "Can't keep up, shard ID %s websocket is %.1fs behind." self.behind_msg: str = "Can't keep up, shard ID %s websocket is %.1fs behind."
self._stop_ev: threading.Event = threading.Event() self._stop_ev: asyncio.Event = asyncio.Event()
self._last_ack: float = time.perf_counter()
self._last_send: float = time.perf_counter() self._last_send: float = time.perf_counter()
self._last_recv: float = time.perf_counter() self._last_recv: float = time.perf_counter()
self._last_ack: float = time.perf_counter()
self.latency: float = float("inf") self.latency: float = float("inf")
self.heartbeat_timeout: float = ws._max_heartbeat_timeout
def run(self) -> None: async def run(self) -> None:
while not self._stop_ev.wait(self.interval): while True:
try:
await asyncio.wait_for(self._stop_ev.wait(), timeout=self.interval)
except asyncio.TimeoutError:
pass
else:
return
if self._last_recv + self.heartbeat_timeout < time.perf_counter(): if self._last_recv + self.heartbeat_timeout < time.perf_counter():
_log.warning( _log.warning(
"Shard ID %s has stopped responding to the gateway. Closing and restarting.", self.shard_id "Shard ID %s has stopped responding to the gateway. Closing and restarting.", self.shard_id
) )
coro = self.ws.close(4000)
f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop)
try: try:
f.result() await self.ws.close(4000)
except Exception: except Exception:
_log.exception("An error occurred while stopping the gateway. Ignoring.") _log.exception("An error occurred while stopping the gateway. Ignoring.")
finally: finally:
@ -193,24 +193,18 @@ class KeepAliveHandler(threading.Thread):
data = self.get_payload() data = self.get_payload()
_log.debug(self.msg, self.shard_id, data["d"]) _log.debug(self.msg, self.shard_id, data["d"])
coro = self.ws.send_heartbeat(data)
f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop)
try: try:
# block until sending is complete # block until sending is complete
total = 0 total = 0
while True: while True:
try: try:
f.result(10) await asyncio.wait_for(self.ws.send_heartbeat(data), timeout=10)
break break
except concurrent.futures.TimeoutError: except asyncio.TimeoutError:
total += 10 total += 10
try:
frame = sys._current_frames()[self._main_thread_id] stack = "".join(traceback.format_stack())
except KeyError: msg = f"{self.block_msg}\nLoop traceback (most recent call last):\n{stack}"
msg = self.block_msg
else:
stack = "".join(traceback.format_stack(frame))
msg = f"{self.block_msg}\nLoop thread traceback (most recent call last):\n{stack}"
_log.warning(msg, self.shard_id, total) _log.warning(msg, self.shard_id, total)
except Exception: except Exception:
@ -225,6 +219,10 @@ class KeepAliveHandler(threading.Thread):
"d": self.ws.sequence, # type: ignore "d": self.ws.sequence, # type: ignore
} }
def start(self) -> None:
self.ws.loop.create_task(self.run())
def stop(self) -> None: def stop(self) -> None:
self._stop_ev.set() self._stop_ev.set()

View File

@ -337,7 +337,7 @@ class Interaction:
self._state.store_view(view, message.id) self._state.store_view(view, message.id)
return message return message
async def delete_original_message(self, delay: Optional[float] = None) -> None: async def delete_original_message(self) -> None:
"""|coro| """|coro|
Deletes the original interaction response message. Deletes the original interaction response message.
@ -345,14 +345,6 @@ class Interaction:
This is a lower level interface to :meth:`InteractionMessage.delete` in case This is a lower level interface to :meth:`InteractionMessage.delete` in case
you do not want to fetch the message and save an HTTP request. you do not want to fetch the message and save an HTTP request.
Parameters
------------
delay: Optional[:class:`float`]
If provided, the number of seconds to wait before deleting the message.
The waiting is done in the background and deletion failures are ignored.
.. versionadded:: 2.0
Raises Raises
------- -------
HTTPException HTTPException
@ -360,8 +352,6 @@ class Interaction:
Forbidden Forbidden
Deleted a message that is not yours. Deleted a message that is not yours.
""" """
if delay is not None:
await asyncio.sleep(delay)
adapter = async_context.get() adapter = async_context.get()
await adapter.delete_original_interaction_response( await adapter.delete_original_interaction_response(
self.application_id, self.application_id,
@ -470,7 +460,6 @@ class InteractionResponse:
view: View = MISSING, view: View = MISSING,
tts: bool = False, tts: bool = False,
ephemeral: bool = False, ephemeral: bool = False,
delete_after: Optional[float] = None,
) -> None: ) -> None:
"""|coro| """|coro|
@ -494,9 +483,6 @@ class InteractionResponse:
Indicates if the message should only be visible to the user who started the interaction. Indicates if the message should only be visible to the user who started the interaction.
If a view is sent with an ephemeral message and it has no timeout set then the timeout If a view is sent with an ephemeral message and it has no timeout set then the timeout
is set to 15 minutes. is set to 15 minutes.
delete_after: Optional[:class:`float`]
The amount of seconds the bot should wait before deleting the message sent.
.. versionadded:: 2.0
Raises Raises
------- -------
@ -553,8 +539,6 @@ class InteractionResponse:
self._parent._state.store_view(view) self._parent._state.store_view(view)
self.responded_at = utils.utcnow() self.responded_at = utils.utcnow()
if delete_after is not None:
self._parent.delete_original_message(delay=delete_after)
async def edit_message( async def edit_message(
self, self,
@ -749,7 +733,7 @@ class InteractionMessage(Message):
allowed_mentions=allowed_mentions, allowed_mentions=allowed_mentions,
) )
async def delete(self, *, delay: Optional[float] = None, silent: bool = False) -> None: async def delete(self, *, delay: Optional[float] = None) -> None:
"""|coro| """|coro|
Deletes the message. Deletes the message.
@ -760,12 +744,6 @@ class InteractionMessage(Message):
If provided, the number of seconds to wait before deleting the message. If provided, the number of seconds to wait before deleting the message.
The waiting is done in the background and deletion failures are ignored. The waiting is done in the background and deletion failures are ignored.
silent: :class:`bool`
If silent is set to ``True``, the error will not be raised, it will be ignored.
This defaults to ``False`
.. versionadded:: 2.0
Raises Raises
------ ------
Forbidden Forbidden
@ -779,15 +757,12 @@ class InteractionMessage(Message):
if delay is not None: if delay is not None:
async def inner_call(delay: float = delay): async def inner_call(delay: float = delay):
await asyncio.sleep(delay)
try: try:
await self._state._interaction.delete_original_message(delay=delay) await self._state._interaction.delete_original_message()
except HTTPException: except HTTPException:
pass pass
asyncio.create_task(inner_call()) asyncio.create_task(inner_call())
else: else:
try: await self._state._interaction.delete_original_message()
await self._state._interaction.delete_original_message(delay=delay)
except Exception:
if not silent:
raise

View File

@ -717,7 +717,7 @@ class WebhookMessage(Message):
allowed_mentions=allowed_mentions, allowed_mentions=allowed_mentions,
) )
async def delete(self, *, delay: Optional[float] = None, silent: bool = False) -> None: async def delete(self, *, delay: Optional[float] = None) -> None:
"""|coro| """|coro|
Deletes the message. Deletes the message.
@ -728,12 +728,6 @@ class WebhookMessage(Message):
If provided, the number of seconds to wait before deleting the message. If provided, the number of seconds to wait before deleting the message.
The waiting is done in the background and deletion failures are ignored. The waiting is done in the background and deletion failures are ignored.
silent: :class:`bool`
If silent is set to ``True``, the error will not be raised, it will be ignored.
This defaults to ``False`
.. versionadded:: 2.0
Raises Raises
------ ------
Forbidden Forbidden
@ -747,18 +741,15 @@ class WebhookMessage(Message):
if delay is not None: if delay is not None:
async def inner_call(delay: float = delay): async def inner_call(delay: float = delay):
await asyncio.sleep(delay)
try: try:
await self._state._webhook.delete_message(self.id, delay) await self._state._webhook.delete_message(self.id)
except HTTPException: except HTTPException:
pass pass
asyncio.create_task(inner_call()) asyncio.create_task(inner_call())
else: else:
try: await self._state._webhook.delete_message(self.id)
await self._state._webhook.delete_message(self.id, delay)
except Exception:
if not silent:
raise
class BaseWebhook(Hashable): class BaseWebhook(Hashable):
@ -1279,7 +1270,6 @@ class Webhook(BaseWebhook):
view: View = MISSING, view: View = MISSING,
thread: Snowflake = MISSING, thread: Snowflake = MISSING,
wait: bool = False, wait: bool = False,
delete_after: Optional[float] = None,
) -> Optional[WebhookMessage]: ) -> Optional[WebhookMessage]:
"""|coro| """|coro|
@ -1346,11 +1336,6 @@ class Webhook(BaseWebhook):
.. versionadded:: 2.0 .. versionadded:: 2.0
delete_after: Optional[:class:`float`]
If provided, the number of seconds to wait before deleting the message.
The waiting is done in the background and deletion failures are ignored.
.. versionadded:: 2.0
Raises Raises
-------- --------
HTTPException HTTPException
@ -1432,9 +1417,6 @@ class Webhook(BaseWebhook):
message_id = None if msg is None else msg.id message_id = None if msg is None else msg.id
self._state.store_view(view, message_id) self._state.store_view(view, message_id)
if delete_after is not None and msg is not None:
await msg.delete(delay=delete_after)
return msg return msg
async def fetch_message(self, id: int) -> WebhookMessage: async def fetch_message(self, id: int) -> WebhookMessage:
@ -1588,7 +1570,7 @@ class Webhook(BaseWebhook):
self._state.store_view(view, message_id) self._state.store_view(view, message_id)
return message return message
async def delete_message(self, message_id: int, delay: Optional[float] = None, /) -> None: async def delete_message(self, message_id: int, /) -> None:
"""|coro| """|coro|
Deletes a message owned by this webhook. Deletes a message owned by this webhook.
@ -1603,12 +1585,6 @@ class Webhook(BaseWebhook):
message_id: :class:`int` message_id: :class:`int`
The message ID to delete. The message ID to delete.
delay: Optional[:class:`float`]
If provided, the number of seconds to wait before deleting the message.
The waiting is done in the background and deletion failures are ignored.
.. versionadded:: 2.0
Raises Raises
------- -------
HTTPException HTTPException
@ -1619,9 +1595,6 @@ class Webhook(BaseWebhook):
if self.token is None: if self.token is None:
raise InvalidArgument("This webhook does not have a token associated with it") raise InvalidArgument("This webhook does not have a token associated with it")
if delay is not None:
await asyncio.sleep(delay)
adapter = async_context.get() adapter = async_context.get()
await adapter.delete_webhook_message( await adapter.delete_webhook_message(
self.id, self.id,