Rework entire cog system and partially document it and extensions.

This commit is contained in:
Rapptz
2019-02-23 04:10:10 -05:00
parent 3f06f247c0
commit caf3d17d4a
14 changed files with 776 additions and 184 deletions

View File

@@ -35,6 +35,8 @@ from .errors import *
from .cooldowns import Cooldown, BucketType, CooldownMapping
from .view import quoted_word
from . import converter as converters
from ._types import _BaseCommand
from .cog import Cog
__all__ = ['Command', 'Group', 'GroupMixin', 'command', 'group',
'has_role', 'has_permissions', 'has_any_role', 'check',
@@ -102,7 +104,7 @@ class _CaseInsensitiveDict(dict):
def __setitem__(self, k, v):
super().__setitem__(k.lower(), v)
class Command:
class Command(_BaseCommand):
r"""A class that implements the protocol for a bot text command.
These are not created manually, instead they are created via the
@@ -156,14 +158,44 @@ class Command:
and ``b``). Otherwise :func:`.on_command_error` and local error handlers
are called with :exc:`.TooManyArguments`. Defaults to ``True``.
"""
def __init__(self, name, callback, **kwargs):
self.name = name
def __new__(cls, *args, **kwargs):
# if you're wondering why this is done, it's because we need to ensure
# we have a complete original copy of **kwargs even for classes that
# mess with it by popping before delegating to the subclass __init__.
# In order to do this, we need to control the instance creation and
# inject the original kwargs through __new__ rather than doing it
# inside __init__.
self = super().__new__(cls)
# we do a shallow copy because it's probably the most common use case.
# this could potentially break if someone modifies a list or something
# while it's in movement, but for now this is the cheapest and
# fastest way to do what we want.
self.__original_kwargs__ = kwargs.copy()
return self
def __init__(self, func, **kwargs):
if not asyncio.iscoroutinefunction(func):
raise TypeError('Callback must be a coroutine.')
self.name = name = kwargs.get('name') or func.__name__
if not isinstance(name, str):
raise TypeError('Name of a command must be a string.')
self.callback = callback
self.callback = func
self.enabled = kwargs.get('enabled', True)
self.help = kwargs.get('help')
help_doc = kwargs.get('help')
if help_doc is not None:
help_doc = inspect.cleandoc(help_doc)
else:
help_doc = inspect.getdoc(func)
if isinstance(help_doc, bytes):
help_doc = help_doc.decode('utf-8')
self.help = help_doc
self.brief = kwargs.get('brief')
self.usage = kwargs.get('usage')
self.rest_is_raw = kwargs.get('rest_is_raw', False)
@@ -175,11 +207,29 @@ class Command:
self.description = inspect.cleandoc(kwargs.get('description', ''))
self.hidden = kwargs.get('hidden', False)
self.checks = kwargs.get('checks', [])
try:
checks = func.__commands_checks__
checks.reverse()
del func.__commands_checks__
except AttributeError:
checks = kwargs.get('checks', [])
finally:
self.checks = checks
try:
cooldown = func.__commands_cooldown__
del func.__commands_cooldown__
except AttributeError:
cooldown = kwargs.get('cooldown')
finally:
self._buckets = CooldownMapping(cooldown)
self.ignore_extra = kwargs.get('ignore_extra', True)
self.instance = None
self.parent = None
self._buckets = CooldownMapping(kwargs.get('cooldown'))
self.cog = None
# bandaid for the fact that sometimes parent can be the bot instance
parent = kwargs.get('parent')
self.parent = parent if isinstance(parent, _BaseCommand) else None
self._before_invoke = None
self._after_invoke = None
@@ -206,9 +256,33 @@ class Command:
if value.annotation is converters.Greedy:
raise TypeError('Unparameterized Greedy[...] is disallowed in signature.')
def update(self, **kwargs):
"""Updates :class:`Command` instance with updated attribute.
This works similarly to the :func:`.command` decorator in terms
of parameters in that they are passed to the :class:`Command` or
subclass constructors, sans the name and callback.
"""
self.__init__(self.callback, **dict(self.__original_kwargs__, **kwargs))
def copy(self):
"""Creates a copy of this :class:`Command`."""
ret = self.__class__(self.callback, **self.__original_kwargs__)
ret._before_invoke = self._before_invoke
ret._after_invoke = self._after_invoke
return ret
def _update_copy(self, kwargs):
if kwargs:
copy = self.__class__(self.callback, **kwargs)
copy.update(**self.__original_kwargs__)
return copy
else:
return self.copy()
async def dispatch_error(self, ctx, error):
ctx.command_failed = True
cog = self.instance
cog = self.cog
try:
coro = self.on_error
except AttributeError:
@@ -221,20 +295,14 @@ class Command:
await injected(ctx, error)
try:
local = getattr(cog, '_{0.__class__.__name__}__error'.format(cog))
except AttributeError:
pass
else:
wrapped = wrap_callback(local)
await wrapped(ctx, error)
if cog is not None:
local = Cog._get_overridden_method(cog.cog_command_error)
if local is not None:
wrapped = wrap_callback(local)
await wrapped(ctx, error)
finally:
ctx.bot.dispatch('command_error', ctx, error)
def __get__(self, instance, owner):
if instance is not None:
self.instance = instance
return self
async def _actual_conversion(self, ctx, converter, argument, param):
if converter is bool:
return _convert_to_bool(argument)
@@ -392,7 +460,7 @@ class Command:
Useful for inspecting signature.
"""
result = self.params.copy()
if self.instance is not None:
if self.cog is not None:
# first parameter is self
result.popitem(last=False)
@@ -458,7 +526,7 @@ class Command:
return self.qualified_name
async def _parse_arguments(self, ctx):
ctx.args = [ctx] if self.instance is None else [self.instance, ctx]
ctx.args = [ctx] if self.cog is None else [self.cog, ctx]
ctx.kwargs = {}
args = ctx.args
kwargs = ctx.kwargs
@@ -466,7 +534,7 @@ class Command:
view = ctx.view
iterator = iter(self.params.items())
if self.instance is not None:
if self.cog is not None:
# we have 'self' as the first parameter so just advance
# the iterator and resume parsing
try:
@@ -517,7 +585,7 @@ class Command:
async def call_before_hooks(self, ctx):
# now that we're done preparing we can call the pre-command hooks
# first, call the command local hook:
cog = self.instance
cog = self.cog
if self._before_invoke is not None:
if cog is None:
await self._before_invoke(ctx)
@@ -525,12 +593,10 @@ class Command:
await self._before_invoke(cog, ctx)
# call the cog local hook if applicable:
try:
hook = getattr(cog, '_{0.__class__.__name__}__before_invoke'.format(cog))
except AttributeError:
pass
else:
await hook(ctx)
if cog is not None:
hook = Cog._get_overridden_method(cog.cog_before_invoke)
if hook is not None:
await hook(ctx)
# call the bot global hook if necessary
hook = ctx.bot._before_invoke
@@ -538,19 +604,18 @@ class Command:
await hook(ctx)
async def call_after_hooks(self, ctx):
cog = self.instance
cog = self.cog
if self._after_invoke is not None:
if cog is None:
await self._after_invoke(ctx)
else:
await self._after_invoke(cog, ctx)
try:
hook = getattr(cog, '_{0.__class__.__name__}__after_invoke'.format(cog))
except AttributeError:
pass
else:
await hook(ctx)
# call the cog local hook if applicable:
if cog is not None:
hook = Cog._get_overridden_method(cog.cog_after_invoke)
if hook is not None:
await hook(ctx)
hook = ctx.bot._after_invoke
if hook is not None:
@@ -708,7 +773,7 @@ class Command:
@property
def cog_name(self):
"""The name of the cog this command belongs to. None otherwise."""
return type(self.instance).__name__ if self.instance is not None else None
return type(self.cog).__cog_name__ if self.cog is not None else None
@property
def short_doc(self):
@@ -793,13 +858,10 @@ class Command:
if not await ctx.bot.can_run(ctx):
raise CheckFailure('The global check functions for command {0.qualified_name} failed.'.format(self))
cog = self.instance
cog = self.cog
if cog is not None:
try:
local_check = getattr(cog, '_{0.__class__.__name__}__local_check'.format(cog))
except AttributeError:
pass
else:
local_check = Cog._get_overridden_method(cog.cog_check)
if local_check is not None:
ret = await discord.utils.maybe_coroutine(local_check, ctx)
if not ret:
return False
@@ -825,11 +887,11 @@ class GroupMixin:
case_insensitive: :class:`bool`
Whether the commands should be case insensitive. Defaults to ``False``.
"""
def __init__(self, **kwargs):
def __init__(self, *args, **kwargs):
case_insensitive = kwargs.get('case_insensitive', False)
self.all_commands = _CaseInsensitiveDict() if case_insensitive else {}
self.case_insensitive = case_insensitive
super().__init__(**kwargs)
super().__init__(*args, **kwargs)
@property
def commands(self):
@@ -955,6 +1017,7 @@ class GroupMixin:
the internal command list via :meth:`~.GroupMixin.add_command`.
"""
def decorator(func):
kwargs.setdefault('parent', self)
result = command(*args, **kwargs)(func)
self.add_command(result)
return result
@@ -966,6 +1029,7 @@ class GroupMixin:
the internal command list via :meth:`~.GroupMixin.add_command`.
"""
def decorator(func):
kwargs.setdefault('parent', self)
result = group(*args, **kwargs)(func)
self.add_command(result)
return result
@@ -994,9 +1058,16 @@ class Group(GroupMixin, Command):
Indicates if the group's commands should be case insensitive.
Defaults to ``False``.
"""
def __init__(self, **attrs):
def __init__(self, *args, **attrs):
self.invoke_without_command = attrs.pop('invoke_without_command', False)
super().__init__(**attrs)
super().__init__(*args, **attrs)
def copy(self):
"""Creates a copy of this :class:`Group`."""
ret = super().copy()
for cmd in self.commands:
ret.add_command(cmd.copy())
return ret
async def invoke(self, ctx):
early_invoke = not self.invoke_without_command
@@ -1100,33 +1171,7 @@ def command(name=None, cls=None, **attrs):
def decorator(func):
if isinstance(func, Command):
raise TypeError('Callback is already a command.')
if not asyncio.iscoroutinefunction(func):
raise TypeError('Callback must be a coroutine.')
try:
checks = func.__commands_checks__
checks.reverse()
del func.__commands_checks__
except AttributeError:
checks = []
try:
cooldown = func.__commands_cooldown__
del func.__commands_cooldown__
except AttributeError:
cooldown = None
help_doc = attrs.get('help')
if help_doc is not None:
help_doc = inspect.cleandoc(help_doc)
else:
help_doc = inspect.getdoc(func)
if isinstance(help_doc, bytes):
help_doc = help_doc.decode('utf-8')
attrs['help'] = help_doc
fname = name or func.__name__
return cls(name=fname, callback=func, checks=checks, cooldown=cooldown, **attrs)
return cls(func, name=name, **attrs)
return decorator