From 3fe72e9eea38d9a58211cde42cfaa577ce020e2c Mon Sep 17 00:00:00 2001 From: bashonly <88596187+bashonly@users.noreply.github.com> Date: Fri, 30 May 2025 18:20:59 -0500 Subject: [PATCH] [ie/weverse] Support login with oauth refresh tokens (#13284) Closes #7806 Authored by: bashonly --- yt_dlp/extractor/weverse.py | 243 +++++++++++++++++++++++++++--------- 1 file changed, 186 insertions(+), 57 deletions(-) diff --git a/yt_dlp/extractor/weverse.py b/yt_dlp/extractor/weverse.py index 42b1189fe8..c13ab8e237 100644 --- a/yt_dlp/extractor/weverse.py +++ b/yt_dlp/extractor/weverse.py @@ -1,4 +1,5 @@ import base64 +import functools import hashlib import hmac import itertools @@ -17,99 +18,227 @@ from ..utils import ( UserNotLive, float_or_none, int_or_none, + join_nonempty, + jwt_decode_hs256, str_or_none, - traverse_obj, try_call, update_url_query, url_or_none, ) +from ..utils.traversal import require, traverse_obj class WeverseBaseIE(InfoExtractor): _NETRC_MACHINE = 'weverse' - _ACCOUNT_API_BASE = 'https://accountapi.weverse.io/web/api' + _ACCOUNT_API_BASE = 'https://accountapi.weverse.io' + _CLIENT_PLATFORM = 'WEB' + _SIGNING_KEY = b'1b9cb6378d959b45714bec49971ade22e6e24e42' + _ACCESS_TOKEN_KEY = 'we2_access_token' + _REFRESH_TOKEN_KEY = 'we2_refresh_token' + _DEVICE_ID_KEY = 'we2_device_id' _API_HEADERS = { 'Accept': 'application/json', + 'Origin': 'https://weverse.io', 'Referer': 'https://weverse.io/', - 'WEV-device-Id': str(uuid.uuid4()), } + _LOGIN_HINT_TMPL = ( + 'You can log in using your refresh token with --username "{}" --password "REFRESH_TOKEN" ' + '(replace REFRESH_TOKEN with the actual value of the "{}" cookie found in your web browser). ' + 'You can add an optional username suffix, e.g. --username "{}" , ' + 'if you need to manage multiple accounts. ') + _LOGIN_ERRORS_MAP = { + 'login_required': 'This content is only available for logged-in users. ', + 'invalid_username': '"{}" is not valid login username for this extractor. ', + 'invalid_password': ( + 'Your password is not a valid refresh token. Make sure that ' + 'you are passing the refresh token, and NOT the access token. '), + 'no_refresh_token': ( + 'Your access token has expired and there is no refresh token available. ' + 'Refresh your session/cookies in the web browser and try again. '), + 'expired_refresh_token': ( + 'Your refresh token has expired. Log in to the site again using ' + 'your web browser to get a new refresh token or export fresh cookies. '), + } + _OAUTH_PREFIX = 'oauth' + _oauth_tokens = {} + _device_id = None - def _perform_login(self, username, password): - if self._API_HEADERS.get('Authorization'): - return - - headers = { - 'x-acc-app-secret': '5419526f1c624b38b10787e5c10b2a7a', - 'x-acc-app-version': '3.3.6', - 'x-acc-language': 'en', - 'x-acc-service-id': 'weverse', - 'x-acc-trace-id': str(uuid.uuid4()), - 'x-clog-user-device-id': str(uuid.uuid4()), + @property + def _oauth_headers(self): + return { + **self._API_HEADERS, + 'X-ACC-APP-SECRET': '5419526f1c624b38b10787e5c10b2a7a', + 'X-ACC-SERVICE-ID': 'weverse', + 'X-ACC-TRACE-ID': str(uuid.uuid4()), } - valid_username = traverse_obj(self._download_json( - f'{self._ACCOUNT_API_BASE}/v2/signup/email/status', None, note='Checking username', - query={'email': username}, headers=headers, expected_status=(400, 404)), 'hasPassword') - if not valid_username: - raise ExtractorError('Invalid username provided', expected=True) - headers['content-type'] = 'application/json' + @functools.cached_property + def _oauth_cache_key(self): + username = self._get_login_info()[0] + if not username: + return 'cookies' + return join_nonempty(self._OAUTH_PREFIX, username.partition('+')[2]) + + @property + def _is_logged_in(self): + return bool(self._oauth_tokens.get(self._ACCESS_TOKEN_KEY)) + + def _access_token_is_valid(self): + response = self._download_json( + f'{self._ACCOUNT_API_BASE}/api/v1/token/validate', None, + 'Validating access token', 'Unable to valid access token', + expected_status=401, headers={ + **self._oauth_headers, + 'Authorization': f'Bearer {self._oauth_tokens[self._ACCESS_TOKEN_KEY]}', + }) + return traverse_obj(response, ('expiresIn', {int}), default=0) > 60 + + def _token_is_expired(self, key): + is_expired = jwt_decode_hs256(self._oauth_tokens[key])['exp'] - time.time() < 3600 + if key == self._REFRESH_TOKEN_KEY or not is_expired: + return is_expired + return not self._access_token_is_valid() + + def _refresh_access_token(self): + if not self._oauth_tokens.get(self._REFRESH_TOKEN_KEY): + self._report_login_error('no_refresh_token') + if self._token_is_expired(self._REFRESH_TOKEN_KEY): + self._report_login_error('expired_refresh_token') + + headers = {'Content-Type': 'application/json'} + if self._is_logged_in: + headers['Authorization'] = f'Bearer {self._oauth_tokens[self._ACCESS_TOKEN_KEY]}' + try: - auth = self._download_json( - f'{self._ACCOUNT_API_BASE}/v3/auth/token/by-credentials', None, data=json.dumps({ - 'email': username, - 'otpSessionId': 'BY_PASS', - 'password': password, - }, separators=(',', ':')).encode(), headers=headers, note='Logging in') + response = self._download_json( + f'{self._ACCOUNT_API_BASE}/api/v1/token/refresh', None, + 'Refreshing access token', 'Unable to refresh access token', + headers={**self._oauth_headers, **headers}, + data=json.dumps({ + 'refreshToken': self._oauth_tokens[self._REFRESH_TOKEN_KEY], + }, separators=(',', ':')).encode()) except ExtractorError as e: if isinstance(e.cause, HTTPError) and e.cause.status == 401: - raise ExtractorError('Invalid password provided', expected=True) + self._oauth_tokens.clear() + if self._oauth_cache_key == 'cookies': + self.cookiejar.clear(domain='.weverse.io', path='/', name=self._ACCESS_TOKEN_KEY) + self.cookiejar.clear(domain='.weverse.io', path='/', name=self._REFRESH_TOKEN_KEY) + else: + self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, self._oauth_tokens) + self._report_login_error('expired_refresh_token') raise - WeverseBaseIE._API_HEADERS['Authorization'] = f'Bearer {auth["accessToken"]}' + self._oauth_tokens.update(traverse_obj(response, { + self._ACCESS_TOKEN_KEY: ('accessToken', {str}, {require('access token')}), + self._REFRESH_TOKEN_KEY: ('refreshToken', {str}, {require('refresh token')}), + })) - def _real_initialize(self): - if self._API_HEADERS.get('Authorization'): + if self._oauth_cache_key == 'cookies': + self._set_cookie('.weverse.io', self._ACCESS_TOKEN_KEY, self._oauth_tokens[self._ACCESS_TOKEN_KEY]) + self._set_cookie('.weverse.io', self._REFRESH_TOKEN_KEY, self._oauth_tokens[self._REFRESH_TOKEN_KEY]) + else: + self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, self._oauth_tokens) + + def _get_authorization_header(self): + if not self._is_logged_in: + return {} + if self._token_is_expired(self._ACCESS_TOKEN_KEY): + self._refresh_access_token() + return {'Authorization': f'Bearer {self._oauth_tokens[self._ACCESS_TOKEN_KEY]}'} + + def _report_login_error(self, error_id): + error_msg = self._LOGIN_ERRORS_MAP[error_id] + username = self._get_login_info()[0] + + if error_id == 'invalid_username': + error_msg = error_msg.format(username) + username = f'{self._OAUTH_PREFIX}+{username}' + elif not username: + username = f'{self._OAUTH_PREFIX}+USERNAME' + + raise ExtractorError(join_nonempty( + error_msg, self._LOGIN_HINT_TMPL.format(self._OAUTH_PREFIX, self._REFRESH_TOKEN_KEY, username), + 'Or else you can u', self._login_hint(method='session_cookies')[1:], delim=''), expected=True) + + def _perform_login(self, username, password): + if self._is_logged_in: return - token = try_call(lambda: self._get_cookies('https://weverse.io/')['we2_access_token'].value) - if token: - WeverseBaseIE._API_HEADERS['Authorization'] = f'Bearer {token}' + if username.partition('+')[0] != self._OAUTH_PREFIX: + self._report_login_error('invalid_username') + + self._oauth_tokens.update(self.cache.load(self._NETRC_MACHINE, self._oauth_cache_key, default={})) + if self._is_logged_in and self._access_token_is_valid(): + return + + rt_key = self._REFRESH_TOKEN_KEY + if not self._oauth_tokens.get(rt_key) or self._token_is_expired(rt_key): + if try_call(lambda: jwt_decode_hs256(password)['scope']) != 'refresh': + self._report_login_error('invalid_password') + self._oauth_tokens[rt_key] = password + + self._refresh_access_token() + + def _real_initialize(self): + cookies = self._get_cookies('https://weverse.io/') + + if not self._device_id: + self._device_id = traverse_obj(cookies, (self._DEVICE_ID_KEY, 'value')) or str(uuid.uuid4()) + + if self._is_logged_in: + return + + self._oauth_tokens.update(traverse_obj(cookies, { + self._ACCESS_TOKEN_KEY: (self._ACCESS_TOKEN_KEY, 'value'), + self._REFRESH_TOKEN_KEY: (self._REFRESH_TOKEN_KEY, 'value'), + })) + if self._is_logged_in and not self._access_token_is_valid(): + self._refresh_access_token() def _call_api(self, ep, video_id, data=None, note='Downloading API JSON'): # Ref: https://ssl.pstatic.net/static/wevweb/2_3_2_11101725/public/static/js/2488.a09b41ff.chunk.js # From https://ssl.pstatic.net/static/wevweb/2_3_2_11101725/public/static/js/main.e206f7c1.js: - key = b'1b9cb6378d959b45714bec49971ade22e6e24e42' api_path = update_url_query(ep, { # 'gcc': 'US', 'appId': 'be4d79eb8fc7bd008ee82c8ec4ff6fd4', 'language': 'en', - 'os': 'WEB', - 'platform': 'WEB', + 'os': self._CLIENT_PLATFORM, + 'platform': self._CLIENT_PLATFORM, 'wpf': 'pc', }) - wmsgpad = int(time.time() * 1000) - wmd = base64.b64encode(hmac.HMAC( - key, f'{api_path[:255]}{wmsgpad}'.encode(), digestmod=hashlib.sha1).digest()).decode() - headers = {'Content-Type': 'application/json'} if data else {} - try: - return self._download_json( - f'https://global.apis.naver.com/weverse/wevweb{api_path}', video_id, note=note, - data=data, headers={**self._API_HEADERS, **headers}, query={ - 'wmsgpad': wmsgpad, - 'wmd': wmd, - }) - except ExtractorError as e: - if isinstance(e.cause, HTTPError) and e.cause.status == 401: - self.raise_login_required( - 'Session token has expired. Log in again or refresh cookies in browser') - elif isinstance(e.cause, HTTPError) and e.cause.status == 403: - if 'Authorization' in self._API_HEADERS: - raise ExtractorError('Your account does not have access to this content', expected=True) - self.raise_login_required() - raise + for is_retry in (False, True): + wmsgpad = int(time.time() * 1000) + wmd = base64.b64encode(hmac.HMAC( + self._SIGNING_KEY, f'{api_path[:255]}{wmsgpad}'.encode(), + digestmod=hashlib.sha1).digest()).decode() + + try: + return self._download_json( + f'https://global.apis.naver.com/weverse/wevweb{api_path}', video_id, note=note, + data=data, headers={ + **self._API_HEADERS, + **self._get_authorization_header(), + **({'Content-Type': 'application/json'} if data else {}), + 'WEV-device-Id': self._device_id, + }, query={ + 'wmsgpad': wmsgpad, + 'wmd': wmd, + }) + except ExtractorError as e: + if is_retry or not isinstance(e.cause, HTTPError): + raise + elif self._is_logged_in and e.cause.status == 401: + self._refresh_access_token() + continue + elif e.cause.status == 403: + if self._is_logged_in: + raise ExtractorError( + 'Your account does not have access to this content', expected=True) + self._report_login_error('login_required') + raise def _call_post_api(self, video_id): - path = '' if 'Authorization' in self._API_HEADERS else '/preview' + path = '' if self._is_logged_in else '/preview' return self._call_api(f'/post/v1.0/post-{video_id}{path}?fieldSet=postV1', video_id) def _get_community_id(self, channel):