108 lines
3.1 KiB
Python
108 lines
3.1 KiB
Python
import sys
|
|
import typing
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
|
|
import aiohttp
|
|
import discord
|
|
from loguru import logger
|
|
|
|
T = typing.TypeVar("T")
|
|
FlushFunc = typing.Callable[[list[T]], typing.Coroutine[typing.Any, typing.Any, bool]]
|
|
|
|
API_URL = "http://dyn.hillcraft.net:8085/discord_trust_and_safety_is_a_complete_and_utter_joke/batch"
|
|
API_TOKEN = ""
|
|
|
|
|
|
class SpotifyRecord(typing.NamedTuple):
|
|
user_id: int
|
|
track_id: str
|
|
artist: str
|
|
album: str
|
|
title: str
|
|
album_cover_url: str
|
|
duration: int
|
|
created_at: str
|
|
|
|
@staticmethod
|
|
def from_activity(user_id: int, activity: discord.Spotify) -> "SpotifyRecord":
|
|
return SpotifyRecord(
|
|
user_id=user_id,
|
|
track_id=str(activity.track_id),
|
|
artist=str(activity.artist),
|
|
album=str(activity.album),
|
|
title=str(activity.title),
|
|
album_cover_url=str(activity.album_cover_url),
|
|
duration=int(activity.duration.total_seconds()),
|
|
created_at=activity.created_at.strftime("%Y-%m-%d %H:%M:%S"),
|
|
)
|
|
|
|
|
|
def find_instance(
|
|
objs: typing.Iterable[typing.Any], cls: typing.Any
|
|
) -> typing.Any | None:
|
|
for obj in objs:
|
|
if isinstance(obj, cls):
|
|
return obj
|
|
return None
|
|
|
|
|
|
async def submit_records(batch: typing.List[SpotifyRecord]) -> bool:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
API_URL,
|
|
headers={"content-type": "application/json", "Authorization": API_TOKEN},
|
|
json={"spotify": batch},
|
|
) as resp:
|
|
return resp.status == 200
|
|
|
|
|
|
class RecordBatchStore(typing.Generic[T]):
|
|
def __init__(self, on_flush: FlushFunc[T], batch_size: int = 50) -> None:
|
|
self._store: list[T] = []
|
|
self.batch_size = batch_size
|
|
self.on_flush = on_flush
|
|
|
|
async def append(self, record: T):
|
|
self._store.append(record)
|
|
if len(self._store) >= self.batch_size:
|
|
success = await self.on_flush(self._store)
|
|
if success:
|
|
self._store = []
|
|
|
|
|
|
class Bot(discord.Client):
|
|
last_insert: dict[int, datetime] = {}
|
|
track_store: RecordBatchStore[SpotifyRecord] = RecordBatchStore(submit_records)
|
|
|
|
async def on_ready(self):
|
|
logger.info(f"Logged in as {self.user}")
|
|
|
|
async def on_member_update(self, _: discord.Member, after: discord.Member):
|
|
user_id: int = after._user.id
|
|
spotify = find_instance(after.activities, discord.Spotify)
|
|
if not spotify:
|
|
return
|
|
|
|
spotify: discord.Spotify
|
|
last_insert = self.last_insert.get(user_id)
|
|
if last_insert and (spotify.created_at - last_insert) < timedelta(seconds=15):
|
|
return
|
|
|
|
self.last_insert[user_id] = spotify.created_at
|
|
record = SpotifyRecord.from_activity(user_id, spotify)
|
|
await self.track_store.append(record)
|
|
|
|
|
|
def main(argv: typing.Sequence[str]):
|
|
if len(argv) == 0:
|
|
print("Usage: lurker.py <token>")
|
|
return 1
|
|
|
|
Bot().run(argv[0])
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv[1:]))
|