From 35da8df4f843cb8f0656a301e5bebbf47d64d69a Mon Sep 17 00:00:00 2001 From: bashonly <88596187+bashonly@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:36:00 -0500 Subject: [PATCH] [utils] Add improved `jwt_encode` function (#14071) Also deprecates `jwt_encode_hs256` Authored by: bashonly --- pyproject.toml | 1 + test/test_utils.py | 37 +++++++++++++++++++++++++++++++++++++ yt_dlp/extractor/atvat.py | 5 ++--- yt_dlp/extractor/vrt.py | 6 +++--- yt_dlp/utils/_deprecated.py | 18 ++++++++++++++++++ yt_dlp/utils/_utils.py | 32 +++++++++++++++++++++++--------- 6 files changed, 84 insertions(+), 15 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d8c3d9e822..35f81423a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -315,6 +315,7 @@ banned-from = [ "yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead." "yt_dlp.utils.bytes_to_intlist".msg = "Use `list` instead." "yt_dlp.utils.intlist_to_bytes".msg = "Use `bytes` instead." +"yt_dlp.utils.jwt_encode_hs256".msg = "Use `yt_dlp.utils.jwt_encode` instead." "yt_dlp.utils.decodeArgument".msg = "Do not use" "yt_dlp.utils.decodeFilename".msg = "Do not use" "yt_dlp.utils.encodeFilename".msg = "Do not use" diff --git a/test/test_utils.py b/test/test_utils.py index 44747efda6..dce07c3626 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -71,6 +71,8 @@ from yt_dlp.utils import ( iri_to_uri, is_html, js_to_json, + jwt_decode_hs256, + jwt_encode, limit_length, locked_file, lowercase_escape, @@ -2180,6 +2182,41 @@ Line 1 assert int_or_none(v=10) == 10, 'keyword passed positional should call function' assert int_or_none(scale=0.1)(10) == 100, 'call after partial application should call the function' + _JWT_KEY = '12345678' + _JWT_HEADERS_1 = {'a': 'b'} + _JWT_HEADERS_2 = {'typ': 'JWT', 'alg': 'HS256'} + _JWT_HEADERS_3 = {'typ': 'JWT', 'alg': 'RS256'} + _JWT_HEADERS_4 = {'c': 'd', 'alg': 'ES256'} + _JWT_DECODED = { + 'foo': 'bar', + 'qux': 'baz', + } + _JWT_SIMPLE = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.fKojvTWqnjNTbsdoDTmYNc4tgYAG3h_SWRzM77iLH0U' + _JWT_WITH_EXTRA_HEADERS = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImEiOiJiIn0.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.Ia91-B77yasfYM7jsB6iVKLew-3rO6ITjNmjWUVXCvQ' + _JWT_WITH_REORDERED_HEADERS = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.slg-7COta5VOfB36p3tqV4MGPV6TTA_ouGnD48UEVq4' + _JWT_WITH_REORDERED_HEADERS_AND_RS256_ALG = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.XWp496oVgQnoits0OOocutdjxoaQwn4GUWWxUsKENPM' + _JWT_WITH_EXTRA_HEADERS_AND_ES256_ALG = 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCIsImMiOiJkIn0.eyJmb28iOiJiYXIiLCJxdXgiOiJiYXoifQ.oM_tc7IkfrwkoRh43rFFE1wOi3J3mQGwx7_lMyKQqDg' + + def test_jwt_encode(self): + def test(expected, headers={}): + self.assertEqual(jwt_encode(self._JWT_DECODED, self._JWT_KEY, headers=headers), expected) + + test(self._JWT_SIMPLE) + test(self._JWT_WITH_EXTRA_HEADERS, headers=self._JWT_HEADERS_1) + test(self._JWT_WITH_REORDERED_HEADERS, headers=self._JWT_HEADERS_2) + test(self._JWT_WITH_REORDERED_HEADERS_AND_RS256_ALG, headers=self._JWT_HEADERS_3) + test(self._JWT_WITH_EXTRA_HEADERS_AND_ES256_ALG, headers=self._JWT_HEADERS_4) + + def test_jwt_decode_hs256(self): + def test(inp): + self.assertEqual(jwt_decode_hs256(inp), self._JWT_DECODED) + + test(self._JWT_SIMPLE) + test(self._JWT_WITH_EXTRA_HEADERS) + test(self._JWT_WITH_REORDERED_HEADERS) + test(self._JWT_WITH_REORDERED_HEADERS_AND_RS256_ALG) + test(self._JWT_WITH_EXTRA_HEADERS_AND_ES256_ALG) + if __name__ == '__main__': unittest.main() diff --git a/yt_dlp/extractor/atvat.py b/yt_dlp/extractor/atvat.py index 37bb616952..b05eccf182 100644 --- a/yt_dlp/extractor/atvat.py +++ b/yt_dlp/extractor/atvat.py @@ -4,7 +4,7 @@ from .common import InfoExtractor from ..utils import ( ExtractorError, float_or_none, - jwt_encode_hs256, + jwt_encode, try_get, ) @@ -83,11 +83,10 @@ class ATVAtIE(InfoExtractor): 'nbf': int(not_before.timestamp()), 'exp': int(expire.timestamp()), } - jwt_token = jwt_encode_hs256(payload, self._ENCRYPTION_KEY, headers={'kid': self._ACCESS_ID}) videos = self._download_json( 'https://vas-v4.p7s1video.net/4.0/getsources', content_id, 'Downloading videos JSON', query={ - 'token': jwt_token.decode('utf-8'), + 'token': jwt_encode(payload, self._ENCRYPTION_KEY, headers={'kid': self._ACCESS_ID}), }) video_id, videos_data = next(iter(videos['data'].items())) diff --git a/yt_dlp/extractor/vrt.py b/yt_dlp/extractor/vrt.py index 6e5514eefd..079feab454 100644 --- a/yt_dlp/extractor/vrt.py +++ b/yt_dlp/extractor/vrt.py @@ -14,7 +14,7 @@ from ..utils import ( get_element_html_by_class, int_or_none, jwt_decode_hs256, - jwt_encode_hs256, + jwt_encode, make_archive_id, merge_dicts, parse_age_limit, @@ -98,9 +98,9 @@ class VRTBaseIE(InfoExtractor): 'Content-Type': 'application/json', }, data=json.dumps({ 'identityToken': id_token or '', - 'playerInfo': jwt_encode_hs256(player_info, self._JWT_SIGNING_KEY, headers={ + 'playerInfo': jwt_encode(player_info, self._JWT_SIGNING_KEY, headers={ 'kid': self._JWT_KEY_ID, - }).decode(), + }), }, separators=(',', ':')).encode())['vrtPlayerToken'] return self._download_json( diff --git a/yt_dlp/utils/_deprecated.py b/yt_dlp/utils/_deprecated.py index e4762699b7..4797cfef53 100644 --- a/yt_dlp/utils/_deprecated.py +++ b/yt_dlp/utils/_deprecated.py @@ -1,4 +1,8 @@ """Deprecated - New code should avoid these""" +import base64 +import hashlib +import hmac +import json import warnings from ..compat.compat_utils import passthrough_module @@ -28,4 +32,18 @@ def intlist_to_bytes(xs): return struct.pack('%dB' % len(xs), *xs) +def jwt_encode_hs256(payload_data, key, headers={}): + header_data = { + 'alg': 'HS256', + 'typ': 'JWT', + } + if headers: + header_data.update(headers) + header_b64 = base64.b64encode(json.dumps(header_data).encode()) + payload_b64 = base64.b64encode(json.dumps(payload_data).encode()) + h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256) + signature_b64 = base64.b64encode(h.digest()) + return header_b64 + b'.' + payload_b64 + b'.' + signature_b64 + + compiled_regex_type = type(re.compile('')) diff --git a/yt_dlp/utils/_utils.py b/yt_dlp/utils/_utils.py index a5471da4df..3adc1d6be2 100644 --- a/yt_dlp/utils/_utils.py +++ b/yt_dlp/utils/_utils.py @@ -4739,22 +4739,36 @@ def time_seconds(**kwargs): return time.time() + dt.timedelta(**kwargs).total_seconds() -# create a JSON Web Signature (jws) with HS256 algorithm -# the resulting format is in JWS Compact Serialization # implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html # implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html -def jwt_encode_hs256(payload_data, key, headers={}): +def jwt_encode(payload_data, key, *, alg='HS256', headers=None): + assert alg in ('HS256',), f'Unsupported algorithm "{alg}"' + + def jwt_json_bytes(obj): + return json.dumps(obj, separators=(',', ':')).encode() + + def jwt_b64encode(bytestring): + return base64.urlsafe_b64encode(bytestring).rstrip(b'=') + header_data = { - 'alg': 'HS256', + 'alg': alg, 'typ': 'JWT', } if headers: - header_data.update(headers) - header_b64 = base64.b64encode(json.dumps(header_data).encode()) - payload_b64 = base64.b64encode(json.dumps(payload_data).encode()) + # Allow re-ordering of keys if both 'alg' and 'typ' are present + if 'alg' in headers and 'typ' in headers: + header_data = headers + else: + header_data.update(headers) + + header_b64 = jwt_b64encode(jwt_json_bytes(header_data)) + payload_b64 = jwt_b64encode(jwt_json_bytes(payload_data)) + + # HS256 is the only algorithm currently supported h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256) - signature_b64 = base64.b64encode(h.digest()) - return header_b64 + b'.' + payload_b64 + b'.' + signature_b64 + signature_b64 = jwt_b64encode(h.digest()) + + return (header_b64 + b'.' + payload_b64 + b'.' + signature_b64).decode() # can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256