1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-06-07 12:18:51 +00:00

[ie/weverse] Support login with oauth refresh tokens (#13284)

Closes #7806
Authored by: bashonly
This commit is contained in:
bashonly 2025-05-30 18:20:59 -05:00 committed by GitHub
parent d30a49742c
commit 3fe72e9eea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,4 +1,5 @@
import base64 import base64
import functools
import hashlib import hashlib
import hmac import hmac
import itertools import itertools
@ -17,99 +18,227 @@ from ..utils import (
UserNotLive, UserNotLive,
float_or_none, float_or_none,
int_or_none, int_or_none,
join_nonempty,
jwt_decode_hs256,
str_or_none, str_or_none,
traverse_obj,
try_call, try_call,
update_url_query, update_url_query,
url_or_none, url_or_none,
) )
from ..utils.traversal import require, traverse_obj
class WeverseBaseIE(InfoExtractor): class WeverseBaseIE(InfoExtractor):
_NETRC_MACHINE = 'weverse' _NETRC_MACHINE = 'weverse'
_ACCOUNT_API_BASE = 'https://accountapi.weverse.io/web/api' _ACCOUNT_API_BASE = 'https://accountapi.weverse.io'
_CLIENT_PLATFORM = 'WEB'
_SIGNING_KEY = b'1b9cb6378d959b45714bec49971ade22e6e24e42'
_ACCESS_TOKEN_KEY = 'we2_access_token'
_REFRESH_TOKEN_KEY = 'we2_refresh_token'
_DEVICE_ID_KEY = 'we2_device_id'
_API_HEADERS = { _API_HEADERS = {
'Accept': 'application/json', 'Accept': 'application/json',
'Origin': 'https://weverse.io',
'Referer': 'https://weverse.io/', 'Referer': 'https://weverse.io/',
'WEV-device-Id': str(uuid.uuid4()),
} }
_LOGIN_HINT_TMPL = (
'You can log in using your refresh token with --username "{}" --password "REFRESH_TOKEN" '
'(replace REFRESH_TOKEN with the actual value of the "{}" cookie found in your web browser). '
'You can add an optional username suffix, e.g. --username "{}" , '
'if you need to manage multiple accounts. ')
_LOGIN_ERRORS_MAP = {
'login_required': 'This content is only available for logged-in users. ',
'invalid_username': '"{}" is not valid login username for this extractor. ',
'invalid_password': (
'Your password is not a valid refresh token. Make sure that '
'you are passing the refresh token, and NOT the access token. '),
'no_refresh_token': (
'Your access token has expired and there is no refresh token available. '
'Refresh your session/cookies in the web browser and try again. '),
'expired_refresh_token': (
'Your refresh token has expired. Log in to the site again using '
'your web browser to get a new refresh token or export fresh cookies. '),
}
_OAUTH_PREFIX = 'oauth'
_oauth_tokens = {}
_device_id = None
def _perform_login(self, username, password): @property
if self._API_HEADERS.get('Authorization'): def _oauth_headers(self):
return return {
**self._API_HEADERS,
headers = { 'X-ACC-APP-SECRET': '5419526f1c624b38b10787e5c10b2a7a',
'x-acc-app-secret': '5419526f1c624b38b10787e5c10b2a7a', 'X-ACC-SERVICE-ID': 'weverse',
'x-acc-app-version': '3.3.6', 'X-ACC-TRACE-ID': str(uuid.uuid4()),
'x-acc-language': 'en',
'x-acc-service-id': 'weverse',
'x-acc-trace-id': str(uuid.uuid4()),
'x-clog-user-device-id': str(uuid.uuid4()),
} }
valid_username = traverse_obj(self._download_json(
f'{self._ACCOUNT_API_BASE}/v2/signup/email/status', None, note='Checking username',
query={'email': username}, headers=headers, expected_status=(400, 404)), 'hasPassword')
if not valid_username:
raise ExtractorError('Invalid username provided', expected=True)
headers['content-type'] = 'application/json' @functools.cached_property
def _oauth_cache_key(self):
username = self._get_login_info()[0]
if not username:
return 'cookies'
return join_nonempty(self._OAUTH_PREFIX, username.partition('+')[2])
@property
def _is_logged_in(self):
return bool(self._oauth_tokens.get(self._ACCESS_TOKEN_KEY))
def _access_token_is_valid(self):
response = self._download_json(
f'{self._ACCOUNT_API_BASE}/api/v1/token/validate', None,
'Validating access token', 'Unable to valid access token',
expected_status=401, headers={
**self._oauth_headers,
'Authorization': f'Bearer {self._oauth_tokens[self._ACCESS_TOKEN_KEY]}',
})
return traverse_obj(response, ('expiresIn', {int}), default=0) > 60
def _token_is_expired(self, key):
is_expired = jwt_decode_hs256(self._oauth_tokens[key])['exp'] - time.time() < 3600
if key == self._REFRESH_TOKEN_KEY or not is_expired:
return is_expired
return not self._access_token_is_valid()
def _refresh_access_token(self):
if not self._oauth_tokens.get(self._REFRESH_TOKEN_KEY):
self._report_login_error('no_refresh_token')
if self._token_is_expired(self._REFRESH_TOKEN_KEY):
self._report_login_error('expired_refresh_token')
headers = {'Content-Type': 'application/json'}
if self._is_logged_in:
headers['Authorization'] = f'Bearer {self._oauth_tokens[self._ACCESS_TOKEN_KEY]}'
try: try:
auth = self._download_json( response = self._download_json(
f'{self._ACCOUNT_API_BASE}/v3/auth/token/by-credentials', None, data=json.dumps({ f'{self._ACCOUNT_API_BASE}/api/v1/token/refresh', None,
'email': username, 'Refreshing access token', 'Unable to refresh access token',
'otpSessionId': 'BY_PASS', headers={**self._oauth_headers, **headers},
'password': password, data=json.dumps({
}, separators=(',', ':')).encode(), headers=headers, note='Logging in') 'refreshToken': self._oauth_tokens[self._REFRESH_TOKEN_KEY],
}, separators=(',', ':')).encode())
except ExtractorError as e: except ExtractorError as e:
if isinstance(e.cause, HTTPError) and e.cause.status == 401: if isinstance(e.cause, HTTPError) and e.cause.status == 401:
raise ExtractorError('Invalid password provided', expected=True) self._oauth_tokens.clear()
if self._oauth_cache_key == 'cookies':
self.cookiejar.clear(domain='.weverse.io', path='/', name=self._ACCESS_TOKEN_KEY)
self.cookiejar.clear(domain='.weverse.io', path='/', name=self._REFRESH_TOKEN_KEY)
else:
self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, self._oauth_tokens)
self._report_login_error('expired_refresh_token')
raise raise
WeverseBaseIE._API_HEADERS['Authorization'] = f'Bearer {auth["accessToken"]}' self._oauth_tokens.update(traverse_obj(response, {
self._ACCESS_TOKEN_KEY: ('accessToken', {str}, {require('access token')}),
self._REFRESH_TOKEN_KEY: ('refreshToken', {str}, {require('refresh token')}),
}))
def _real_initialize(self): if self._oauth_cache_key == 'cookies':
if self._API_HEADERS.get('Authorization'): self._set_cookie('.weverse.io', self._ACCESS_TOKEN_KEY, self._oauth_tokens[self._ACCESS_TOKEN_KEY])
self._set_cookie('.weverse.io', self._REFRESH_TOKEN_KEY, self._oauth_tokens[self._REFRESH_TOKEN_KEY])
else:
self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, self._oauth_tokens)
def _get_authorization_header(self):
if not self._is_logged_in:
return {}
if self._token_is_expired(self._ACCESS_TOKEN_KEY):
self._refresh_access_token()
return {'Authorization': f'Bearer {self._oauth_tokens[self._ACCESS_TOKEN_KEY]}'}
def _report_login_error(self, error_id):
error_msg = self._LOGIN_ERRORS_MAP[error_id]
username = self._get_login_info()[0]
if error_id == 'invalid_username':
error_msg = error_msg.format(username)
username = f'{self._OAUTH_PREFIX}+{username}'
elif not username:
username = f'{self._OAUTH_PREFIX}+USERNAME'
raise ExtractorError(join_nonempty(
error_msg, self._LOGIN_HINT_TMPL.format(self._OAUTH_PREFIX, self._REFRESH_TOKEN_KEY, username),
'Or else you can u', self._login_hint(method='session_cookies')[1:], delim=''), expected=True)
def _perform_login(self, username, password):
if self._is_logged_in:
return return
token = try_call(lambda: self._get_cookies('https://weverse.io/')['we2_access_token'].value) if username.partition('+')[0] != self._OAUTH_PREFIX:
if token: self._report_login_error('invalid_username')
WeverseBaseIE._API_HEADERS['Authorization'] = f'Bearer {token}'
self._oauth_tokens.update(self.cache.load(self._NETRC_MACHINE, self._oauth_cache_key, default={}))
if self._is_logged_in and self._access_token_is_valid():
return
rt_key = self._REFRESH_TOKEN_KEY
if not self._oauth_tokens.get(rt_key) or self._token_is_expired(rt_key):
if try_call(lambda: jwt_decode_hs256(password)['scope']) != 'refresh':
self._report_login_error('invalid_password')
self._oauth_tokens[rt_key] = password
self._refresh_access_token()
def _real_initialize(self):
cookies = self._get_cookies('https://weverse.io/')
if not self._device_id:
self._device_id = traverse_obj(cookies, (self._DEVICE_ID_KEY, 'value')) or str(uuid.uuid4())
if self._is_logged_in:
return
self._oauth_tokens.update(traverse_obj(cookies, {
self._ACCESS_TOKEN_KEY: (self._ACCESS_TOKEN_KEY, 'value'),
self._REFRESH_TOKEN_KEY: (self._REFRESH_TOKEN_KEY, 'value'),
}))
if self._is_logged_in and not self._access_token_is_valid():
self._refresh_access_token()
def _call_api(self, ep, video_id, data=None, note='Downloading API JSON'): def _call_api(self, ep, video_id, data=None, note='Downloading API JSON'):
# Ref: https://ssl.pstatic.net/static/wevweb/2_3_2_11101725/public/static/js/2488.a09b41ff.chunk.js # Ref: https://ssl.pstatic.net/static/wevweb/2_3_2_11101725/public/static/js/2488.a09b41ff.chunk.js
# From https://ssl.pstatic.net/static/wevweb/2_3_2_11101725/public/static/js/main.e206f7c1.js: # From https://ssl.pstatic.net/static/wevweb/2_3_2_11101725/public/static/js/main.e206f7c1.js:
key = b'1b9cb6378d959b45714bec49971ade22e6e24e42'
api_path = update_url_query(ep, { api_path = update_url_query(ep, {
# 'gcc': 'US', # 'gcc': 'US',
'appId': 'be4d79eb8fc7bd008ee82c8ec4ff6fd4', 'appId': 'be4d79eb8fc7bd008ee82c8ec4ff6fd4',
'language': 'en', 'language': 'en',
'os': 'WEB', 'os': self._CLIENT_PLATFORM,
'platform': 'WEB', 'platform': self._CLIENT_PLATFORM,
'wpf': 'pc', 'wpf': 'pc',
}) })
wmsgpad = int(time.time() * 1000) for is_retry in (False, True):
wmd = base64.b64encode(hmac.HMAC( wmsgpad = int(time.time() * 1000)
key, f'{api_path[:255]}{wmsgpad}'.encode(), digestmod=hashlib.sha1).digest()).decode() wmd = base64.b64encode(hmac.HMAC(
headers = {'Content-Type': 'application/json'} if data else {} self._SIGNING_KEY, f'{api_path[:255]}{wmsgpad}'.encode(),
try: digestmod=hashlib.sha1).digest()).decode()
return self._download_json(
f'https://global.apis.naver.com/weverse/wevweb{api_path}', video_id, note=note, try:
data=data, headers={**self._API_HEADERS, **headers}, query={ return self._download_json(
'wmsgpad': wmsgpad, f'https://global.apis.naver.com/weverse/wevweb{api_path}', video_id, note=note,
'wmd': wmd, data=data, headers={
}) **self._API_HEADERS,
except ExtractorError as e: **self._get_authorization_header(),
if isinstance(e.cause, HTTPError) and e.cause.status == 401: **({'Content-Type': 'application/json'} if data else {}),
self.raise_login_required( 'WEV-device-Id': self._device_id,
'Session token has expired. Log in again or refresh cookies in browser') }, query={
elif isinstance(e.cause, HTTPError) and e.cause.status == 403: 'wmsgpad': wmsgpad,
if 'Authorization' in self._API_HEADERS: 'wmd': wmd,
raise ExtractorError('Your account does not have access to this content', expected=True) })
self.raise_login_required() except ExtractorError as e:
raise if is_retry or not isinstance(e.cause, HTTPError):
raise
elif self._is_logged_in and e.cause.status == 401:
self._refresh_access_token()
continue
elif e.cause.status == 403:
if self._is_logged_in:
raise ExtractorError(
'Your account does not have access to this content', expected=True)
self._report_login_error('login_required')
raise
def _call_post_api(self, video_id): def _call_post_api(self, video_id):
path = '' if 'Authorization' in self._API_HEADERS else '/preview' path = '' if self._is_logged_in else '/preview'
return self._call_api(f'/post/v1.0/post-{video_id}{path}?fieldSet=postV1', video_id) return self._call_api(f'/post/v1.0/post-{video_id}{path}?fieldSet=postV1', video_id)
def _get_community_id(self, channel): def _get_community_id(self, channel):