1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-08-21 10:51:27 +00:00

[utils] Add improved jwt_encode function (#14071)

Also deprecates `jwt_encode_hs256`

Authored by: bashonly
This commit is contained in:
bashonly 2025-08-19 17:36:00 -05:00 committed by GitHub
parent 8df121ba59
commit 35da8df4f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 84 additions and 15 deletions

View File

@ -315,6 +315,7 @@ banned-from = [
"yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead." "yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead."
"yt_dlp.utils.bytes_to_intlist".msg = "Use `list` instead." "yt_dlp.utils.bytes_to_intlist".msg = "Use `list` instead."
"yt_dlp.utils.intlist_to_bytes".msg = "Use `bytes` instead." "yt_dlp.utils.intlist_to_bytes".msg = "Use `bytes` instead."
"yt_dlp.utils.jwt_encode_hs256".msg = "Use `yt_dlp.utils.jwt_encode` instead."
"yt_dlp.utils.decodeArgument".msg = "Do not use" "yt_dlp.utils.decodeArgument".msg = "Do not use"
"yt_dlp.utils.decodeFilename".msg = "Do not use" "yt_dlp.utils.decodeFilename".msg = "Do not use"
"yt_dlp.utils.encodeFilename".msg = "Do not use" "yt_dlp.utils.encodeFilename".msg = "Do not use"

View File

@ -71,6 +71,8 @@ from yt_dlp.utils import (
iri_to_uri, iri_to_uri,
is_html, is_html,
js_to_json, js_to_json,
jwt_decode_hs256,
jwt_encode,
limit_length, limit_length,
locked_file, locked_file,
lowercase_escape, lowercase_escape,
@ -2180,6 +2182,41 @@ Line 1
assert int_or_none(v=10) == 10, 'keyword passed positional should call function' assert int_or_none(v=10) == 10, 'keyword passed positional should call function'
assert int_or_none(scale=0.1)(10) == 100, 'call after partial application should call the function' assert int_or_none(scale=0.1)(10) == 100, 'call after partial application should call the function'
_JWT_KEY = '12345678'
_JWT_HEADERS_1 = {'a': 'b'}
_JWT_HEADERS_2 = {'typ': 'JWT', 'alg': 'HS256'}
_JWT_HEADERS_3 = {'typ': 'JWT', 'alg': 'RS256'}
_JWT_HEADERS_4 = {'c': 'd', 'alg': 'ES256'}
_JWT_DECODED = {
'foo': 'bar',
'qux': 'baz',
}
_JWT_SIMPLE = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.fKojvTWqnjNTbsdoDTmYNc4tgYAG3h_SWRzM77iLH0U'
_JWT_WITH_EXTRA_HEADERS = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImEiOiJiIn0.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.Ia91-B77yasfYM7jsB6iVKLew-3rO6ITjNmjWUVXCvQ'
_JWT_WITH_REORDERED_HEADERS = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.slg-7COta5VOfB36p3tqV4MGPV6TTA_ouGnD48UEVq4'
_JWT_WITH_REORDERED_HEADERS_AND_RS256_ALG = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.XWp496oVgQnoits0OOocutdjxoaQwn4GUWWxUsKENPM'
_JWT_WITH_EXTRA_HEADERS_AND_ES256_ALG = 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCIsImMiOiJkIn0.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.oM_tc7IkfrwkoRh43rFFE1wOi3J3mQGwx7_lMyKQqDg'
def test_jwt_encode(self):
def test(expected, headers={}):
self.assertEqual(jwt_encode(self._JWT_DECODED, self._JWT_KEY, headers=headers), expected)
test(self._JWT_SIMPLE)
test(self._JWT_WITH_EXTRA_HEADERS, headers=self._JWT_HEADERS_1)
test(self._JWT_WITH_REORDERED_HEADERS, headers=self._JWT_HEADERS_2)
test(self._JWT_WITH_REORDERED_HEADERS_AND_RS256_ALG, headers=self._JWT_HEADERS_3)
test(self._JWT_WITH_EXTRA_HEADERS_AND_ES256_ALG, headers=self._JWT_HEADERS_4)
def test_jwt_decode_hs256(self):
def test(inp):
self.assertEqual(jwt_decode_hs256(inp), self._JWT_DECODED)
test(self._JWT_SIMPLE)
test(self._JWT_WITH_EXTRA_HEADERS)
test(self._JWT_WITH_REORDERED_HEADERS)
test(self._JWT_WITH_REORDERED_HEADERS_AND_RS256_ALG)
test(self._JWT_WITH_EXTRA_HEADERS_AND_ES256_ALG)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -4,7 +4,7 @@ from .common import InfoExtractor
from ..utils import ( from ..utils import (
ExtractorError, ExtractorError,
float_or_none, float_or_none,
jwt_encode_hs256, jwt_encode,
try_get, try_get,
) )
@ -83,11 +83,10 @@ class ATVAtIE(InfoExtractor):
'nbf': int(not_before.timestamp()), 'nbf': int(not_before.timestamp()),
'exp': int(expire.timestamp()), 'exp': int(expire.timestamp()),
} }
jwt_token = jwt_encode_hs256(payload, self._ENCRYPTION_KEY, headers={'kid': self._ACCESS_ID})
videos = self._download_json( videos = self._download_json(
'https://vas-v4.p7s1video.net/4.0/getsources', 'https://vas-v4.p7s1video.net/4.0/getsources',
content_id, 'Downloading videos JSON', query={ content_id, 'Downloading videos JSON', query={
'token': jwt_token.decode('utf-8'), 'token': jwt_encode(payload, self._ENCRYPTION_KEY, headers={'kid': self._ACCESS_ID}),
}) })
video_id, videos_data = next(iter(videos['data'].items())) video_id, videos_data = next(iter(videos['data'].items()))

View File

@ -14,7 +14,7 @@ from ..utils import (
get_element_html_by_class, get_element_html_by_class,
int_or_none, int_or_none,
jwt_decode_hs256, jwt_decode_hs256,
jwt_encode_hs256, jwt_encode,
make_archive_id, make_archive_id,
merge_dicts, merge_dicts,
parse_age_limit, parse_age_limit,
@ -98,9 +98,9 @@ class VRTBaseIE(InfoExtractor):
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, data=json.dumps({ }, data=json.dumps({
'identityToken': id_token or '', 'identityToken': id_token or '',
'playerInfo': jwt_encode_hs256(player_info, self._JWT_SIGNING_KEY, headers={ 'playerInfo': jwt_encode(player_info, self._JWT_SIGNING_KEY, headers={
'kid': self._JWT_KEY_ID, 'kid': self._JWT_KEY_ID,
}).decode(), }),
}, separators=(',', ':')).encode())['vrtPlayerToken'] }, separators=(',', ':')).encode())['vrtPlayerToken']
return self._download_json( return self._download_json(

View File

@ -1,4 +1,8 @@
"""Deprecated - New code should avoid these""" """Deprecated - New code should avoid these"""
import base64
import hashlib
import hmac
import json
import warnings import warnings
from ..compat.compat_utils import passthrough_module from ..compat.compat_utils import passthrough_module
@ -28,4 +32,18 @@ def intlist_to_bytes(xs):
return struct.pack('%dB' % len(xs), *xs) return struct.pack('%dB' % len(xs), *xs)
def jwt_encode_hs256(payload_data, key, headers={}):
header_data = {
'alg': 'HS256',
'typ': 'JWT',
}
if headers:
header_data.update(headers)
header_b64 = base64.b64encode(json.dumps(header_data).encode())
payload_b64 = base64.b64encode(json.dumps(payload_data).encode())
h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.b64encode(h.digest())
return header_b64 + b'.' + payload_b64 + b'.' + signature_b64
compiled_regex_type = type(re.compile('')) compiled_regex_type = type(re.compile(''))

View File

@ -4739,22 +4739,36 @@ def time_seconds(**kwargs):
return time.time() + dt.timedelta(**kwargs).total_seconds() return time.time() + dt.timedelta(**kwargs).total_seconds()
# create a JSON Web Signature (jws) with HS256 algorithm
# the resulting format is in JWS Compact Serialization
# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html # implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html # implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
def jwt_encode_hs256(payload_data, key, headers={}): def jwt_encode(payload_data, key, *, alg='HS256', headers=None):
assert alg in ('HS256',), f'Unsupported algorithm "{alg}"'
def jwt_json_bytes(obj):
return json.dumps(obj, separators=(',', ':')).encode()
def jwt_b64encode(bytestring):
return base64.urlsafe_b64encode(bytestring).rstrip(b'=')
header_data = { header_data = {
'alg': 'HS256', 'alg': alg,
'typ': 'JWT', 'typ': 'JWT',
} }
if headers: if headers:
header_data.update(headers) # Allow re-ordering of keys if both 'alg' and 'typ' are present
header_b64 = base64.b64encode(json.dumps(header_data).encode()) if 'alg' in headers and 'typ' in headers:
payload_b64 = base64.b64encode(json.dumps(payload_data).encode()) header_data = headers
else:
header_data.update(headers)
header_b64 = jwt_b64encode(jwt_json_bytes(header_data))
payload_b64 = jwt_b64encode(jwt_json_bytes(payload_data))
# HS256 is the only algorithm currently supported
h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256) h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.b64encode(h.digest()) signature_b64 = jwt_b64encode(h.digest())
return header_b64 + b'.' + payload_b64 + b'.' + signature_b64
return (header_b64 + b'.' + payload_b64 + b'.' + signature_b64).decode()
# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256 # can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256