Add FFmpegOpusAudio and other voice improvements
Rework FFmpeg player and add FFmpegOpusAudio. I have extracted some of the base FFmpeg source code into its own base class and reimplemented the PCM and the new Opus variants. Support avconv probing. Also fix a few things. Update `__all__`. Fix the bugs. Rework probe functions and add factory function: probing involves subprocess, so it has been reworked into an async factory function. Add docs + a few tweaks: * Removed unnecessary read() and is_opus() functions from FFmpegAudio * Clear self._stdout in cleanup() * Add 20 second process communication timeout to probe functions * Capped probe function bitrate values at 512. Change AudioPlayer to use the more accurate, monotonic time.perf_counter(). Add lazy opus loading: the library no longer loads libopus on import, only on opus.Encoder creation or manually. Fix review nits.
This commit is contained in:
		| @@ -31,16 +31,21 @@ import asyncio | ||||
| import logging | ||||
| import shlex | ||||
| import time | ||||
| import json | ||||
| import re | ||||
|  | ||||
| from .errors import ClientException | ||||
| from .opus import Encoder as OpusEncoder | ||||
| from .oggparse import OggStream | ||||
|  | ||||
| log = logging.getLogger(__name__) | ||||
|  | ||||
| __all__ = ( | ||||
|     'AudioSource', | ||||
|     'PCMAudio', | ||||
|     'FFmpegAudio', | ||||
|     'FFmpegPCMAudio', | ||||
|     'FFmpegOpusAudio', | ||||
|     'PCMVolumeTransformer', | ||||
| ) | ||||
|  | ||||
| @@ -107,7 +112,55 @@ class PCMAudio(AudioSource): | ||||
|             return b'' | ||||
|         return ret | ||||
|  | ||||
| class FFmpegPCMAudio(AudioSource): | ||||
class FFmpegAudio(AudioSource):
    """Base class for audio sources backed by an FFmpeg (or AVConv) sub-process.

    Subclass this when a custom source needs to drive FFmpeg in a way that
    differs from :class:`FFmpegPCMAudio` and :class:`FFmpegOpusAudio`.
    """

    def __init__(self, source, *, executable='ffmpeg', args, **subprocess_kwargs):
        # ``source`` is part of the shared constructor signature; it is not read here.
        # stdout defaults to a pipe, but callers may override it through
        # ``subprocess_kwargs``.
        popen_kwargs = {'stdout': subprocess.PIPE, **subprocess_kwargs}

        command = [executable]
        command.extend(args)

        self._process = self._spawn_process(command, **popen_kwargs)
        self._stdout = self._process.stdout

    def _spawn_process(self, args, **subprocess_kwargs):
        """Start the sub-process, translating launch failures into ClientException."""
        try:
            return subprocess.Popen(args, **subprocess_kwargs)
        except FileNotFoundError:
            # ``args`` may be a single command string or an argv sequence.
            if isinstance(args, str):
                executable = args.partition(' ')[0]
            else:
                executable = args[0]
            raise ClientException(executable + ' was not found.') from None
        except subprocess.SubprocessError as exc:
            raise ClientException('Popen failed: {0.__class__.__name__}: {0}'.format(exc)) from exc

    def cleanup(self):
        """Kill the sub-process (if any) and drop the references to it."""
        process = self._process
        if process is None:
            return

        pid = process.pid
        log.info('Preparing to terminate ffmpeg process %s.', pid)

        try:
            process.kill()
        except Exception:
            # Best effort: a failed kill must not prevent the rest of the cleanup.
            log.exception("Ignoring error attempting to kill ffmpeg process %s", pid)

        if process.poll() is not None:
            log.info('ffmpeg process %s successfully terminated with return code of %s.', pid, process.returncode)
        else:
            log.info('ffmpeg process %s has not terminated. Waiting to terminate...', pid)
            process.communicate()
            log.info('ffmpeg process %s should have terminated with a return code of %s.', pid, process.returncode)

        self._process = None
        self._stdout = None
|  | ||||
| class FFmpegPCMAudio(FFmpegAudio): | ||||
|     """An audio source from FFmpeg (or AVConv). | ||||
|  | ||||
|     This launches a sub-process to a specific input file given. | ||||
| @@ -131,10 +184,10 @@ class FFmpegPCMAudio(AudioSource): | ||||
|     stderr: Optional[:term:`py:file object`] | ||||
|         A file-like object to pass to the Popen constructor. | ||||
|         Could also be an instance of ``subprocess.PIPE``. | ||||
|     options: Optional[:class:`str`] | ||||
|         Extra command line arguments to pass to ffmpeg after the ``-i`` flag. | ||||
|     before_options: Optional[:class:`str`] | ||||
|         Extra command line arguments to pass to ffmpeg before the ``-i`` flag. | ||||
|     options: Optional[:class:`str`] | ||||
|         Extra command line arguments to pass to ffmpeg after the ``-i`` flag. | ||||
|  | ||||
|     Raises | ||||
|     -------- | ||||
| @@ -143,9 +196,8 @@ class FFmpegPCMAudio(AudioSource): | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, source, *, executable='ffmpeg', pipe=False, stderr=None, before_options=None, options=None): | ||||
|         stdin = None if not pipe else source | ||||
|  | ||||
|         args = [executable] | ||||
|         args = [] | ||||
|         subprocess_kwargs = {'stdin': source if pipe else None, 'stderr': stderr} | ||||
|  | ||||
|         if isinstance(before_options, str): | ||||
|             args.extend(shlex.split(before_options)) | ||||
| @@ -159,14 +211,7 @@ class FFmpegPCMAudio(AudioSource): | ||||
|  | ||||
|         args.append('pipe:1') | ||||
|  | ||||
|         self._process = None | ||||
|         try: | ||||
|             self._process = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) | ||||
|             self._stdout = self._process.stdout | ||||
|         except FileNotFoundError: | ||||
|             raise ClientException(executable + ' was not found.') from None | ||||
|         except subprocess.SubprocessError as exc: | ||||
|             raise ClientException('Popen failed: {0.__class__.__name__}: {0}'.format(exc)) from exc | ||||
|         super().__init__(source, executable=executable, args=args, **subprocess_kwargs) | ||||
|  | ||||
|     def read(self): | ||||
|         ret = self._stdout.read(OpusEncoder.FRAME_SIZE) | ||||
| @@ -174,21 +219,268 @@ class FFmpegPCMAudio(AudioSource): | ||||
|             return b'' | ||||
|         return ret | ||||
|  | ||||
|     def cleanup(self): | ||||
|         proc = self._process | ||||
|         if proc is None: | ||||
|             return | ||||
|     def is_opus(self): | ||||
|         return False | ||||
|  | ||||
|         log.info('Preparing to terminate ffmpeg process %s.', proc.pid) | ||||
|         proc.kill() | ||||
|         if proc.poll() is None: | ||||
|             log.info('ffmpeg process %s has not terminated. Waiting to terminate...', proc.pid) | ||||
|             proc.communicate() | ||||
|             log.info('ffmpeg process %s should have terminated with a return code of %s.', proc.pid, proc.returncode) | ||||
| class FFmpegOpusAudio(FFmpegAudio): | ||||
|     """An audio source from FFmpeg (or AVConv). | ||||
|  | ||||
|     This launches a sub-process to a specific input file given.  However, rather than | ||||
|     producing PCM packets like :class:`FFmpegPCMAudio` does that need to be encoded to | ||||
|     opus, this class produces opus packets, skipping the encoding step done by the library. | ||||
|  | ||||
|     Alternatively, instead of instantiating this class directly, you can use | ||||
|     :meth:`FFmpegOpusAudio.from_probe` to probe for bitrate and codec information.  This | ||||
|     can be used to opportunistically skip pointless re-encoding of existing opus audio data | ||||
|     for a boost in performance at the cost of a short initial delay to gather the information. | ||||
|     The same can be achieved by passing ``copy`` to the ``codec`` parameter, but only if you | ||||
|     know that the input source is opus encoded beforehand. | ||||
|  | ||||
|     .. warning:: | ||||
|  | ||||
|         You must have the ffmpeg or avconv executable in your path environment | ||||
|         variable in order for this to work. | ||||
|  | ||||
|     Parameters | ||||
|     ------------ | ||||
|     source: Union[:class:`str`, :class:`io.BufferedIOBase`] | ||||
|         The input that ffmpeg will take and convert to Opus bytes. | ||||
|         If ``pipe`` is True then this is a file-like object that is | ||||
|         passed to the stdin of ffmpeg. | ||||
|     bitrate: :class:`int` | ||||
|         The bitrate in kbps to encode the output to.  Defaults to ``128``. | ||||
|     codec: Optional[:class:`str`] | ||||
|         The codec to use to encode the audio data.  Normally this would be | ||||
|         just ``libopus``, but is used by :meth:`FFmpegOpusAudio.from_probe` to | ||||
|         opportunistically skip pointlessly re-encoding opus audio data by passing | ||||
|         ``copy`` as the codec value.  Any values other than ``copy``, ``opus``, or | ||||
|         ``libopus`` will be considered ``libopus``.  Defaults to ``libopus``. | ||||
|  | ||||
|         .. warning:: | ||||
|  | ||||
|             Do not provide this parameter unless you are certain that the audio input is | ||||
|             already opus encoded.  For typical use :meth:`FFmpegOpusAudio.from_probe` | ||||
|             should be used to determine the proper value for this parameter. | ||||
|  | ||||
|     executable: :class:`str` | ||||
|         The executable name (and path) to use. Defaults to ``ffmpeg``. | ||||
|     pipe: :class:`bool` | ||||
|         If ``True``, denotes that ``source`` parameter will be passed | ||||
|         to the stdin of ffmpeg. Defaults to ``False``. | ||||
|     stderr: Optional[:term:`py:file object`] | ||||
|         A file-like object to pass to the Popen constructor. | ||||
|         Could also be an instance of ``subprocess.PIPE``. | ||||
|     before_options: Optional[:class:`str`] | ||||
|         Extra command line arguments to pass to ffmpeg before the ``-i`` flag. | ||||
|     options: Optional[:class:`str`] | ||||
|         Extra command line arguments to pass to ffmpeg after the ``-i`` flag. | ||||
|  | ||||
|     Raises | ||||
|     -------- | ||||
|     ClientException | ||||
|         The subprocess failed to be created. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, source, *, bitrate=128, codec=None, executable='ffmpeg', | ||||
|                  pipe=False, stderr=None, before_options=None, options=None): | ||||
|  | ||||
|         args = [] | ||||
|         subprocess_kwargs = {'stdin': source if pipe else None, 'stderr': stderr} | ||||
|  | ||||
|         if isinstance(before_options, str): | ||||
|             args.extend(shlex.split(before_options)) | ||||
|  | ||||
|         args.append('-i') | ||||
|         args.append('-' if pipe else source) | ||||
|  | ||||
|         codec = 'copy' if codec in ('opus', 'libopus') else 'libopus' | ||||
|  | ||||
|         args.extend(('-map_metadata', '-1', | ||||
|                      '-f', 'opus', | ||||
|                      '-c:a', codec, | ||||
|                      '-ar', '48000', | ||||
|                      '-ac', '2', | ||||
|                      '-b:a', '%sk' % bitrate, | ||||
|                      '-loglevel', 'warning')) | ||||
|  | ||||
|         if isinstance(options, str): | ||||
|             args.extend(shlex.split(options)) | ||||
|  | ||||
|         args.append('pipe:1') | ||||
|  | ||||
|         super().__init__(source, executable=executable, args=args, **subprocess_kwargs) | ||||
|         self._packet_iter = OggStream(self._stdout).iter_packets() | ||||
|  | ||||
|     @classmethod | ||||
|     async def from_probe(cls, source, *, method=None, **kwargs): | ||||
|         """|coro| | ||||
|  | ||||
|         A factory method that creates a :class:`FFmpegOpusAudio` after probing | ||||
|         the input source for audio codec and bitrate information. | ||||
|  | ||||
|         Examples | ||||
|         ---------- | ||||
|  | ||||
|         Use this function to create an :class:`FFmpegOpusAudio` instance instead of the constructor: :: | ||||
|  | ||||
|             source = await discord.FFmpegOpusAudio.from_probe("song.webm") | ||||
|             voice_client.play(source) | ||||
|  | ||||
|         If you are on Windows and don't have ffprobe installed, use the ``fallback`` method | ||||
|         to probe using ffmpeg instead: :: | ||||
|  | ||||
|             source = await discord.FFmpegOpusAudio.from_probe("song.webm", method='fallback') | ||||
|             voice_client.play(source) | ||||
|  | ||||
|         Using a custom method of determining codec and bitrate: :: | ||||
|  | ||||
|             def custom_probe(source, executable): | ||||
|                 # some analysis code here | ||||
|  | ||||
|                 return codec, bitrate | ||||
|  | ||||
|             source = await discord.FFmpegOpusAudio.from_probe("song.webm", method=custom_probe) | ||||
|             voice_client.play(source) | ||||
|  | ||||
|         Parameters | ||||
|         ------------ | ||||
|         source | ||||
|             Identical to the ``source`` parameter for the constructor. | ||||
|         method: Optional[Union[:class:`str`, Callable[:class:`str`, :class:`str`]]] | ||||
|             The probing method used to determine bitrate and codec information. As a string, valid | ||||
|             values are ``native`` to use ffprobe (or avprobe) and ``fallback`` to use ffmpeg | ||||
|             (or avconv).  As a callable, it must take two string arguments, ``source`` and | ||||
|             ``executable``.  Both parameters are the same values passed to this factory function. | ||||
|             ``executable`` will default to ``ffmpeg`` if not provided as a keyword argument. | ||||
|         kwargs | ||||
|             The remaining parameters to be passed to the :class:`FFmpegOpusAudio` constructor, | ||||
|             excluding ``bitrate`` and ``codec``. | ||||
|  | ||||
|         Raises | ||||
|         -------- | ||||
|         AttributeError | ||||
|             Invalid probe method, must be ``'native'`` or ``'fallback'``. | ||||
|         TypeError | ||||
|             Invalid value for ``probe`` parameter, must be :class:`str` or a callable. | ||||
|  | ||||
|         Returns | ||||
|         -------- | ||||
|         :class:`FFmpegOpusAudio` | ||||
|             An instance of this class. | ||||
|         """ | ||||
|  | ||||
|         executable = kwargs.get('executable') | ||||
|         codec, bitrate = await cls.probe(source, method=method, executable=executable) | ||||
|         return cls(source, bitrate=bitrate, codec=codec, **kwargs) | ||||
|  | ||||
|     @classmethod | ||||
|     async def probe(cls, source, *, method=None, executable=None): | ||||
|         """|coro| | ||||
|  | ||||
|         Probes the input source for bitrate and codec information. | ||||
|  | ||||
|         Parameters | ||||
|         ------------ | ||||
|         source | ||||
|             Identical to the ``source`` parameter for :class:`FFmpegOpusAudio`. | ||||
|         method | ||||
|             Identical to the ``method`` parameter for :meth:`FFmpegOpusAudio.from_probe`. | ||||
|         executable: :class:`str` | ||||
|             Identical to the ``executable`` parameter for :class:`FFmpegOpusAudio`. | ||||
|  | ||||
|         Raises | ||||
|         -------- | ||||
|         AttributeError | ||||
|             Invalid probe method, must be ``'native'`` or ``'fallback'``. | ||||
|         TypeError | ||||
|             Invalid value for ``probe`` parameter, must be :class:`str` or a callable. | ||||
|  | ||||
|         Returns | ||||
|         --------- | ||||
|         Tuple[Optional[:class:`str`], Optional[:class:`int`]] | ||||
|             A 2-tuple with the codec and bitrate of the input source. | ||||
|         """ | ||||
|  | ||||
|         method = method or 'native' | ||||
|         executable = executable or 'ffmpeg' | ||||
|         probefunc = fallback = None | ||||
|  | ||||
|         if isinstance(method, str): | ||||
|             probefunc = getattr(cls, '_probe_codec_' + method, None) | ||||
|             if probefunc is None: | ||||
|                 raise AttributeError("Invalid probe method '%s'" % method) | ||||
|  | ||||
|             if probefunc is cls._probe_codec_native: | ||||
|                 fallback = cls._probe_codec_fallback | ||||
|  | ||||
|         elif callable(method): | ||||
|             probefunc = method | ||||
|             fallback = cls._probe_codec_fallback | ||||
|         else: | ||||
|             log.info('ffmpeg process %s successfully terminated with return code of %s.', proc.pid, proc.returncode) | ||||
|             raise TypeError("Expected str or callable for parameter 'probe', " \ | ||||
|                             "not '{0.__class__.__name__}'" .format(method)) | ||||
|  | ||||
|         self._process = None | ||||
|         codec = bitrate = None | ||||
|         loop = asyncio.get_event_loop() | ||||
|         try: | ||||
|             codec, bitrate = await loop.run_in_executor(None, lambda: probefunc(source, executable)) | ||||
|         except Exception: | ||||
|             if not fallback: | ||||
|                 log.exception("Probe '%s' using '%s' failed", method, executable) | ||||
|                 return | ||||
|  | ||||
|             log.exception("Probe '%s' using '%s' failed, trying fallback", method, executable) | ||||
|             try: | ||||
|                 codec, bitrate = await loop.run_in_executor(None, lambda: fallback(source, executable)) | ||||
|             except Exception: | ||||
|                 log.exception("Fallback probe using '%s' failed", executable) | ||||
|             else: | ||||
|                 log.info("Fallback probe found codec=%s, bitrate=%s", codec, bitrate) | ||||
|         else: | ||||
|             log.info("Probe found codec=%s, bitrate=%s", codec, bitrate) | ||||
|         finally: | ||||
|             return codec, bitrate | ||||
|  | ||||
|     @staticmethod | ||||
|     def _probe_codec_native(source, executable='ffmpeg'): | ||||
|         exe = executable[:2] + 'probe' if executable in ('ffmpeg', 'avconv') else executable | ||||
|         args = [exe, '-v', 'quiet', '-print_format', 'json', '-show_streams', '-select_streams', 'a:0', source] | ||||
|         output = subprocess.check_output(args, timeout=20) | ||||
|         codec = bitrate = None | ||||
|  | ||||
|         if output: | ||||
|             data = json.loads(output) | ||||
|             streamdata = data['streams'][0] | ||||
|  | ||||
|             codec = streamdata.get('codec_name') | ||||
|             bitrate = int(streamdata.get('bit_rate', 0)) | ||||
|             bitrate = max(round(bitrate/1000, 0), 512) | ||||
|  | ||||
|         return codec, bitrate | ||||
|  | ||||
|     @staticmethod | ||||
|     def _probe_codec_fallback(source, executable='ffmpeg'): | ||||
|         args = [executable, '-hide_banner', '-i',  source] | ||||
|         proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) | ||||
|         out, _ = proc.communicate(timeout=20) | ||||
|         output = out.decode('utf8') | ||||
|         codec = bitrate = None | ||||
|  | ||||
|         codec_match = re.search(r"Stream #0.*?Audio: (\w+)", output) | ||||
|         if codec_match: | ||||
|             codec = codec_match.group(1) | ||||
|  | ||||
|         br_match = re.search(r"(\d+) [kK]b/s", output) | ||||
|         if br_match: | ||||
|             bitrate = max(int(br_match.group(1)), 512) | ||||
|  | ||||
|         return codec, bitrate | ||||
|  | ||||
|     def read(self): | ||||
|         return next(self._packet_iter, b'') | ||||
|  | ||||
|     def is_opus(self): | ||||
|         return True | ||||
|  | ||||
| class PCMVolumeTransformer(AudioSource): | ||||
|     """Transforms a previous :class:`AudioSource` to have volume controls. | ||||
| @@ -260,7 +552,7 @@ class AudioPlayer(threading.Thread): | ||||
|  | ||||
|     def _do_run(self): | ||||
|         self.loops = 0 | ||||
|         self._start = time.time() | ||||
|         self._start = time.perf_counter() | ||||
|  | ||||
|         # getattr lookup speed ups | ||||
|         play_audio = self.client.send_audio_packet | ||||
| @@ -279,7 +571,7 @@ class AudioPlayer(threading.Thread): | ||||
|                 self._connected.wait() | ||||
|                 # reset our internal data | ||||
|                 self.loops = 0 | ||||
|                 self._start = time.time() | ||||
|                 self._start = time.perf_counter() | ||||
|  | ||||
|             self.loops += 1 | ||||
|             data = self.source.read() | ||||
| @@ -290,7 +582,7 @@ class AudioPlayer(threading.Thread): | ||||
|  | ||||
|             play_audio(data, encode=not self.source.is_opus()) | ||||
|             next_time = self._start + self.DELAY * self.loops | ||||
|             delay = max(0, self.DELAY + (next_time - time.time())) | ||||
|             delay = max(0, self.DELAY + (next_time - time.perf_counter())) | ||||
|             time.sleep(delay) | ||||
|  | ||||
|     def run(self): | ||||
| @@ -322,7 +614,7 @@ class AudioPlayer(threading.Thread): | ||||
|  | ||||
|     def resume(self, *, update_speaking=True): | ||||
|         self.loops = 0 | ||||
|         self._start = time.time() | ||||
|         self._start = time.perf_counter() | ||||
|         self._resumed.set() | ||||
|         if update_speaking: | ||||
|             self._speak(True) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user