Type-hint voice_client / player

This commit is contained in:
Josh
2021-06-28 14:59:14 +10:00
committed by GitHub
parent cd6b453cb3
commit 5acea453cc
3 changed files with 225 additions and 128 deletions

View File

@ -21,6 +21,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import threading
import traceback
@ -33,12 +34,23 @@ import time
import json
import sys
import re
import io
from typing import Any, Callable, Generic, IO, Optional, TYPE_CHECKING, Tuple, Type, TypeVar, Union
from .errors import ClientException
from .opus import Encoder as OpusEncoder
from .oggparse import OggStream
from .utils import MISSING
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from .voice_client import VoiceClient
AT = TypeVar('AT', bound='AudioSource')
FT = TypeVar('FT', bound='FFmpegOpusAudio')
log: logging.Logger = logging.getLogger(__name__)
__all__ = (
'AudioSource',
@ -49,6 +61,8 @@ __all__ = (
'PCMVolumeTransformer',
)
CREATE_NO_WINDOW: int
if sys.platform != 'win32':
CREATE_NO_WINDOW = 0
else:
@ -65,7 +79,7 @@ class AudioSource:
The audio source reads are done in a separate thread.
"""
def read(self):
def read(self) -> bytes:
"""Reads 20ms worth of audio.
Subclasses must implement this.
@ -85,11 +99,11 @@ class AudioSource:
"""
raise NotImplementedError
def is_opus(self):
def is_opus(self) -> bool:
"""Checks if the audio source is already encoded in Opus."""
return False
def cleanup(self):
def cleanup(self) -> None:
"""Called when clean-up is needed to be done.
Useful for clearing buffer data or processes after
@ -97,7 +111,7 @@ class AudioSource:
"""
pass
def __del__(self):
def __del__(self) -> None:
self.cleanup()
class PCMAudio(AudioSource):
@ -108,10 +122,10 @@ class PCMAudio(AudioSource):
stream: :term:`py:file object`
A file-like object that reads byte data representing raw PCM.
"""
def __init__(self, stream):
self.stream = stream
def __init__(self, stream: io.BufferedIOBase) -> None:
self.stream: io.BufferedIOBase = stream
def read(self):
def read(self) -> bytes:
ret = self.stream.read(OpusEncoder.FRAME_SIZE)
if len(ret) != OpusEncoder.FRAME_SIZE:
return b''
@ -126,17 +140,15 @@ class FFmpegAudio(AudioSource):
.. versionadded:: 1.3
"""
def __init__(self, source, *, executable='ffmpeg', args, **subprocess_kwargs):
self._process = self._stdout = None
def __init__(self, source: str, *, executable: str = 'ffmpeg', args: Any, **subprocess_kwargs: Any):
args = [executable, *args]
kwargs = {'stdout': subprocess.PIPE}
kwargs.update(subprocess_kwargs)
self._process = self._spawn_process(args, **kwargs)
self._stdout = self._process.stdout
self._process: subprocess.Popen = self._spawn_process(args, **kwargs)
self._stdout: IO[bytes] = self._process.stdout # type: ignore
def _spawn_process(self, args, **subprocess_kwargs):
def _spawn_process(self, args: Any, **subprocess_kwargs: Any) -> subprocess.Popen:
process = None
try:
process = subprocess.Popen(args, creationflags=CREATE_NO_WINDOW, **subprocess_kwargs)
@ -148,9 +160,9 @@ class FFmpegAudio(AudioSource):
else:
return process
def cleanup(self):
def cleanup(self) -> None:
proc = self._process
if proc is None:
if proc is MISSING:
return
log.info('Preparing to terminate ffmpeg process %s.', proc.pid)
@ -167,7 +179,7 @@ class FFmpegAudio(AudioSource):
else:
log.info('ffmpeg process %s successfully terminated with return code of %s.', proc.pid, proc.returncode)
self._process = self._stdout = None
self._process = self._stdout = MISSING
class FFmpegPCMAudio(FFmpegAudio):
"""An audio source from FFmpeg (or AVConv).
@ -204,7 +216,16 @@ class FFmpegPCMAudio(FFmpegAudio):
The subprocess failed to be created.
"""
def __init__(self, source, *, executable='ffmpeg', pipe=False, stderr=None, before_options=None, options=None):
def __init__(
self,
source: str,
*,
executable: str = 'ffmpeg',
pipe: bool = False,
stderr: Optional[IO[str]] = None,
before_options: Optional[str] = None,
options: Optional[str] = None
) -> None:
args = []
subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr}
@ -222,13 +243,13 @@ class FFmpegPCMAudio(FFmpegAudio):
super().__init__(source, executable=executable, args=args, **subprocess_kwargs)
def read(self):
def read(self) -> bytes:
ret = self._stdout.read(OpusEncoder.FRAME_SIZE)
if len(ret) != OpusEncoder.FRAME_SIZE:
return b''
return ret
def is_opus(self):
def is_opus(self) -> bool:
return False
class FFmpegOpusAudio(FFmpegAudio):
@ -292,8 +313,18 @@ class FFmpegOpusAudio(FFmpegAudio):
The subprocess failed to be created.
"""
def __init__(self, source, *, bitrate=128, codec=None, executable='ffmpeg',
pipe=False, stderr=None, before_options=None, options=None):
def __init__(
self,
source: str,
*,
bitrate: int = 128,
codec: Optional[str] = None,
executable: str = 'ffmpeg',
pipe=False,
stderr=None,
before_options=None,
options=None,
) -> None:
args = []
subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr}
@ -323,7 +354,13 @@ class FFmpegOpusAudio(FFmpegAudio):
self._packet_iter = OggStream(self._stdout).iter_packets()
@classmethod
async def from_probe(cls, source, *, method=None, **kwargs):
async def from_probe(
cls: Type[FT],
source: str,
*,
method: Optional[Union[str, Callable[[str, str], Tuple[Optional[str], Optional[int]]]]] = None,
**kwargs: Any,
) -> FT:
"""|coro|
A factory method that creates a :class:`FFmpegOpusAudio` after probing
@ -382,10 +419,16 @@ class FFmpegOpusAudio(FFmpegAudio):
executable = kwargs.get('executable')
codec, bitrate = await cls.probe(source, method=method, executable=executable)
return cls(source, bitrate=bitrate, codec=codec, **kwargs)
return cls(source, bitrate=bitrate, codec=codec, **kwargs) # type: ignore
@classmethod
async def probe(cls, source, *, method=None, executable=None):
async def probe(
cls,
source: str,
*,
method: Optional[Union[str, Callable[[str, str], Tuple[Optional[str], Optional[int]]]]] = None,
executable: Optional[str] = None,
) -> Tuple[Optional[str], Optional[int]]:
"""|coro|
Probes the input source for bitrate and codec information.
@ -408,7 +451,7 @@ class FFmpegOpusAudio(FFmpegAudio):
Returns
---------
Tuple[Optional[:class:`str`], Optional[:class:`int`]]
Optional[Tuple[Optional[:class:`str`], Optional[:class:`int`]]]
A 2-tuple with the codec and bitrate of the input source.
"""
@ -434,15 +477,15 @@ class FFmpegOpusAudio(FFmpegAudio):
codec = bitrate = None
loop = asyncio.get_event_loop()
try:
codec, bitrate = await loop.run_in_executor(None, lambda: probefunc(source, executable))
codec, bitrate = await loop.run_in_executor(None, lambda: probefunc(source, executable)) # type: ignore
except Exception:
if not fallback:
log.exception("Probe '%s' using '%s' failed", method, executable)
return
return # type: ignore
log.exception("Probe '%s' using '%s' failed, trying fallback", method, executable)
try:
codec, bitrate = await loop.run_in_executor(None, lambda: fallback(source, executable))
codec, bitrate = await loop.run_in_executor(None, lambda: fallback(source, executable)) # type: ignore
except Exception:
log.exception("Fallback probe using '%s' failed", executable)
else:
@ -453,7 +496,7 @@ class FFmpegOpusAudio(FFmpegAudio):
return codec, bitrate
@staticmethod
def _probe_codec_native(source, executable='ffmpeg'):
def _probe_codec_native(source, executable: str = 'ffmpeg') -> Tuple[Optional[str], Optional[int]]:
exe = executable[:2] + 'probe' if executable in ('ffmpeg', 'avconv') else executable
args = [exe, '-v', 'quiet', '-print_format', 'json', '-show_streams', '-select_streams', 'a:0', source]
output = subprocess.check_output(args, timeout=20)
@ -465,12 +508,12 @@ class FFmpegOpusAudio(FFmpegAudio):
codec = streamdata.get('codec_name')
bitrate = int(streamdata.get('bit_rate', 0))
bitrate = max(round(bitrate/1000, 0), 512)
bitrate = max(round(bitrate/1000), 512)
return codec, bitrate
@staticmethod
def _probe_codec_fallback(source, executable='ffmpeg'):
def _probe_codec_fallback(source, executable: str = 'ffmpeg') -> Tuple[Optional[str], Optional[int]]:
args = [executable, '-hide_banner', '-i', source]
proc = subprocess.Popen(args, creationflags=CREATE_NO_WINDOW, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, _ = proc.communicate(timeout=20)
@ -487,13 +530,13 @@ class FFmpegOpusAudio(FFmpegAudio):
return codec, bitrate
def read(self):
def read(self) -> bytes:
return next(self._packet_iter, b'')
def is_opus(self):
def is_opus(self) -> bool:
return True
class PCMVolumeTransformer(AudioSource):
class PCMVolumeTransformer(AudioSource, Generic[AT]):
"""Transforms a previous :class:`AudioSource` to have volume controls.
This does not work on audio sources that have :meth:`AudioSource.is_opus`
@ -515,53 +558,53 @@ class PCMVolumeTransformer(AudioSource):
The audio source is opus encoded.
"""
def __init__(self, original, volume=1.0):
def __init__(self, original: AT, volume: float = 1.0):
if not isinstance(original, AudioSource):
raise TypeError(f'expected AudioSource not {original.__class__.__name__}.')
if original.is_opus():
raise ClientException('AudioSource must not be Opus encoded.')
self.original = original
self.original: AT = original
self.volume = volume
@property
def volume(self):
def volume(self) -> float:
"""Retrieves or sets the volume as a floating point percentage (e.g. ``1.0`` for 100%)."""
return self._volume
@volume.setter
def volume(self, value):
def volume(self, value: float) -> None:
self._volume = max(value, 0.0)
def cleanup(self):
def cleanup(self) -> None:
self.original.cleanup()
def read(self):
def read(self) -> bytes:
ret = self.original.read()
return audioop.mul(ret, 2, min(self._volume, 2.0))
class AudioPlayer(threading.Thread):
DELAY = OpusEncoder.FRAME_LENGTH / 1000.0
DELAY: float = OpusEncoder.FRAME_LENGTH / 1000.0
def __init__(self, source, client, *, after=None):
def __init__(self, source: AudioSource, client: VoiceClient, *, after=None):
threading.Thread.__init__(self)
self.daemon = True
self.source = source
self.client = client
self.after = after
self.daemon: bool = True
self.source: AudioSource = source
self.client: VoiceClient = client
self.after: Optional[Callable[[Optional[Exception]], Any]] = after
self._end = threading.Event()
self._resumed = threading.Event()
self._end: threading.Event = threading.Event()
self._resumed: threading.Event = threading.Event()
self._resumed.set() # we are not paused
self._current_error = None
self._connected = client._connected
self._lock = threading.Lock()
self._current_error: Optional[Exception] = None
self._connected: threading.Event = client._connected
self._lock: threading.Lock = threading.Lock()
if after is not None and not callable(after):
raise TypeError('Expected a callable for the "after" parameter.')
def _do_run(self):
def _do_run(self) -> None:
self.loops = 0
self._start = time.perf_counter()
@ -596,7 +639,7 @@ class AudioPlayer(threading.Thread):
delay = max(0, self.DELAY + (next_time - time.perf_counter()))
time.sleep(delay)
def run(self):
def run(self) -> None:
try:
self._do_run()
except Exception as exc:
@ -606,7 +649,7 @@ class AudioPlayer(threading.Thread):
self.source.cleanup()
self._call_after()
def _call_after(self):
def _call_after(self) -> None:
error = self._current_error
if self.after is not None:
@ -622,36 +665,36 @@ class AudioPlayer(threading.Thread):
print(msg, file=sys.stderr)
traceback.print_exception(type(error), error, error.__traceback__)
def stop(self):
def stop(self) -> None:
self._end.set()
self._resumed.set()
self._speak(False)
def pause(self, *, update_speaking=True):
def pause(self, *, update_speaking: bool = True) -> None:
self._resumed.clear()
if update_speaking:
self._speak(False)
def resume(self, *, update_speaking=True):
def resume(self, *, update_speaking: bool = True) -> None:
self.loops = 0
self._start = time.perf_counter()
self._resumed.set()
if update_speaking:
self._speak(True)
def is_playing(self):
def is_playing(self) -> bool:
return self._resumed.is_set() and not self._end.is_set()
def is_paused(self):
def is_paused(self) -> bool:
return not self._end.is_set() and not self._resumed.is_set()
def _set_source(self, source):
def _set_source(self, source: AudioSource) -> None:
with self._lock:
self.pause(update_speaking=False)
self.source = source
self.resume(update_speaking=False)
def _speak(self, speaking):
def _speak(self, speaking: bool) -> None:
try:
asyncio.run_coroutine_threadsafe(self.client.ws.speak(speaking), self.client.loop)
except Exception as e: