276 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| """
 | |
| The MIT License (MIT)
 | |
| 
 | |
| Copyright (c) 2015-2019 Rapptz
 | |
| 
 | |
| Permission is hereby granted, free of charge, to any person obtaining a
 | |
| copy of this software and associated documentation files (the "Software"),
 | |
| to deal in the Software without restriction, including without limitation
 | |
| the rights to use, copy, modify, merge, publish, distribute, sublicense,
 | |
| and/or sell copies of the Software, and to permit persons to whom the
 | |
| Software is furnished to do so, subject to the following conditions:
 | |
| 
 | |
| The above copyright notice and this permission notice shall be included in
 | |
| all copies or substantial portions of the Software.
 | |
| 
 | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 | |
| OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 | |
| FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 | |
| DEALINGS IN THE SOFTWARE.
 | |
| """
 | |
| 
 | |
| import array
 | |
| import ctypes
 | |
| import ctypes.util
 | |
| import logging
 | |
| import os.path
 | |
| import sys
 | |
| 
 | |
| from .errors import DiscordException
 | |
| 
 | |
# Logger for this module, named after the module itself.
log = logging.getLogger(__name__)

# ctypes pointer types used when declaring the libopus function signatures.
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int16_ptr = ctypes.POINTER(ctypes.c_int16)
c_float_ptr = ctypes.POINTER(ctypes.c_float)
 | |
| 
 | |
class EncoderStruct(ctypes.Structure):
    """Opaque ctypes structure; no fields are declared on the Python side."""


# Pointer type used wherever an encoder state is passed to or returned
# from the library.
EncoderStructPtr = ctypes.POINTER(EncoderStruct)
 | |
| 
 | |
def _err_lt(result, func, args):
    """ctypes ``errcheck`` hook for calls that fail with a negative result.

    Parameters
    ----------
    result
        The value returned by the foreign function.
    func
        The foreign function object that was called; only its
        ``__name__`` is used, for logging.
    args
        The arguments of the call (unused).

    Returns
    -------
    ``result`` unchanged when it is zero or positive.

    Raises
    ------
    OpusError
        When ``result`` is negative; the result is used as the error code.
    """
    if result >= 0:
        return result

    log.info('error has happened in %s', func.__name__)
    raise OpusError(result)
 | |
| 
 | |
def _err_ne(result, func, args):
    """ctypes ``errcheck`` hook for calls that report status via an out-argument.

    The last call argument is expected to wrap (through ``_obj``, as
    ``ctypes.byref`` does) an integer that receives the status code.

    Parameters
    ----------
    result
        The value returned by the foreign function.
    func
        The foreign function object that was called; only its
        ``__name__`` is used, for logging.
    args
        The arguments of the call; the last one carries the status code.

    Returns
    -------
    ``result`` unchanged when the status code is zero.

    Raises
    ------
    OpusError
        When the status code is non-zero.
    """
    status = args[-1]._obj.value
    if status == 0:
        return result

    log.info('error has happened in %s', func.__name__)
    raise OpusError(status)
 | |
| 
 | |
# Table of the library functions configured by ``libopus_loader``.
# Each entry is a 4-tuple:
#   [0] the exported symbol name,
#   [1] the argument types (``None`` leaves ``argtypes`` unset),
#   [2] the result type,
#   [3] the ``errcheck`` handler (``None`` installs no handler).
exported_functions = [
    (
        'opus_strerror',
        [ctypes.c_int],
        ctypes.c_char_p,
        None,
    ),
    (
        'opus_encoder_get_size',
        [ctypes.c_int],
        ctypes.c_int,
        None,
    ),
    (
        'opus_encoder_create',
        [ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr],
        EncoderStructPtr,
        _err_ne,
    ),
    (
        'opus_encode',
        [EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32],
        ctypes.c_int32,
        _err_lt,
    ),
    (
        'opus_encoder_ctl',
        None,
        ctypes.c_int32,
        _err_lt,
    ),
    (
        'opus_encoder_destroy',
        [EncoderStructPtr],
        None,
        None,
    ),
]
 | |
| 
 | |
def libopus_loader(name):
    """Load a shared library and configure the functions in ``exported_functions``.

    Parameters
    ----------
    name: :class:`str`
        The library name or path handed to ``ctypes.cdll.LoadLibrary``.

    Returns
    -------
    The loaded ctypes library object, with ``argtypes``, ``restype`` and
    ``errcheck`` assigned for every entry of ``exported_functions``.

    Raises
    ------
    OSError
        If the library cannot be loaded.
    AttributeError
        If the library does not export one of the listed symbols.
    """
    lib = ctypes.cdll.LoadLibrary(name)

    for symbol, argtypes, restype, errcheck in exported_functions:
        func = getattr(lib, symbol)

        # A falsy argument list leaves ``argtypes`` unset, so ctypes applies
        # its default argument conversions (the table does this for
        # ``opus_encoder_ctl``).
        if argtypes:
            func.argtypes = argtypes
        func.restype = restype

        if errcheck:
            func.errcheck = errcheck

    return lib
 | |
| 
 | |
# Best-effort load at import time: any failure leaves ``_lib`` as ``None``
# so that ``is_loaded()`` reports False instead of the import failing.
try:
    if sys.platform != 'win32':
        # Ask ctypes to locate a library named "opus" on this system.
        _lib = libopus_loader(ctypes.util.find_library('opus'))
    else:
        # On Windows, use the DLL in the ``bin`` directory next to this
        # file, picking the build that matches the interpreter's bitness.
        _basedir = os.path.dirname(os.path.abspath(__file__))
        _bitness = 'x86' if sys.maxsize <= 2**32 else 'x64'
        _filename = os.path.join(_basedir, 'bin', 'libopus-0.{}.dll'.format(_bitness))
        _lib = libopus_loader(_filename)
except Exception:
    _lib = None
 | |
| 
 | |
def load_opus(name):
    """Load the libopus shared library used for voice.

    When this function is never called, the library falls back to whatever
    :func:`ctypes.util.find_library` located at import time, if anything.

    Voice does not work unless a library has been loaded.

    Any exception raised while loading is propagated to the caller.

    .. warning::

        The bitness of the library must match the bitness of your python
        interpreter: a 64-bit library requires a 64-bit interpreter.
        A mismatch usually makes the load raise an exception.

    .. note::

        On Windows, calling this function should not be necessary because
        the bundled binaries are loaded automatically.

    .. note::

        On Windows, the .dll extension may be omitted. On Linux the full
        extension is required, e.g. ``libopus.so.1``. That said, on Linux
        :func:`ctypes.util.find_library` will usually find the library
        automatically without this function being called.

    Parameters
    ----------
    name: :class:`str`
        The filename of the shared library.
    """
    global _lib
    loaded = libopus_loader(name)
    _lib = loaded
 | |
| 
 | |
def is_loaded():
    """Check whether the opus library has been loaded successfully, either
    through the :func:`ctypes.util.find_library` call or :func:`load_opus`.

    This must return ``True`` for voice to work.

    Returns
    -------
    :class:`bool`
        Indicates if the opus library has been loaded.
    """
    # Reading a module global needs no ``global`` declaration.
    return _lib is not None
 | |
| 
 | |
class OpusError(DiscordException):
    """An exception that is thrown for libopus related errors.

    The exception message is the text the loaded library returns from
    ``opus_strerror`` for the error code.

    Attributes
    ----------
    code: :class:`int`
        The error code returned.
    """

    def __init__(self, code):
        self.code = code
        # opus_strerror is declared to return a C string; decode it to text.
        raw = _lib.opus_strerror(code)
        msg = raw.decode('utf-8')
        log.info('"%s" has happened', msg)
        super().__init__(msg)
 | |
| 
 | |
class OpusNotLoaded(DiscordException):
    """An exception that is thrown for when libopus is not loaded."""
 | |
| 
 | |
| 
 | |
# Some constants...
# NOTE(review): the numeric values presumably mirror the definitions in
# libopus' public headers -- verify against the header before changing any.
OK = 0

# Application modes; ``Encoder`` forwards one of these to opus_encoder_create.
APPLICATION_AUDIO = 2049
APPLICATION_VOIP = 2048
APPLICATION_LOWDELAY = 2051

# Request codes passed as the second argument of opus_encoder_ctl.
CTL_SET_BITRATE = 4002
CTL_SET_BANDWIDTH = 4008
CTL_SET_FEC = 4012
CTL_SET_PLP = 4014
CTL_SET_SIGNAL = 4024

# Bandwidth names accepted by ``Encoder.set_bandwidth`` and the values
# sent with CTL_SET_BANDWIDTH.
band_ctl = {
    'narrow': 1101,
    'medium': 1102,
    'wide': 1103,
    'superwide': 1104,
    'full': 1105,
}

# Signal names accepted by ``Encoder.set_signal_type`` and the values
# sent with CTL_SET_SIGNAL.
signal_ctl = {
    'auto': -1000,
    'voice': 3001,
    'music': 3002,
}
 | |
| 
 | |
class Encoder:
    """Wrapper around a libopus encoder state.

    Creating an instance allocates an encoder through the loaded library and
    applies the defaults set in ``__init__``: 128 kbps bitrate, FEC enabled,
    15% expected packet loss, full bandwidth and automatic signal type.

    Parameters
    ----------
    application: :class:`int`
        The application mode passed to ``opus_encoder_create``.
        Defaults to ``APPLICATION_AUDIO``.

    Raises
    ------
    OpusNotLoaded
        If no opus library has been loaded.
    """

    SAMPLING_RATE = 48000  # samples per second
    CHANNELS = 2
    FRAME_LENGTH = 20  # milliseconds
    SAMPLE_SIZE = 4  # bytes per sample over all channels: (16 bits / 8) * CHANNELS
    SAMPLES_PER_FRAME = int(SAMPLING_RATE / 1000 * FRAME_LENGTH)

    # Size in bytes of the PCM data that makes up one frame.
    FRAME_SIZE = SAMPLES_PER_FRAME * SAMPLE_SIZE

    def __init__(self, application=APPLICATION_AUDIO):
        self.application = application

        if not is_loaded():
            raise OpusNotLoaded()

        self._state = self._create_state()
        self.set_bitrate(128)
        self.set_fec(True)
        self.set_expected_packet_loss_percent(0.15)
        self.set_bandwidth('full')
        self.set_signal_type('auto')

    def __del__(self):
        # ``_state`` is missing when ``__init__`` raised before creating it,
        # and ``_lib`` may no longer be usable while the interpreter is
        # tearing the module down; in both cases there is nothing to call.
        state = getattr(self, '_state', None)
        if state is None or _lib is None:
            return

        _lib.opus_encoder_destroy(state)
        self._state = None

    def _create_state(self):
        """Create the native encoder state and return the pointer to it."""
        # Out-parameter that receives the status code; the errcheck hook
        # registered for opus_encoder_create inspects it.
        ret = ctypes.c_int()
        return _lib.opus_encoder_create(self.SAMPLING_RATE, self.CHANNELS, self.application, ctypes.byref(ret))

    def set_bitrate(self, kbps):
        """Set the encoder bitrate.

        ``kbps`` is converted to an integer and clamped to the range 16-512
        before ``kbps * 1024`` is handed to the library.

        Returns
        -------
        :class:`int`
            The clamped value that was applied.
        """
        kbps = min(512, max(16, int(kbps)))

        _lib.opus_encoder_ctl(self._state, CTL_SET_BITRATE, kbps * 1024)
        return kbps

    def set_bandwidth(self, req):
        """Set the encoder bandwidth from one of the names in ``band_ctl``.

        Raises
        ------
        KeyError
            If ``req`` is not a key of ``band_ctl``.
        """
        if req not in band_ctl:
            raise KeyError('%r is not a valid bandwidth setting. Try one of: %s' % (req, ','.join(band_ctl)))

        k = band_ctl[req]
        _lib.opus_encoder_ctl(self._state, CTL_SET_BANDWIDTH, k)

    def set_signal_type(self, req):
        """Set the signal type from one of the names in ``signal_ctl``.

        Raises
        ------
        KeyError
            If ``req`` is not a key of ``signal_ctl``.
        """
        if req not in signal_ctl:
            raise KeyError('%r is not a valid signal setting. Try one of: %s' % (req, ','.join(signal_ctl)))

        k = signal_ctl[req]
        _lib.opus_encoder_ctl(self._state, CTL_SET_SIGNAL, k)

    def set_fec(self, enabled=True):
        """Enable (``1``) or disable (``0``) the CTL_SET_FEC encoder setting."""
        _lib.opus_encoder_ctl(self._state, CTL_SET_FEC, 1 if enabled else 0)

    def set_expected_packet_loss_percent(self, percentage):
        """Set the expected packet loss.

        ``percentage`` is a fraction (``0.15`` means 15%); it is multiplied
        by 100, truncated to an integer and clamped to the range 0-100.
        """
        _lib.opus_encoder_ctl(self._state, CTL_SET_PLP, min(100, max(0, int(percentage * 100))))

    def encode(self, pcm, frame_size):
        """Encode a buffer of PCM data and return the encoded bytes.

        Parameters
        ----------
        pcm
            The PCM buffer; it is reinterpreted as a pointer to 16-bit
            integers, and its length also bounds the size of the output.
        frame_size: :class:`int`
            The frame size forwarded to ``opus_encode``.

        Returns
        -------
        :class:`bytes`
            The first ``ret`` bytes of the output buffer, where ``ret`` is
            the value returned by ``opus_encode``.

        Raises
        ------
        OpusError
            If ``opus_encode`` returns a negative value.
        """
        max_data_bytes = len(pcm)
        pcm = ctypes.cast(pcm, c_int16_ptr)
        data = (ctypes.c_char * max_data_bytes)()

        ret = _lib.opus_encode(self._state, pcm, frame_size, data, max_data_bytes)

        # Slicing a c_char array already produces a bytes object, so no
        # intermediate array copy is needed.
        return data[:ret]
 |