Add in Decoder

This commit is contained in:
Devon R 2020-12-13 10:12:04 +09:00 committed by GitHub
parent a4b20d08c3
commit a20ddfa766
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -28,13 +28,16 @@ import array
import ctypes import ctypes
import ctypes.util import ctypes.util
import logging import logging
import math
import os.path import os.path
import struct
import sys import sys
from .errors import DiscordException from .errors import DiscordException
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int16_ptr = ctypes.POINTER(ctypes.c_int16) c_int16_ptr = ctypes.POINTER(ctypes.c_int16)
c_float_ptr = ctypes.POINTER(ctypes.c_float) c_float_ptr = ctypes.POINTER(ctypes.c_float)
@ -43,17 +46,55 @@ _lib = None
class EncoderStruct(ctypes.Structure): class EncoderStruct(ctypes.Structure):
pass pass
class DecoderStruct(ctypes.Structure):
pass
EncoderStructPtr = ctypes.POINTER(EncoderStruct) EncoderStructPtr = ctypes.POINTER(EncoderStruct)
DecoderStructPtr = ctypes.POINTER(DecoderStruct)
## Some constants from opus_defines.h
# Error codes
OK = 0
BAD_ARG = -1
# Encoder CTLs
APPLICATION_AUDIO = 2049
APPLICATION_VOIP = 2048
APPLICATION_LOWDELAY = 2051
CTL_SET_BITRATE = 4002
CTL_SET_BANDWIDTH = 4008
CTL_SET_FEC = 4012
CTL_SET_PLP = 4014
CTL_SET_SIGNAL = 4024
# Decoder CTLs
CTL_SET_GAIN = 4034
CTL_LAST_PACKET_DURATION = 4039
band_ctl = {
'narrow': 1101,
'medium': 1102,
'wide': 1103,
'superwide': 1104,
'full': 1105,
}
signal_ctl = {
'auto': -1000,
'voice': 3001,
'music': 3002,
}
def _err_lt(result, func, args): def _err_lt(result, func, args):
if result < 0: if result < OK:
log.info('error has happened in %s', func.__name__) log.info('error has happened in %s', func.__name__)
raise OpusError(result) raise OpusError(result)
return result return result
def _err_ne(result, func, args): def _err_ne(result, func, args):
ret = args[-1]._obj ret = args[-1]._obj
if ret.value != 0: if ret.value != OK:
log.info('error has happened in %s', func.__name__) log.info('error has happened in %s', func.__name__)
raise OpusError(ret.value) raise OpusError(ret.value)
return result return result
@ -64,18 +105,53 @@ def _err_ne(result, func, args):
# The third is the result type. # The third is the result type.
# The fourth is the error handler. # The fourth is the error handler.
exported_functions = [ exported_functions = [
# Generic
('opus_get_version_string',
None, ctypes.c_char_p, None),
('opus_strerror', ('opus_strerror',
[ctypes.c_int], ctypes.c_char_p, None), [ctypes.c_int], ctypes.c_char_p, None),
# Encoder functions
('opus_encoder_get_size', ('opus_encoder_get_size',
[ctypes.c_int], ctypes.c_int, None), [ctypes.c_int], ctypes.c_int, None),
('opus_encoder_create', ('opus_encoder_create',
[ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr, _err_ne), [ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr, _err_ne),
('opus_encode', ('opus_encode',
[EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt), [EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt),
('opus_encode_float',
[EncoderStructPtr, c_float_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt),
('opus_encoder_ctl', ('opus_encoder_ctl',
None, ctypes.c_int32, _err_lt), None, ctypes.c_int32, _err_lt),
('opus_encoder_destroy', ('opus_encoder_destroy',
[EncoderStructPtr], None, None), [EncoderStructPtr], None, None),
# Decoder functions
('opus_decoder_get_size',
[ctypes.c_int], ctypes.c_int, None),
('opus_decoder_create',
[ctypes.c_int, ctypes.c_int, c_int_ptr], DecoderStructPtr, _err_ne),
('opus_decode',
[DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32, c_int16_ptr, ctypes.c_int, ctypes.c_int],
ctypes.c_int, _err_lt),
('opus_decode_float',
[DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32, c_float_ptr, ctypes.c_int, ctypes.c_int],
ctypes.c_int, _err_lt),
('opus_decoder_ctl',
None, ctypes.c_int32, _err_lt),
('opus_decoder_destroy',
[DecoderStructPtr], None, None),
('opus_decoder_get_nb_samples',
[DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int, _err_lt),
# Packet functions
('opus_packet_get_bandwidth',
[ctypes.c_char_p], ctypes.c_int, _err_lt),
('opus_packet_get_nb_channels',
[ctypes.c_char_p], ctypes.c_int, _err_lt),
('opus_packet_get_nb_frames',
[ctypes.c_char_p, ctypes.c_int], ctypes.c_int, _err_lt),
('opus_packet_get_samples_per_frame',
[ctypes.c_char_p, ctypes.c_int], ctypes.c_int, _err_lt),
] ]
def libopus_loader(name): def libopus_loader(name):
@ -107,8 +183,9 @@ def _load_default():
try: try:
if sys.platform == 'win32': if sys.platform == 'win32':
_basedir = os.path.dirname(os.path.abspath(__file__)) _basedir = os.path.dirname(os.path.abspath(__file__))
_bitness = 'x64' if sys.maxsize > 2**32 else 'x86' _bitness = struct.calcsize('P') * 8
_filename = os.path.join(_basedir, 'bin', 'libopus-0.{}.dll'.format(_bitness)) _target = 'x64' if _bitness > 32 else 'x86'
_filename = os.path.join(_basedir, 'bin', 'libopus-0.{}.dll'.format(_target))
_lib = libopus_loader(_filename) _lib = libopus_loader(_filename)
else: else:
_lib = libopus_loader(ctypes.util.find_library('opus')) _lib = libopus_loader(ctypes.util.find_library('opus'))
@ -188,48 +265,30 @@ class OpusNotLoaded(DiscordException):
"""An exception that is thrown for when libopus is not loaded.""" """An exception that is thrown for when libopus is not loaded."""
pass pass
class _OpusStruct:
# Some constants...
OK = 0
APPLICATION_AUDIO = 2049
APPLICATION_VOIP = 2048
APPLICATION_LOWDELAY = 2051
CTL_SET_BITRATE = 4002
CTL_SET_BANDWIDTH = 4008
CTL_SET_FEC = 4012
CTL_SET_PLP = 4014
CTL_SET_SIGNAL = 4024
band_ctl = {
'narrow': 1101,
'medium': 1102,
'wide': 1103,
'superwide': 1104,
'full': 1105,
}
signal_ctl = {
'auto': -1000,
'voice': 3001,
'music': 3002,
}
class Encoder:
SAMPLING_RATE = 48000 SAMPLING_RATE = 48000
CHANNELS = 2 CHANNELS = 2
FRAME_LENGTH = 20 FRAME_LENGTH = 20 # in milliseconds
SAMPLE_SIZE = 4 # (bit_rate / 8) * CHANNELS (bit_rate == 16) SAMPLE_SIZE = struct.calcsize('h') * CHANNELS
SAMPLES_PER_FRAME = int(SAMPLING_RATE / 1000 * FRAME_LENGTH) SAMPLES_PER_FRAME = int(SAMPLING_RATE / 1000 * FRAME_LENGTH)
FRAME_SIZE = SAMPLES_PER_FRAME * SAMPLE_SIZE FRAME_SIZE = SAMPLES_PER_FRAME * SAMPLE_SIZE
def __init__(self, application=APPLICATION_AUDIO): @staticmethod
self.application = application def get_opus_version() -> str:
if not is_loaded(): if not is_loaded():
if not _load_default(): if not _load_default():
raise OpusNotLoaded() raise OpusNotLoaded()
return _lib.opus_get_version_string().decode('utf-8')
class Encoder(_OpusStruct):
def __init__(self, application=APPLICATION_AUDIO):
if not is_loaded():
if not _load_default():
raise OpusNotLoaded()
self.application = application
self._state = self._create_state() self._state = self._create_state()
self.set_bitrate(128) self.set_bitrate(128)
self.set_fec(True) self.set_fec(True)
@ -280,3 +339,84 @@ class Encoder:
ret = _lib.opus_encode(self._state, pcm, frame_size, data, max_data_bytes) ret = _lib.opus_encode(self._state, pcm, frame_size, data, max_data_bytes)
return array.array('b', data[:ret]).tobytes() return array.array('b', data[:ret]).tobytes()
class Decoder(_OpusStruct):
def __init__(self):
if not is_loaded():
if not _load_default():
raise OpusNotLoaded()
self._state = self._create_state()
def __del__(self):
if hasattr(self, '_state'):
_lib.opus_decoder_destroy(self._state)
self._state = None
def _create_state(self):
ret = ctypes.c_int()
return _lib.opus_decoder_create(self.SAMPLING_RATE, self.CHANNELS, ctypes.byref(ret))
@staticmethod
def packet_get_nb_frames(data):
"""Gets the number of frames in an Opus packet"""
return _lib.opus_packet_get_nb_frames(data, len(data))
@staticmethod
def packet_get_nb_channels(data):
"""Gets the number of channels in an Opus packet"""
return _lib.opus_packet_get_nb_channels(data)
@classmethod
def packet_get_samples_per_frame(cls, data):
"""Gets the number of samples per frame from an Opus packet"""
return _lib.opus_packet_get_samples_per_frame(data, cls.SAMPLING_RATE)
def _set_gain(self, adjustment):
"""Configures decoder gain adjustment.
Scales the decoded output by a factor specified in Q8 dB units.
This has a maximum range of -32768 to 32767 inclusive, and returns
OPUS_BAD_ARG (-1) otherwise. The default is zero indicating no adjustment.
This setting survives decoder reset (irrelevant for now).
gain = 10**x/(20.0*256)
(from opus_defines.h)
"""
return _lib.opus_decoder_ctl(self._state, CTL_SET_GAIN, adjustment)
def set_gain(self, dB):
"""Sets the decoder gain in dB, from -128 to 128."""
dB_Q8 = max(-32768, min(32767, round(dB * 256))) # dB * 2^n where n is 8 (Q8)
return self._set_gain(dB_Q8)
def set_volume(self, mult):
"""Sets the output volume as a float percent, i.e. 0.5 for 50%, 1.75 for 175%, etc."""
return self.set_gain(20 * math.log10(mult)) # amplitude ratio
def _get_last_packet_duration(self):
"""Gets the duration (in samples) of the last packet successfully decoded or concealed."""
ret = ctypes.c_int32()
_lib.opus_decoder_ctl(self._state, CTL_LAST_PACKET_DURATION, ctypes.byref(ret))
return ret.value
def decode(self, data, *, fec=False):
if data is None and fec:
raise OpusError("Invalid arguments: FEC cannot be used with null data")
if data is None:
frame_size = self._get_last_packet_duration() or self.SAMPLES_PER_FRAME
channel_count = self.CHANNELS
else:
frames = self.packet_get_nb_frames(data)
channel_count = self.packet_get_nb_channels(data)
samples_per_frame = self.packet_get_samples_per_frame(data)
frame_size = frames * samples_per_frame
pcm = (ctypes.c_int16 * (frame_size * channel_count))()
pcm_ptr = ctypes.cast(pcm, c_int16_ptr)
ret = _lib.opus_decode(self._state, data, len(data) if data else 0, pcm_ptr, frame_size, fec)
return array.array('h', pcm[:ret * channel_count]).tobytes()