Check for zombie connections through last received payload
The previous code would check zombie connections depending on whether HEARTBEAT_ACK was received. Unfortunately when there's exceeding backpressure the connection can terminate since the HEARTBEAT_ACK is buffered very far away despite it being there, just not received yet.
This commit is contained in:
parent
0ec72660cf
commit
6bae52f4bb
@ -119,12 +119,13 @@ class KeepAliveHandler(threading.Thread):
|
|||||||
self._stop_ev = threading.Event()
|
self._stop_ev = threading.Event()
|
||||||
self._last_ack = time.perf_counter()
|
self._last_ack = time.perf_counter()
|
||||||
self._last_send = time.perf_counter()
|
self._last_send = time.perf_counter()
|
||||||
|
self._last_recv = time.perf_counter()
|
||||||
self.latency = float('inf')
|
self.latency = float('inf')
|
||||||
self.heartbeat_timeout = ws._max_heartbeat_timeout
|
self.heartbeat_timeout = ws._max_heartbeat_timeout
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self._stop_ev.wait(self.interval):
|
while not self._stop_ev.wait(self.interval):
|
||||||
if self._last_ack + self.heartbeat_timeout < time.perf_counter():
|
if self._last_recv + self.heartbeat_timeout < time.perf_counter():
|
||||||
log.warning("Shard ID %s has stopped responding to the gateway. Closing and restarting.", self.shard_id)
|
log.warning("Shard ID %s has stopped responding to the gateway. Closing and restarting.", self.shard_id)
|
||||||
coro = self.ws.close(4000)
|
coro = self.ws.close(4000)
|
||||||
f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop)
|
f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop)
|
||||||
@ -173,6 +174,9 @@ class KeepAliveHandler(threading.Thread):
|
|||||||
def stop(self):
|
def stop(self):
|
||||||
self._stop_ev.set()
|
self._stop_ev.set()
|
||||||
|
|
||||||
|
def tick(self):
|
||||||
|
self._last_recv = time.perf_counter()
|
||||||
|
|
||||||
def ack(self):
|
def ack(self):
|
||||||
ack_time = time.perf_counter()
|
ack_time = time.perf_counter()
|
||||||
self._last_ack = ack_time
|
self._last_ack = ack_time
|
||||||
@ -197,6 +201,7 @@ class VoiceKeepAliveHandler(KeepAliveHandler):
|
|||||||
def ack(self):
|
def ack(self):
|
||||||
ack_time = time.perf_counter()
|
ack_time = time.perf_counter()
|
||||||
self._last_ack = ack_time
|
self._last_ack = ack_time
|
||||||
|
self._last_recv = ack_time
|
||||||
self.latency = ack_time - self._last_send
|
self.latency = ack_time - self._last_send
|
||||||
self.recent_ack_latencies.append(self.latency)
|
self.recent_ack_latencies.append(self.latency)
|
||||||
|
|
||||||
@ -429,6 +434,9 @@ class DiscordWebSocket:
|
|||||||
if seq is not None:
|
if seq is not None:
|
||||||
self.sequence = seq
|
self.sequence = seq
|
||||||
|
|
||||||
|
if self._keep_alive:
|
||||||
|
self._keep_alive.tick()
|
||||||
|
|
||||||
if op != self.DISPATCH:
|
if op != self.DISPATCH:
|
||||||
if op == self.RECONNECT:
|
if op == self.RECONNECT:
|
||||||
# "reconnect" can only be handled by the Client
|
# "reconnect" can only be handled by the Client
|
||||||
|
Loading…
x
Reference in New Issue
Block a user