Reformat code using black

Segments where readability was hampered were fixed by appropriate
format skipping directives. New code should hopefully be black
compatible. The moment they remove the -S option is probably the moment
I stop using black though.
This commit is contained in:
Rapptz
2022-02-20 06:29:41 -05:00
parent af8e74d327
commit 88b520b5ab
56 changed files with 738 additions and 289 deletions

View File

@ -42,6 +42,7 @@ if TYPE_CHECKING:
BAND_CTL = Literal['narrow', 'medium', 'wide', 'superwide', 'full']
SIGNAL_CTL = Literal['auto', 'voice', 'music']
class BandCtl(TypedDict):
narrow: int
medium: int
@ -49,11 +50,13 @@ class BandCtl(TypedDict):
superwide: int
full: int
class SignalCtl(TypedDict):
auto: int
voice: int
music: int
__all__ = (
'Encoder',
'OpusError',
@ -62,23 +65,27 @@ __all__ = (
_log = logging.getLogger(__name__)
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int16_ptr = ctypes.POINTER(ctypes.c_int16)
c_float_ptr = ctypes.POINTER(ctypes.c_float)
_lib = None
class EncoderStruct(ctypes.Structure):
pass
class DecoderStruct(ctypes.Structure):
pass
EncoderStructPtr = ctypes.POINTER(EncoderStruct)
DecoderStructPtr = ctypes.POINTER(DecoderStruct)
## Some constants from opus_defines.h
# Error codes
# fmt: off
OK = 0
BAD_ARG = -1
@ -96,6 +103,7 @@ CTL_SET_SIGNAL = 4024
# Decoder CTLs
CTL_SET_GAIN = 4034
CTL_LAST_PACKET_DURATION = 4039
# fmt: on
band_ctl: BandCtl = {
'narrow': 1101,
@ -111,12 +119,14 @@ signal_ctl: SignalCtl = {
'music': 3002,
}
def _err_lt(result: int, func: Callable, args: List) -> int:
if result < OK:
_log.info('error has happened in %s', func.__name__)
raise OpusError(result)
return result
def _err_ne(result: T, func: Callable, args: List) -> T:
ret = args[-1]._obj
if ret.value != OK:
@ -124,6 +134,7 @@ def _err_ne(result: T, func: Callable, args: List) -> T:
raise OpusError(ret.value)
return result
# A list of exported functions.
# The first argument is obviously the name.
# The second one are the types of arguments it takes.
@ -131,54 +142,46 @@ def _err_ne(result: T, func: Callable, args: List) -> T:
# The fourth is the error handler.
exported_functions: List[Tuple[Any, ...]] = [
# Generic
('opus_get_version_string',
None, ctypes.c_char_p, None),
('opus_strerror',
[ctypes.c_int], ctypes.c_char_p, None),
('opus_get_version_string', None, ctypes.c_char_p, None),
('opus_strerror', [ctypes.c_int], ctypes.c_char_p, None),
# Encoder functions
('opus_encoder_get_size',
[ctypes.c_int], ctypes.c_int, None),
('opus_encoder_create',
[ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr, _err_ne),
('opus_encode',
[EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt),
('opus_encode_float',
[EncoderStructPtr, c_float_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt),
('opus_encoder_ctl',
None, ctypes.c_int32, _err_lt),
('opus_encoder_destroy',
[EncoderStructPtr], None, None),
('opus_encoder_get_size', [ctypes.c_int], ctypes.c_int, None),
('opus_encoder_create', [ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr, _err_ne),
('opus_encode', [EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt),
(
'opus_encode_float',
[EncoderStructPtr, c_float_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32],
ctypes.c_int32,
_err_lt,
),
('opus_encoder_ctl', None, ctypes.c_int32, _err_lt),
('opus_encoder_destroy', [EncoderStructPtr], None, None),
# Decoder functions
('opus_decoder_get_size',
[ctypes.c_int], ctypes.c_int, None),
('opus_decoder_create',
[ctypes.c_int, ctypes.c_int, c_int_ptr], DecoderStructPtr, _err_ne),
('opus_decode',
('opus_decoder_get_size', [ctypes.c_int], ctypes.c_int, None),
('opus_decoder_create', [ctypes.c_int, ctypes.c_int, c_int_ptr], DecoderStructPtr, _err_ne),
(
'opus_decode',
[DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32, c_int16_ptr, ctypes.c_int, ctypes.c_int],
ctypes.c_int, _err_lt),
('opus_decode_float',
ctypes.c_int,
_err_lt,
),
(
'opus_decode_float',
[DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32, c_float_ptr, ctypes.c_int, ctypes.c_int],
ctypes.c_int, _err_lt),
('opus_decoder_ctl',
None, ctypes.c_int32, _err_lt),
('opus_decoder_destroy',
[DecoderStructPtr], None, None),
('opus_decoder_get_nb_samples',
[DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int, _err_lt),
ctypes.c_int,
_err_lt,
),
('opus_decoder_ctl', None, ctypes.c_int32, _err_lt),
('opus_decoder_destroy', [DecoderStructPtr], None, None),
('opus_decoder_get_nb_samples', [DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int, _err_lt),
# Packet functions
('opus_packet_get_bandwidth',
[ctypes.c_char_p], ctypes.c_int, _err_lt),
('opus_packet_get_nb_channels',
[ctypes.c_char_p], ctypes.c_int, _err_lt),
('opus_packet_get_nb_frames',
[ctypes.c_char_p, ctypes.c_int], ctypes.c_int, _err_lt),
('opus_packet_get_samples_per_frame',
[ctypes.c_char_p, ctypes.c_int], ctypes.c_int, _err_lt),
('opus_packet_get_bandwidth', [ctypes.c_char_p], ctypes.c_int, _err_lt),
('opus_packet_get_nb_channels', [ctypes.c_char_p], ctypes.c_int, _err_lt),
('opus_packet_get_nb_frames', [ctypes.c_char_p, ctypes.c_int], ctypes.c_int, _err_lt),
('opus_packet_get_samples_per_frame', [ctypes.c_char_p, ctypes.c_int], ctypes.c_int, _err_lt),
]
def libopus_loader(name: str) -> Any:
# create the library...
lib = ctypes.cdll.LoadLibrary(name)
@ -203,6 +206,7 @@ def libopus_loader(name: str) -> Any:
return lib
def _load_default() -> bool:
global _lib
try:
@ -219,6 +223,7 @@ def _load_default() -> bool:
return _lib is not None
def load_opus(name: str) -> None:
"""Loads the libopus shared library for use with voice.
@ -257,6 +262,7 @@ def load_opus(name: str) -> None:
global _lib
_lib = libopus_loader(name)
def is_loaded() -> bool:
"""Function to check if opus lib is successfully loaded either
via the :func:`ctypes.util.find_library` call of :func:`load_opus`.
@ -271,6 +277,7 @@ def is_loaded() -> bool:
global _lib
return _lib is not None
class OpusError(DiscordException):
"""An exception that is thrown for libopus related errors.
@ -286,10 +293,13 @@ class OpusError(DiscordException):
_log.info('"%s" has happened', msg)
super().__init__(msg)
class OpusNotLoaded(DiscordException):
"""An exception that is thrown for when libopus is not loaded."""
pass
class _OpusStruct:
SAMPLING_RATE = 48000
CHANNELS = 2
@ -306,6 +316,7 @@ class _OpusStruct:
return _lib.opus_get_version_string().decode('utf-8')
class Encoder(_OpusStruct):
def __init__(self, application: int = APPLICATION_AUDIO):
_OpusStruct.get_opus_version()
@ -322,7 +333,7 @@ class Encoder(_OpusStruct):
if hasattr(self, '_state'):
_lib.opus_encoder_destroy(self._state)
# This is a destructor, so it's okay to assign None
self._state = None # type: ignore
self._state = None # type: ignore
def _create_state(self) -> EncoderStruct:
ret = ctypes.c_int()
@ -352,18 +363,19 @@ class Encoder(_OpusStruct):
_lib.opus_encoder_ctl(self._state, CTL_SET_FEC, 1 if enabled else 0)
def set_expected_packet_loss_percent(self, percentage: float) -> None:
_lib.opus_encoder_ctl(self._state, CTL_SET_PLP, min(100, max(0, int(percentage * 100)))) # type: ignore
_lib.opus_encoder_ctl(self._state, CTL_SET_PLP, min(100, max(0, int(percentage * 100)))) # type: ignore
def encode(self, pcm: bytes, frame_size: int) -> bytes:
max_data_bytes = len(pcm)
# bytes can be used to reference pointer
pcm_ptr = ctypes.cast(pcm, c_int16_ptr) # type: ignore
pcm_ptr = ctypes.cast(pcm, c_int16_ptr) # type: ignore
data = (ctypes.c_char * max_data_bytes)()
ret = _lib.opus_encode(self._state, pcm_ptr, frame_size, data, max_data_bytes)
# array can be initialized with bytes but mypy doesn't know
return array.array('b', data[:ret]).tobytes() # type: ignore
return array.array('b', data[:ret]).tobytes() # type: ignore
class Decoder(_OpusStruct):
def __init__(self):
@ -375,7 +387,7 @@ class Decoder(_OpusStruct):
if hasattr(self, '_state'):
_lib.opus_decoder_destroy(self._state)
# This is a destructor, so it's okay to assign None
self._state = None # type: ignore
self._state = None # type: ignore
def _create_state(self) -> DecoderStruct:
ret = ctypes.c_int()
@ -411,12 +423,12 @@ class Decoder(_OpusStruct):
def set_gain(self, dB: float) -> int:
"""Sets the decoder gain in dB, from -128 to 128."""
dB_Q8 = max(-32768, min(32767, round(dB * 256))) # dB * 2^n where n is 8 (Q8)
dB_Q8 = max(-32768, min(32767, round(dB * 256))) # dB * 2^n where n is 8 (Q8)
return self._set_gain(dB_Q8)
def set_volume(self, mult: float) -> int:
"""Sets the output volume as a float percent, i.e. 0.5 for 50%, 1.75 for 175%, etc."""
return self.set_gain(20 * math.log10(mult)) # amplitude ratio
return self.set_gain(20 * math.log10(mult)) # amplitude ratio
def _get_last_packet_duration(self) -> int:
"""Gets the duration (in samples) of the last packet successfully decoded or concealed."""
@ -428,7 +440,7 @@ class Decoder(_OpusStruct):
@overload
def decode(self, data: bytes, *, fec: bool) -> bytes:
...
@overload
def decode(self, data: Literal[None], *, fec: Literal[False]) -> bytes:
...
@ -451,4 +463,4 @@ class Decoder(_OpusStruct):
ret = _lib.opus_decode(self._state, data, len(data) if data else 0, pcm_ptr, frame_size, fec)
return array.array('h', pcm[:ret * channel_count]).tobytes()
return array.array('h', pcm[: ret * channel_count]).tobytes()