2047 lines
		
	
	
		
			68 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2047 lines
		
	
	
		
			68 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| """
 | |
| The MIT License (MIT)
 | |
| 
 | |
| Copyright (c) 2015-2020 Rapptz
 | |
| 
 | |
| Permission is hereby granted, free of charge, to any person obtaining a
 | |
| copy of this software and associated documentation files (the "Software"),
 | |
| to deal in the Software without restriction, including without limitation
 | |
| the rights to use, copy, modify, merge, publish, distribute, sublicense,
 | |
| and/or sell copies of the Software, and to permit persons to whom the
 | |
| Software is furnished to do so, subject to the following conditions:
 | |
| 
 | |
| The above copyright notice and this permission notice shall be included in
 | |
| all copies or substantial portions of the Software.
 | |
| 
 | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 | |
| OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 | |
| FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 | |
| DEALINGS IN THE SOFTWARE.
 | |
| """
 | |
| 
 | |
| import asyncio
 | |
| import functools
 | |
| import inspect
 | |
| import typing
 | |
| import datetime
 | |
| 
 | |
| import discord
 | |
| 
 | |
| from .errors import *
 | |
| from .cooldowns import Cooldown, BucketType, CooldownMapping, MaxConcurrency
 | |
| from . import converter as converters
 | |
| from ._types import _BaseCommand
 | |
| from .cog import Cog
 | |
| 
 | |
| __all__ = (
 | |
|     'Command',
 | |
|     'Group',
 | |
|     'GroupMixin',
 | |
|     'command',
 | |
|     'group',
 | |
|     'has_role',
 | |
|     'has_permissions',
 | |
|     'has_any_role',
 | |
|     'check',
 | |
|     'check_any',
 | |
|     'before_invoke',
 | |
|     'after_invoke',
 | |
|     'bot_has_role',
 | |
|     'bot_has_permissions',
 | |
|     'bot_has_any_role',
 | |
|     'cooldown',
 | |
|     'max_concurrency',
 | |
|     'dm_only',
 | |
|     'guild_only',
 | |
|     'is_owner',
 | |
|     'is_nsfw',
 | |
|     'has_guild_permissions',
 | |
|     'bot_has_guild_permissions'
 | |
| )
 | |
| 
 | |
| def wrap_callback(coro):
 | |
|     @functools.wraps(coro)
 | |
|     async def wrapped(*args, **kwargs):
 | |
|         try:
 | |
|             ret = await coro(*args, **kwargs)
 | |
|         except CommandError:
 | |
|             raise
 | |
|         except asyncio.CancelledError:
 | |
|             return
 | |
|         except Exception as exc:
 | |
|             raise CommandInvokeError(exc) from exc
 | |
|         return ret
 | |
|     return wrapped
 | |
| 
 | |
| def hooked_wrapped_callback(command, ctx, coro):
 | |
|     @functools.wraps(coro)
 | |
|     async def wrapped(*args, **kwargs):
 | |
|         try:
 | |
|             ret = await coro(*args, **kwargs)
 | |
|         except CommandError:
 | |
|             ctx.command_failed = True
 | |
|             raise
 | |
|         except asyncio.CancelledError:
 | |
|             ctx.command_failed = True
 | |
|             return
 | |
|         except Exception as exc:
 | |
|             ctx.command_failed = True
 | |
|             raise CommandInvokeError(exc) from exc
 | |
|         finally:
 | |
|             if command._max_concurrency is not None:
 | |
|                 await command._max_concurrency.release(ctx)
 | |
| 
 | |
|             await command.call_after_hooks(ctx)
 | |
|         return ret
 | |
|     return wrapped
 | |
| 
 | |
| def _convert_to_bool(argument):
 | |
|     lowered = argument.lower()
 | |
|     if lowered in ('yes', 'y', 'true', 't', '1', 'enable', 'on'):
 | |
|         return True
 | |
|     elif lowered in ('no', 'n', 'false', 'f', '0', 'disable', 'off'):
 | |
|         return False
 | |
|     else:
 | |
|         raise BadBoolArgument(lowered)
 | |
| 
 | |
| class _CaseInsensitiveDict(dict):
 | |
|     def __contains__(self, k):
 | |
|         return super().__contains__(k.casefold())
 | |
| 
 | |
|     def __delitem__(self, k):
 | |
|         return super().__delitem__(k.casefold())
 | |
| 
 | |
|     def __getitem__(self, k):
 | |
|         return super().__getitem__(k.casefold())
 | |
| 
 | |
|     def get(self, k, default=None):
 | |
|         return super().get(k.casefold(), default)
 | |
| 
 | |
|     def pop(self, k, default=None):
 | |
|         return super().pop(k.casefold(), default)
 | |
| 
 | |
|     def __setitem__(self, k, v):
 | |
|         super().__setitem__(k.casefold(), v)
 | |
| 
 | |
| class Command(_BaseCommand):
 | |
|     r"""A class that implements the protocol for a bot text command.
 | |
| 
 | |
|     These are not created manually, instead they are created via the
 | |
|     decorator or functional interface.
 | |
| 
 | |
|     Attributes
 | |
|     -----------
 | |
|     name: :class:`str`
 | |
|         The name of the command.
 | |
|     callback: :ref:`coroutine <coroutine>`
 | |
|         The coroutine that is executed when the command is called.
 | |
|     help: :class:`str`
 | |
|         The long help text for the command.
 | |
|     brief: Optional[:class:`str`]
 | |
|         The short help text for the command.
 | |
|     usage: :class:`str`
 | |
|         A replacement for arguments in the default help text.
 | |
|     aliases: Union[List[:class:`str`], Tuple[:class:`str`]]
 | |
|         The list of aliases the command can be invoked under.
 | |
|     enabled: :class:`bool`
 | |
|         A boolean that indicates if the command is currently enabled.
 | |
|         If the command is invoked while it is disabled, then
 | |
|         :exc:`.DisabledCommand` is raised to the :func:`.on_command_error`
 | |
|         event. Defaults to ``True``.
 | |
|     parent: Optional[:class:`Command`]
 | |
|         The parent command that this command belongs to. ``None`` if there
 | |
|         isn't one.
 | |
|     cog: Optional[:class:`Cog`]
 | |
|         The cog that this command belongs to. ``None`` if there isn't one.
 | |
|     checks: List[Callable[..., :class:`bool`]]
 | |
|         A list of predicates that verifies if the command could be executed
 | |
|         with the given :class:`.Context` as the sole parameter. If an exception
 | |
|         is necessary to be thrown to signal failure, then one inherited from
 | |
|         :exc:`.CommandError` should be used. Note that if the checks fail then
 | |
|         :exc:`.CheckFailure` exception is raised to the :func:`.on_command_error`
 | |
|         event.
 | |
|     description: :class:`str`
 | |
|         The message prefixed into the default help command.
 | |
|     hidden: :class:`bool`
 | |
|         If ``True``\, the default help command does not show this in the
 | |
|         help output.
 | |
|     rest_is_raw: :class:`bool`
 | |
|         If ``False`` and a keyword-only argument is provided then the keyword
 | |
|         only argument is stripped and handled as if it was a regular argument
 | |
|         that handles :exc:`.MissingRequiredArgument` and default values in a
 | |
|         regular matter rather than passing the rest completely raw. If ``True``
 | |
|         then the keyword-only argument will pass in the rest of the arguments
 | |
|         in a completely raw matter. Defaults to ``False``.
 | |
|     invoked_subcommand: Optional[:class:`Command`]
 | |
|         The subcommand that was invoked, if any.
 | |
|     require_var_positional: :class:`bool`
 | |
|         If ``True`` and a variadic positional argument is specified, requires
 | |
|         the user to specify at least one argument. Defaults to ``False``.
 | |
| 
 | |
|         .. versionadded:: 1.5
 | |
| 
 | |
|     ignore_extra: :class:`bool`
 | |
|         If ``True``\, ignores extraneous strings passed to a command if all its
 | |
|         requirements are met (e.g. ``?foo a b c`` when only expecting ``a``
 | |
|         and ``b``). Otherwise :func:`.on_command_error` and local error handlers
 | |
|         are called with :exc:`.TooManyArguments`. Defaults to ``True``.
 | |
|     cooldown_after_parsing: :class:`bool`
 | |
|         If ``True``\, cooldown processing is done after argument parsing,
 | |
|         which calls converters. If ``False`` then cooldown processing is done
 | |
|         first and then the converters are called second. Defaults to ``False``.
 | |
|     """
 | |
| 
 | |
|     def __new__(cls, *args, **kwargs):
 | |
|         # if you're wondering why this is done, it's because we need to ensure
 | |
|         # we have a complete original copy of **kwargs even for classes that
 | |
|         # mess with it by popping before delegating to the subclass __init__.
 | |
|         # In order to do this, we need to control the instance creation and
 | |
|         # inject the original kwargs through __new__ rather than doing it
 | |
|         # inside __init__.
 | |
|         self = super().__new__(cls)
 | |
| 
 | |
|         # we do a shallow copy because it's probably the most common use case.
 | |
|         # this could potentially break if someone modifies a list or something
 | |
|         # while it's in movement, but for now this is the cheapest and
 | |
|         # fastest way to do what we want.
 | |
|         self.__original_kwargs__ = kwargs.copy()
 | |
|         return self
 | |
| 
 | |
|     def __init__(self, func, **kwargs):
 | |
|         if not asyncio.iscoroutinefunction(func):
 | |
|             raise TypeError('Callback must be a coroutine.')
 | |
| 
 | |
|         self.name = name = kwargs.get('name') or func.__name__
 | |
|         if not isinstance(name, str):
 | |
|             raise TypeError('Name of a command must be a string.')
 | |
| 
 | |
|         self.callback = func
 | |
|         self.enabled = kwargs.get('enabled', True)
 | |
| 
 | |
|         help_doc = kwargs.get('help')
 | |
|         if help_doc is not None:
 | |
|             help_doc = inspect.cleandoc(help_doc)
 | |
|         else:
 | |
|             help_doc = inspect.getdoc(func)
 | |
|             if isinstance(help_doc, bytes):
 | |
|                 help_doc = help_doc.decode('utf-8')
 | |
| 
 | |
|         self.help = help_doc
 | |
| 
 | |
|         self.brief = kwargs.get('brief')
 | |
|         self.usage = kwargs.get('usage')
 | |
|         self.rest_is_raw = kwargs.get('rest_is_raw', False)
 | |
|         self.aliases = kwargs.get('aliases', [])
 | |
| 
 | |
|         if not isinstance(self.aliases, (list, tuple)):
 | |
|             raise TypeError("Aliases of a command must be a list or a tuple of strings.")
 | |
| 
 | |
|         self.description = inspect.cleandoc(kwargs.get('description', ''))
 | |
|         self.hidden = kwargs.get('hidden', False)
 | |
| 
 | |
|         try:
 | |
|             checks = func.__commands_checks__
 | |
|             checks.reverse()
 | |
|         except AttributeError:
 | |
|             checks = kwargs.get('checks', [])
 | |
|         finally:
 | |
|             self.checks = checks
 | |
| 
 | |
|         try:
 | |
|             cooldown = func.__commands_cooldown__
 | |
|         except AttributeError:
 | |
|             cooldown = kwargs.get('cooldown')
 | |
|         finally:
 | |
|             self._buckets = CooldownMapping(cooldown)
 | |
| 
 | |
|         try:
 | |
|             max_concurrency = func.__commands_max_concurrency__
 | |
|         except AttributeError:
 | |
|             max_concurrency = kwargs.get('max_concurrency')
 | |
|         finally:
 | |
|             self._max_concurrency = max_concurrency
 | |
| 
 | |
|         self.require_var_positional = kwargs.get('require_var_positional', False)
 | |
|         self.ignore_extra = kwargs.get('ignore_extra', True)
 | |
|         self.cooldown_after_parsing = kwargs.get('cooldown_after_parsing', False)
 | |
|         self.cog = None
 | |
| 
 | |
|         # bandaid for the fact that sometimes parent can be the bot instance
 | |
|         parent = kwargs.get('parent')
 | |
|         self.parent = parent if isinstance(parent, _BaseCommand) else None
 | |
| 
 | |
|         try:
 | |
|             before_invoke = func.__before_invoke__
 | |
|         except AttributeError:
 | |
|             self._before_invoke = None
 | |
|         else:
 | |
|             self.before_invoke(before_invoke)
 | |
| 
 | |
|         try:
 | |
|             after_invoke = func.__after_invoke__
 | |
|         except AttributeError:
 | |
|             self._after_invoke = None
 | |
|         else:
 | |
|             self.after_invoke(after_invoke)
 | |
| 
 | |
|     @property
 | |
|     def callback(self):
 | |
|         return self._callback
 | |
| 
 | |
|     @callback.setter
 | |
|     def callback(self, function):
 | |
|         self._callback = function
 | |
|         self.module = function.__module__
 | |
| 
 | |
|         signature = inspect.signature(function)
 | |
|         self.params = signature.parameters.copy()
 | |
| 
 | |
|         # PEP-563 allows postponing evaluation of annotations with a __future__
 | |
|         # import. When postponed, Parameter.annotation will be a string and must
 | |
|         # be replaced with the real value for the converters to work later on
 | |
|         for key, value in self.params.items():
 | |
|             if isinstance(value.annotation, str):
 | |
|                 self.params[key] = value = value.replace(annotation=eval(value.annotation, function.__globals__))
 | |
| 
 | |
|             # fail early for when someone passes an unparameterized Greedy type
 | |
|             if value.annotation is converters.Greedy:
 | |
|                 raise TypeError('Unparameterized Greedy[...] is disallowed in signature.')
 | |
| 
 | |
|     def add_check(self, func):
 | |
|         """Adds a check to the command.
 | |
| 
 | |
|         This is the non-decorator interface to :func:`.check`.
 | |
| 
 | |
|         .. versionadded:: 1.3
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         func
 | |
|             The function that will be used as a check.
 | |
|         """
 | |
| 
 | |
|         self.checks.append(func)
 | |
| 
 | |
|     def remove_check(self, func):
 | |
|         """Removes a check from the command.
 | |
| 
 | |
|         This function is idempotent and will not raise an exception
 | |
|         if the function is not in the command's checks.
 | |
| 
 | |
|         .. versionadded:: 1.3
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         func
 | |
|             The function to remove from the checks.
 | |
|         """
 | |
| 
 | |
|         try:
 | |
|             self.checks.remove(func)
 | |
|         except ValueError:
 | |
|             pass
 | |
| 
 | |
|     def update(self, **kwargs):
 | |
|         """Updates :class:`Command` instance with updated attribute.
 | |
| 
 | |
|         This works similarly to the :func:`.command` decorator in terms
 | |
|         of parameters in that they are passed to the :class:`Command` or
 | |
|         subclass constructors, sans the name and callback.
 | |
|         """
 | |
|         self.__init__(self.callback, **dict(self.__original_kwargs__, **kwargs))
 | |
| 
 | |
|     async def __call__(self, *args, **kwargs):
 | |
|         """|coro|
 | |
| 
 | |
|         Calls the internal callback that the command holds.
 | |
| 
 | |
|         .. note::
 | |
| 
 | |
|             This bypasses all mechanisms -- including checks, converters,
 | |
|             invoke hooks, cooldowns, etc. You must take care to pass
 | |
|             the proper arguments and types to this function.
 | |
| 
 | |
|         .. versionadded:: 1.3
 | |
|         """
 | |
|         if self.cog is not None:
 | |
|             return await self.callback(self.cog, *args, **kwargs)
 | |
|         else:
 | |
|             return await self.callback(*args, **kwargs)
 | |
| 
 | |
|     def _ensure_assignment_on_copy(self, other):
 | |
|         other._before_invoke = self._before_invoke
 | |
|         other._after_invoke = self._after_invoke
 | |
|         if self.checks != other.checks:
 | |
|             other.checks = self.checks.copy()
 | |
|         if self._buckets.valid and not other._buckets.valid:
 | |
|             other._buckets = self._buckets.copy()
 | |
|         if self._max_concurrency != other._max_concurrency:
 | |
|             other._max_concurrency = self._max_concurrency.copy()
 | |
| 
 | |
|         try:
 | |
|             other.on_error = self.on_error
 | |
|         except AttributeError:
 | |
|             pass
 | |
|         return other
 | |
| 
 | |
|     def copy(self):
 | |
|         """Creates a copy of this command.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         :class:`Command`
 | |
|             A new instance of this command.
 | |
|         """
 | |
|         ret = self.__class__(self.callback, **self.__original_kwargs__)
 | |
|         return self._ensure_assignment_on_copy(ret)
 | |
| 
 | |
|     def _update_copy(self, kwargs):
 | |
|         if kwargs:
 | |
|             kw = kwargs.copy()
 | |
|             kw.update(self.__original_kwargs__)
 | |
|             copy = self.__class__(self.callback, **kw)
 | |
|             return self._ensure_assignment_on_copy(copy)
 | |
|         else:
 | |
|             return self.copy()
 | |
| 
 | |
|     async def dispatch_error(self, ctx, error):
 | |
|         ctx.command_failed = True
 | |
|         cog = self.cog
 | |
|         try:
 | |
|             coro = self.on_error
 | |
|         except AttributeError:
 | |
|             pass
 | |
|         else:
 | |
|             injected = wrap_callback(coro)
 | |
|             if cog is not None:
 | |
|                 await injected(cog, ctx, error)
 | |
|             else:
 | |
|                 await injected(ctx, error)
 | |
| 
 | |
|         try:
 | |
|             if cog is not None:
 | |
|                 local = Cog._get_overridden_method(cog.cog_command_error)
 | |
|                 if local is not None:
 | |
|                     wrapped = wrap_callback(local)
 | |
|                     await wrapped(ctx, error)
 | |
|         finally:
 | |
|             ctx.bot.dispatch('command_error', ctx, error)
 | |
| 
 | |
|     async def _actual_conversion(self, ctx, converter, argument, param):
 | |
|         if converter is bool:
 | |
|             return _convert_to_bool(argument)
 | |
| 
 | |
|         try:
 | |
|             module = converter.__module__
 | |
|         except AttributeError:
 | |
|             pass
 | |
|         else:
 | |
|             if module is not None and (module.startswith('discord.') and not module.endswith('converter')):
 | |
|                 converter = getattr(converters, converter.__name__ + 'Converter', converter)
 | |
| 
 | |
|         try:
 | |
|             if inspect.isclass(converter):
 | |
|                 if issubclass(converter, converters.Converter):
 | |
|                     instance = converter()
 | |
|                     ret = await instance.convert(ctx, argument)
 | |
|                     return ret
 | |
|                 else:
 | |
|                     method = getattr(converter, 'convert', None)
 | |
|                     if method is not None and inspect.ismethod(method):
 | |
|                         ret = await method(ctx, argument)
 | |
|                         return ret
 | |
|             elif isinstance(converter, converters.Converter):
 | |
|                 ret = await converter.convert(ctx, argument)
 | |
|                 return ret
 | |
|         except CommandError:
 | |
|             raise
 | |
|         except Exception as exc:
 | |
|             raise ConversionError(converter, exc) from exc
 | |
| 
 | |
|         try:
 | |
|             return converter(argument)
 | |
|         except CommandError:
 | |
|             raise
 | |
|         except Exception as exc:
 | |
|             try:
 | |
|                 name = converter.__name__
 | |
|             except AttributeError:
 | |
|                 name = converter.__class__.__name__
 | |
| 
 | |
|             raise BadArgument('Converting to "{}" failed for parameter "{}".'.format(name, param.name)) from exc
 | |
| 
 | |
|     async def do_conversion(self, ctx, converter, argument, param):
 | |
|         try:
 | |
|             origin = converter.__origin__
 | |
|         except AttributeError:
 | |
|             pass
 | |
|         else:
 | |
|             if origin is typing.Union:
 | |
|                 errors = []
 | |
|                 _NoneType = type(None)
 | |
|                 for conv in converter.__args__:
 | |
|                     # if we got to this part in the code, then the previous conversions have failed
 | |
|                     # so we should just undo the view, return the default, and allow parsing to continue
 | |
|                     # with the other parameters
 | |
|                     if conv is _NoneType and param.kind != param.VAR_POSITIONAL:
 | |
|                         ctx.view.undo()
 | |
|                         return None if param.default is param.empty else param.default
 | |
| 
 | |
|                     try:
 | |
|                         value = await self._actual_conversion(ctx, conv, argument, param)
 | |
|                     except CommandError as exc:
 | |
|                         errors.append(exc)
 | |
|                     else:
 | |
|                         return value
 | |
| 
 | |
|                 # if we're  here, then we failed all the converters
 | |
|                 raise BadUnionArgument(param, converter.__args__, errors)
 | |
| 
 | |
|         return await self._actual_conversion(ctx, converter, argument, param)
 | |
| 
 | |
|     def _get_converter(self, param):
 | |
|         converter = param.annotation
 | |
|         if converter is param.empty:
 | |
|             if param.default is not param.empty:
 | |
|                 converter = str if param.default is None else type(param.default)
 | |
|             else:
 | |
|                 converter = str
 | |
|         return converter
 | |
| 
 | |
|     async def transform(self, ctx, param):
 | |
|         required = param.default is param.empty
 | |
|         converter = self._get_converter(param)
 | |
|         consume_rest_is_special = param.kind == param.KEYWORD_ONLY and not self.rest_is_raw
 | |
|         view = ctx.view
 | |
|         view.skip_ws()
 | |
| 
 | |
|         # The greedy converter is simple -- it keeps going until it fails in which case,
 | |
|         # it undos the view ready for the next parameter to use instead
 | |
|         if type(converter) is converters._Greedy:
 | |
|             if param.kind == param.POSITIONAL_OR_KEYWORD:
 | |
|                 return await self._transform_greedy_pos(ctx, param, required, converter.converter)
 | |
|             elif param.kind == param.VAR_POSITIONAL:
 | |
|                 return await self._transform_greedy_var_pos(ctx, param, converter.converter)
 | |
|             else:
 | |
|                 # if we're here, then it's a KEYWORD_ONLY param type
 | |
|                 # since this is mostly useless, we'll helpfully transform Greedy[X]
 | |
|                 # into just X and do the parsing that way.
 | |
|                 converter = converter.converter
 | |
| 
 | |
|         if view.eof:
 | |
|             if param.kind == param.VAR_POSITIONAL:
 | |
|                 raise RuntimeError() # break the loop
 | |
|             if required:
 | |
|                 if self._is_typing_optional(param.annotation):
 | |
|                     return None
 | |
|                 raise MissingRequiredArgument(param)
 | |
|             return param.default
 | |
| 
 | |
|         previous = view.index
 | |
|         if consume_rest_is_special:
 | |
|             argument = view.read_rest().strip()
 | |
|         else:
 | |
|             argument = view.get_quoted_word()
 | |
|         view.previous = previous
 | |
| 
 | |
|         return await self.do_conversion(ctx, converter, argument, param)
 | |
| 
 | |
|     async def _transform_greedy_pos(self, ctx, param, required, converter):
 | |
|         view = ctx.view
 | |
|         result = []
 | |
|         while not view.eof:
 | |
|             # for use with a manual undo
 | |
|             previous = view.index
 | |
| 
 | |
|             view.skip_ws()
 | |
|             try:
 | |
|                 argument = view.get_quoted_word()
 | |
|                 value = await self.do_conversion(ctx, converter, argument, param)
 | |
|             except (CommandError, ArgumentParsingError):
 | |
|                 view.index = previous
 | |
|                 break
 | |
|             else:
 | |
|                 result.append(value)
 | |
| 
 | |
|         if not result and not required:
 | |
|             return param.default
 | |
|         return result
 | |
| 
 | |
|     async def _transform_greedy_var_pos(self, ctx, param, converter):
 | |
|         view = ctx.view
 | |
|         previous = view.index
 | |
|         try:
 | |
|             argument = view.get_quoted_word()
 | |
|             value = await self.do_conversion(ctx, converter, argument, param)
 | |
|         except (CommandError, ArgumentParsingError):
 | |
|             view.index = previous
 | |
|             raise RuntimeError() from None # break loop
 | |
|         else:
 | |
|             return value
 | |
| 
 | |
|     @property
 | |
|     def clean_params(self):
 | |
|         """OrderedDict[:class:`str`, :class:`inspect.Parameter`]:
 | |
|         Retrieves the parameter OrderedDict without the context or self parameters.
 | |
| 
 | |
|         Useful for inspecting signature.
 | |
|         """
 | |
|         result = self.params.copy()
 | |
|         if self.cog is not None:
 | |
|             # first parameter is self
 | |
|             result.popitem(last=False)
 | |
| 
 | |
|         try:
 | |
|             # first/second parameter is context
 | |
|             result.popitem(last=False)
 | |
|         except Exception:
 | |
|             raise ValueError('Missing context parameter') from None
 | |
| 
 | |
|         return result
 | |
| 
 | |
|     @property
 | |
|     def full_parent_name(self):
 | |
|         """:class:`str`: Retrieves the fully qualified parent command name.
 | |
| 
 | |
|         This the base command name required to execute it. For example,
 | |
|         in ``?one two three`` the parent name would be ``one two``.
 | |
|         """
 | |
|         entries = []
 | |
|         command = self
 | |
|         while command.parent is not None:
 | |
|             command = command.parent
 | |
|             entries.append(command.name)
 | |
| 
 | |
|         return ' '.join(reversed(entries))
 | |
| 
 | |
|     @property
 | |
|     def parents(self):
 | |
|         """List[:class:`Command`]: Retrieves the parents of this command.
 | |
| 
 | |
|         If the command has no parents then it returns an empty :class:`list`.
 | |
| 
 | |
|         For example in commands ``?a b c test``, the parents are ``[c, b, a]``.
 | |
| 
 | |
|         .. versionadded:: 1.1
 | |
|         """
 | |
|         entries = []
 | |
|         command = self
 | |
|         while command.parent is not None:
 | |
|             command = command.parent
 | |
|             entries.append(command)
 | |
| 
 | |
|         return entries
 | |
| 
 | |
|     @property
 | |
|     def root_parent(self):
 | |
|         """Optional[:class:`Command`]: Retrieves the root parent of this command.
 | |
| 
 | |
|         If the command has no parents then it returns ``None``.
 | |
| 
 | |
|         For example in commands ``?a b c test``, the root parent is ``a``.
 | |
|         """
 | |
|         if not self.parent:
 | |
|             return None
 | |
|         return self.parents[-1]
 | |
| 
 | |
|     @property
 | |
|     def qualified_name(self):
 | |
|         """:class:`str`: Retrieves the fully qualified command name.
 | |
| 
 | |
|         This is the full parent name with the command name as well.
 | |
|         For example, in ``?one two three`` the qualified name would be
 | |
|         ``one two three``.
 | |
|         """
 | |
| 
 | |
|         parent = self.full_parent_name
 | |
|         if parent:
 | |
|             return parent + ' ' + self.name
 | |
|         else:
 | |
|             return self.name
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.qualified_name
 | |
| 
 | |
|     async def _parse_arguments(self, ctx):
 | |
|         ctx.args = [ctx] if self.cog is None else [self.cog, ctx]
 | |
|         ctx.kwargs = {}
 | |
|         args = ctx.args
 | |
|         kwargs = ctx.kwargs
 | |
| 
 | |
|         view = ctx.view
 | |
|         iterator = iter(self.params.items())
 | |
| 
 | |
|         if self.cog is not None:
 | |
|             # we have 'self' as the first parameter so just advance
 | |
|             # the iterator and resume parsing
 | |
|             try:
 | |
|                 next(iterator)
 | |
|             except StopIteration:
 | |
|                 fmt = 'Callback for {0.name} command is missing "self" parameter.'
 | |
|                 raise discord.ClientException(fmt.format(self))
 | |
| 
 | |
|         # next we have the 'ctx' as the next parameter
 | |
|         try:
 | |
|             next(iterator)
 | |
|         except StopIteration:
 | |
|             fmt = 'Callback for {0.name} command is missing "ctx" parameter.'
 | |
|             raise discord.ClientException(fmt.format(self))
 | |
| 
 | |
|         for name, param in iterator:
 | |
|             if param.kind == param.POSITIONAL_OR_KEYWORD:
 | |
|                 transformed = await self.transform(ctx, param)
 | |
|                 args.append(transformed)
 | |
|             elif param.kind == param.KEYWORD_ONLY:
 | |
|                 # kwarg only param denotes "consume rest" semantics
 | |
|                 if self.rest_is_raw:
 | |
|                     converter = self._get_converter(param)
 | |
|                     argument = view.read_rest()
 | |
|                     kwargs[name] = await self.do_conversion(ctx, converter, argument, param)
 | |
|                 else:
 | |
|                     kwargs[name] = await self.transform(ctx, param)
 | |
|                 break
 | |
|             elif param.kind == param.VAR_POSITIONAL:
 | |
|                 if view.eof and self.require_var_positional:
 | |
|                     raise MissingRequiredArgument(param)
 | |
|                 while not view.eof:
 | |
|                     try:
 | |
|                         transformed = await self.transform(ctx, param)
 | |
|                         args.append(transformed)
 | |
|                     except RuntimeError:
 | |
|                         break
 | |
| 
 | |
|         if not self.ignore_extra:
 | |
|             if not view.eof:
 | |
|                 raise TooManyArguments('Too many arguments passed to ' + self.qualified_name)
 | |
| 
 | |
|     async def call_before_hooks(self, ctx):
 | |
|         # now that we're done preparing we can call the pre-command hooks
 | |
|         # first, call the command local hook:
 | |
|         cog = self.cog
 | |
|         if self._before_invoke is not None:
 | |
|             # should be cog if @commands.before_invoke is used
 | |
|             instance = getattr(self._before_invoke, '__self__', cog)
 | |
|             # __self__ only exists for methods, not functions
 | |
|             # however, if @command.before_invoke is used, it will be a function
 | |
|             if instance:
 | |
|                 await self._before_invoke(instance, ctx)
 | |
|             else:
 | |
|                 await self._before_invoke(ctx)
 | |
| 
 | |
|         # call the cog local hook if applicable:
 | |
|         if cog is not None:
 | |
|             hook = Cog._get_overridden_method(cog.cog_before_invoke)
 | |
|             if hook is not None:
 | |
|                 await hook(ctx)
 | |
| 
 | |
|         # call the bot global hook if necessary
 | |
|         hook = ctx.bot._before_invoke
 | |
|         if hook is not None:
 | |
|             await hook(ctx)
 | |
| 
 | |
|     async def call_after_hooks(self, ctx):
 | |
|         cog = self.cog
 | |
|         if self._after_invoke is not None:
 | |
|             instance = getattr(self._after_invoke, '__self__', cog)
 | |
|             if instance:
 | |
|                     await self._after_invoke(instance, ctx)
 | |
|             else:
 | |
|                 await self._after_invoke(ctx)
 | |
| 
 | |
|         # call the cog local hook if applicable:
 | |
|         if cog is not None:
 | |
|             hook = Cog._get_overridden_method(cog.cog_after_invoke)
 | |
|             if hook is not None:
 | |
|                 await hook(ctx)
 | |
| 
 | |
|         hook = ctx.bot._after_invoke
 | |
|         if hook is not None:
 | |
|             await hook(ctx)
 | |
| 
 | |
|     def _prepare_cooldowns(self, ctx):
 | |
|         if self._buckets.valid:
 | |
|             dt = ctx.message.edited_at or ctx.message.created_at
 | |
|             current = dt.replace(tzinfo=datetime.timezone.utc).timestamp()
 | |
|             bucket = self._buckets.get_bucket(ctx.message, current)
 | |
|             retry_after = bucket.update_rate_limit(current)
 | |
|             if retry_after:
 | |
|                 raise CommandOnCooldown(bucket, retry_after)
 | |
| 
 | |
|     async def prepare(self, ctx):
 | |
|         ctx.command = self
 | |
| 
 | |
|         if not await self.can_run(ctx):
 | |
|             raise CheckFailure('The check functions for command {0.qualified_name} failed.'.format(self))
 | |
| 
 | |
|         if self._max_concurrency is not None:
 | |
|             await self._max_concurrency.acquire(ctx)
 | |
| 
 | |
|         if self.cooldown_after_parsing:
 | |
|             await self._parse_arguments(ctx)
 | |
|             self._prepare_cooldowns(ctx)
 | |
|         else:
 | |
|             self._prepare_cooldowns(ctx)
 | |
|             await self._parse_arguments(ctx)
 | |
| 
 | |
|         await self.call_before_hooks(ctx)
 | |
| 
 | |
|     def is_on_cooldown(self, ctx):
 | |
|         """Checks whether the command is currently on cooldown.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         ctx: :class:`.Context`
 | |
|             The invocation context to use when checking the commands cooldown status.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         :class:`bool`
 | |
|             A boolean indicating if the command is on cooldown.
 | |
|         """
 | |
|         if not self._buckets.valid:
 | |
|             return False
 | |
| 
 | |
|         bucket = self._buckets.get_bucket(ctx.message)
 | |
|         dt = ctx.message.edited_at or ctx.message.created_at
 | |
|         current = dt.replace(tzinfo=datetime.timezone.utc).timestamp()
 | |
|         return bucket.get_tokens(current) == 0
 | |
| 
 | |
|     def reset_cooldown(self, ctx):
 | |
|         """Resets the cooldown on this command.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         ctx: :class:`.Context`
 | |
|             The invocation context to reset the cooldown under.
 | |
|         """
 | |
|         if self._buckets.valid:
 | |
|             bucket = self._buckets.get_bucket(ctx.message)
 | |
|             bucket.reset()
 | |
| 
 | |
|     def get_cooldown_retry_after(self, ctx):
 | |
|         """Retrieves the amount of seconds before this command can be tried again.
 | |
| 
 | |
|         .. versionadded:: 1.4
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         ctx: :class:`.Context`
 | |
|             The invocation context to retrieve the cooldown from.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         :class:`float`
 | |
|             The amount of time left on this command's cooldown in seconds.
 | |
|             If this is ``0.0`` then the command isn't on cooldown.
 | |
|         """
 | |
|         if self._buckets.valid:
 | |
|             bucket = self._buckets.get_bucket(ctx.message)
 | |
|             dt = ctx.message.edited_at or ctx.message.created_at
 | |
|             current = dt.replace(tzinfo=datetime.timezone.utc).timestamp()
 | |
|             return bucket.get_retry_after(current)
 | |
| 
 | |
|         return 0.0
 | |
| 
 | |
|     async def invoke(self, ctx):
 | |
|         await self.prepare(ctx)
 | |
| 
 | |
|         # terminate the invoked_subcommand chain.
 | |
|         # since we're in a regular command (and not a group) then
 | |
|         # the invoked subcommand is None.
 | |
|         ctx.invoked_subcommand = None
 | |
|         ctx.subcommand_passed = None
 | |
|         injected = hooked_wrapped_callback(self, ctx, self.callback)
 | |
|         await injected(*ctx.args, **ctx.kwargs)
 | |
| 
 | |
|     async def reinvoke(self, ctx, *, call_hooks=False):
 | |
|         ctx.command = self
 | |
|         await self._parse_arguments(ctx)
 | |
| 
 | |
|         if call_hooks:
 | |
|             await self.call_before_hooks(ctx)
 | |
| 
 | |
|         ctx.invoked_subcommand = None
 | |
|         try:
 | |
|             await self.callback(*ctx.args, **ctx.kwargs)
 | |
|         except:
 | |
|             ctx.command_failed = True
 | |
|             raise
 | |
|         finally:
 | |
|             if call_hooks:
 | |
|                 await self.call_after_hooks(ctx)
 | |
| 
 | |
|     def error(self, coro):
 | |
|         """A decorator that registers a coroutine as a local error handler.
 | |
| 
 | |
|         A local error handler is an :func:`.on_command_error` event limited to
 | |
|         a single command. However, the :func:`.on_command_error` is still
 | |
|         invoked afterwards as the catch-all.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         coro: :ref:`coroutine <coroutine>`
 | |
|             The coroutine to register as the local error handler.
 | |
| 
 | |
|         Raises
 | |
|         -------
 | |
|         TypeError
 | |
|             The coroutine passed is not actually a coroutine.
 | |
|         """
 | |
| 
 | |
|         if not asyncio.iscoroutinefunction(coro):
 | |
|             raise TypeError('The error handler must be a coroutine.')
 | |
| 
 | |
|         self.on_error = coro
 | |
|         return coro
 | |
| 
 | |
|     def before_invoke(self, coro):
 | |
|         """A decorator that registers a coroutine as a pre-invoke hook.
 | |
| 
 | |
|         A pre-invoke hook is called directly before the command is
 | |
|         called. This makes it a useful function to set up database
 | |
|         connections or any type of set up required.
 | |
| 
 | |
|         This pre-invoke hook takes a sole parameter, a :class:`.Context`.
 | |
| 
 | |
|         See :meth:`.Bot.before_invoke` for more info.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         coro: :ref:`coroutine <coroutine>`
 | |
|             The coroutine to register as the pre-invoke hook.
 | |
| 
 | |
|         Raises
 | |
|         -------
 | |
|         TypeError
 | |
|             The coroutine passed is not actually a coroutine.
 | |
|         """
 | |
|         if not asyncio.iscoroutinefunction(coro):
 | |
|             raise TypeError('The pre-invoke hook must be a coroutine.')
 | |
| 
 | |
|         self._before_invoke = coro
 | |
|         return coro
 | |
| 
 | |
|     def after_invoke(self, coro):
 | |
|         """A decorator that registers a coroutine as a post-invoke hook.
 | |
| 
 | |
|         A post-invoke hook is called directly after the command is
 | |
|         called. This makes it a useful function to clean-up database
 | |
|         connections or any type of clean up required.
 | |
| 
 | |
|         This post-invoke hook takes a sole parameter, a :class:`.Context`.
 | |
| 
 | |
|         See :meth:`.Bot.after_invoke` for more info.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         coro: :ref:`coroutine <coroutine>`
 | |
|             The coroutine to register as the post-invoke hook.
 | |
| 
 | |
|         Raises
 | |
|         -------
 | |
|         TypeError
 | |
|             The coroutine passed is not actually a coroutine.
 | |
|         """
 | |
|         if not asyncio.iscoroutinefunction(coro):
 | |
|             raise TypeError('The post-invoke hook must be a coroutine.')
 | |
| 
 | |
|         self._after_invoke = coro
 | |
|         return coro
 | |
| 
 | |
|     @property
 | |
|     def cog_name(self):
 | |
|         """Optional[:class:`str`]: The name of the cog this command belongs to, if any."""
 | |
|         return type(self.cog).__cog_name__ if self.cog is not None else None
 | |
| 
 | |
|     @property
 | |
|     def short_doc(self):
 | |
|         """:class:`str`: Gets the "short" documentation of a command.
 | |
| 
 | |
|         By default, this is the :attr:`brief` attribute.
 | |
|         If that lookup leads to an empty string then the first line of the
 | |
|         :attr:`help` attribute is used instead.
 | |
|         """
 | |
|         if self.brief is not None:
 | |
|             return self.brief
 | |
|         if self.help is not None:
 | |
|             return self.help.split('\n', 1)[0]
 | |
|         return ''
 | |
| 
 | |
|     def _is_typing_optional(self, annotation):
 | |
|         try:
 | |
|             origin = annotation.__origin__
 | |
|         except AttributeError:
 | |
|             return False
 | |
| 
 | |
|         if origin is not typing.Union:
 | |
|             return False
 | |
| 
 | |
|         return annotation.__args__[-1] is type(None)
 | |
| 
 | |
|     @property
 | |
|     def signature(self):
 | |
|         """:class:`str`: Returns a POSIX-like signature useful for help command output."""
 | |
|         if self.usage is not None:
 | |
|             return self.usage
 | |
| 
 | |
| 
 | |
|         params = self.clean_params
 | |
|         if not params:
 | |
|             return ''
 | |
| 
 | |
|         result = []
 | |
|         for name, param in params.items():
 | |
|             greedy = isinstance(param.annotation, converters._Greedy)
 | |
| 
 | |
|             if param.default is not param.empty:
 | |
|                 # We don't want None or '' to trigger the [name=value] case and instead it should
 | |
|                 # do [name] since [name=None] or [name=] are not exactly useful for the user.
 | |
|                 should_print = param.default if isinstance(param.default, str) else param.default is not None
 | |
|                 if should_print:
 | |
|                     result.append('[%s=%s]' % (name, param.default) if not greedy else
 | |
|                                   '[%s=%s]...' % (name, param.default))
 | |
|                     continue
 | |
|                 else:
 | |
|                     result.append('[%s]' % name)
 | |
| 
 | |
|             elif param.kind == param.VAR_POSITIONAL:
 | |
|                 if self.require_var_positional:
 | |
|                     result.append('<%s...>' % name)
 | |
|                 else:
 | |
|                     result.append('[%s...]' % name)
 | |
|             elif greedy:
 | |
|                 result.append('[%s]...' % name)
 | |
|             elif self._is_typing_optional(param.annotation):
 | |
|                 result.append('[%s]' % name)
 | |
|             else:
 | |
|                 result.append('<%s>' % name)
 | |
| 
 | |
|         return ' '.join(result)
 | |
| 
 | |
|     async def can_run(self, ctx):
 | |
|         """|coro|
 | |
| 
 | |
|         Checks if the command can be executed by checking all the predicates
 | |
|         inside the :attr:`checks` attribute. This also checks whether the
 | |
|         command is disabled.
 | |
| 
 | |
|         .. versionchanged:: 1.3
 | |
|             Checks whether the command is disabled or not
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         ctx: :class:`.Context`
 | |
|             The ctx of the command currently being invoked.
 | |
| 
 | |
|         Raises
 | |
|         -------
 | |
|         :class:`CommandError`
 | |
|             Any command error that was raised during a check call will be propagated
 | |
|             by this function.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         :class:`bool`
 | |
|             A boolean indicating if the command can be invoked.
 | |
|         """
 | |
| 
 | |
|         if not self.enabled:
 | |
|             raise DisabledCommand('{0.name} command is disabled'.format(self))
 | |
| 
 | |
|         original = ctx.command
 | |
|         ctx.command = self
 | |
| 
 | |
|         try:
 | |
|             if not await ctx.bot.can_run(ctx):
 | |
|                 raise CheckFailure('The global check functions for command {0.qualified_name} failed.'.format(self))
 | |
| 
 | |
|             cog = self.cog
 | |
|             if cog is not None:
 | |
|                 local_check = Cog._get_overridden_method(cog.cog_check)
 | |
|                 if local_check is not None:
 | |
|                     ret = await discord.utils.maybe_coroutine(local_check, ctx)
 | |
|                     if not ret:
 | |
|                         return False
 | |
| 
 | |
|             predicates = self.checks
 | |
|             if not predicates:
 | |
|                 # since we have no checks, then we just return True.
 | |
|                 return True
 | |
| 
 | |
|             return await discord.utils.async_all(predicate(ctx) for predicate in predicates)
 | |
|         finally:
 | |
|             ctx.command = original
 | |
| 
 | |
| class GroupMixin:
 | |
|     """A mixin that implements common functionality for classes that behave
 | |
|     similar to :class:`.Group` and are allowed to register commands.
 | |
| 
 | |
|     Attributes
 | |
|     -----------
 | |
|     all_commands: :class:`dict`
 | |
|         A mapping of command name to :class:`.Command`
 | |
|         objects.
 | |
|     case_insensitive: :class:`bool`
 | |
|         Whether the commands should be case insensitive. Defaults to ``False``.
 | |
|     """
 | |
|     def __init__(self, *args, **kwargs):
 | |
|         case_insensitive = kwargs.get('case_insensitive', False)
 | |
|         self.all_commands = _CaseInsensitiveDict() if case_insensitive else {}
 | |
|         self.case_insensitive = case_insensitive
 | |
|         super().__init__(*args, **kwargs)
 | |
| 
 | |
|     @property
 | |
|     def commands(self):
 | |
|         """Set[:class:`.Command`]: A unique set of commands without aliases that are registered."""
 | |
|         return set(self.all_commands.values())
 | |
| 
 | |
|     def recursively_remove_all_commands(self):
 | |
|         for command in self.all_commands.copy().values():
 | |
|             if isinstance(command, GroupMixin):
 | |
|                 command.recursively_remove_all_commands()
 | |
|             self.remove_command(command.name)
 | |
| 
 | |
|     def add_command(self, command):
 | |
|         """Adds a :class:`.Command` into the internal list of commands.
 | |
| 
 | |
|         This is usually not called, instead the :meth:`~.GroupMixin.command` or
 | |
|         :meth:`~.GroupMixin.group` shortcut decorators are used instead.
 | |
| 
 | |
|         .. versionchanged:: 1.4
 | |
|              Raise :exc:`.CommandRegistrationError` instead of generic :exc:`.ClientException`
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         command: :class:`Command`
 | |
|             The command to add.
 | |
| 
 | |
|         Raises
 | |
|         -------
 | |
|         :exc:`.CommandRegistrationError`
 | |
|             If the command or its alias is already registered by different command.
 | |
|         TypeError
 | |
|             If the command passed is not a subclass of :class:`.Command`.
 | |
|         """
 | |
| 
 | |
|         if not isinstance(command, Command):
 | |
|             raise TypeError('The command passed must be a subclass of Command')
 | |
| 
 | |
|         if isinstance(self, Command):
 | |
|             command.parent = self
 | |
| 
 | |
|         if command.name in self.all_commands:
 | |
|             raise CommandRegistrationError(command.name)
 | |
| 
 | |
|         self.all_commands[command.name] = command
 | |
|         for alias in command.aliases:
 | |
|             if alias in self.all_commands:
 | |
|                 raise CommandRegistrationError(alias, alias_conflict=True)
 | |
|             self.all_commands[alias] = command
 | |
| 
 | |
|     def remove_command(self, name):
 | |
|         """Remove a :class:`.Command` from the internal list
 | |
|         of commands.
 | |
| 
 | |
|         This could also be used as a way to remove aliases.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         name: :class:`str`
 | |
|             The name of the command to remove.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         Optional[:class:`.Command`]
 | |
|             The command that was removed. If the name is not valid then
 | |
|             ``None`` is returned instead.
 | |
|         """
 | |
|         command = self.all_commands.pop(name, None)
 | |
| 
 | |
|         # does not exist
 | |
|         if command is None:
 | |
|             return None
 | |
| 
 | |
|         if name in command.aliases:
 | |
|             # we're removing an alias so we don't want to remove the rest
 | |
|             return command
 | |
| 
 | |
|         # we're not removing the alias so let's delete the rest of them.
 | |
|         for alias in command.aliases:
 | |
|             self.all_commands.pop(alias, None)
 | |
|         return command
 | |
| 
 | |
|     def walk_commands(self):
 | |
|         """An iterator that recursively walks through all commands and subcommands.
 | |
| 
 | |
|         .. versionchanged:: 1.4
 | |
|             Duplicates due to aliases are no longer returned
 | |
| 
 | |
|         Yields
 | |
|         ------
 | |
|         Union[:class:`.Command`, :class:`.Group`]
 | |
|             A command or group from the internal list of commands.
 | |
|         """
 | |
|         for command in self.commands:
 | |
|             yield command
 | |
|             if isinstance(command, GroupMixin):
 | |
|                 yield from command.walk_commands()
 | |
| 
 | |
|     def get_command(self, name):
 | |
|         """Get a :class:`.Command` from the internal list
 | |
|         of commands.
 | |
| 
 | |
|         This could also be used as a way to get aliases.
 | |
| 
 | |
|         The name could be fully qualified (e.g. ``'foo bar'``) will get
 | |
|         the subcommand ``bar`` of the group command ``foo``. If a
 | |
|         subcommand is not found then ``None`` is returned just as usual.
 | |
| 
 | |
|         Parameters
 | |
|         -----------
 | |
|         name: :class:`str`
 | |
|             The name of the command to get.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         Optional[:class:`Command`]
 | |
|             The command that was requested. If not found, returns ``None``.
 | |
|         """
 | |
| 
 | |
|         # fast path, no space in name.
 | |
|         if ' ' not in name:
 | |
|             return self.all_commands.get(name)
 | |
| 
 | |
|         names = name.split()
 | |
|         if not names:
 | |
|             return None
 | |
|         obj = self.all_commands.get(names[0])
 | |
|         if not isinstance(obj, GroupMixin):
 | |
|             return obj
 | |
| 
 | |
|         for name in names[1:]:
 | |
|             try:
 | |
|                 obj = obj.all_commands[name]
 | |
|             except (AttributeError, KeyError):
 | |
|                 return None
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def command(self, *args, **kwargs):
 | |
|         """A shortcut decorator that invokes :func:`.command` and adds it to
 | |
|         the internal command list via :meth:`~.GroupMixin.add_command`.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         Callable[..., :class:`Command`]
 | |
|             A decorator that converts the provided method into a Command, adds it to the bot, then returns it.
 | |
|         """
 | |
|         def decorator(func):
 | |
|             kwargs.setdefault('parent', self)
 | |
|             result = command(*args, **kwargs)(func)
 | |
|             self.add_command(result)
 | |
|             return result
 | |
| 
 | |
|         return decorator
 | |
| 
 | |
|     def group(self, *args, **kwargs):
 | |
|         """A shortcut decorator that invokes :func:`.group` and adds it to
 | |
|         the internal command list via :meth:`~.GroupMixin.add_command`.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         Callable[..., :class:`Group`]
 | |
|             A decorator that converts the provided method into a Group, adds it to the bot, then returns it.
 | |
|         """
 | |
|         def decorator(func):
 | |
|             kwargs.setdefault('parent', self)
 | |
|             result = group(*args, **kwargs)(func)
 | |
|             self.add_command(result)
 | |
|             return result
 | |
| 
 | |
|         return decorator
 | |
| 
 | |
| class Group(GroupMixin, Command):
 | |
|     """A class that implements a grouping protocol for commands to be
 | |
|     executed as subcommands.
 | |
| 
 | |
|     This class is a subclass of :class:`.Command` and thus all options
 | |
|     valid in :class:`.Command` are valid in here as well.
 | |
| 
 | |
|     Attributes
 | |
|     -----------
 | |
|     invoke_without_command: :class:`bool`
 | |
|         Indicates if the group callback should begin parsing and
 | |
|         invocation only if no subcommand was found. Useful for
 | |
|         making it an error handling function to tell the user that
 | |
|         no subcommand was found or to have different functionality
 | |
|         in case no subcommand was found. If this is ``False``, then
 | |
|         the group callback will always be invoked first. This means
 | |
|         that the checks and the parsing dictated by its parameters
 | |
|         will be executed. Defaults to ``False``.
 | |
|     case_insensitive: :class:`bool`
 | |
|         Indicates if the group's commands should be case insensitive.
 | |
|         Defaults to ``False``.
 | |
|     """
 | |
|     def __init__(self, *args, **attrs):
 | |
|         self.invoke_without_command = attrs.pop('invoke_without_command', False)
 | |
|         super().__init__(*args, **attrs)
 | |
| 
 | |
|     def copy(self):
 | |
|         """Creates a copy of this :class:`Group`.
 | |
| 
 | |
|         Returns
 | |
|         --------
 | |
|         :class:`Group`
 | |
|             A new instance of this group.
 | |
|         """
 | |
|         ret = super().copy()
 | |
|         for cmd in self.commands:
 | |
|             ret.add_command(cmd.copy())
 | |
|         return ret
 | |
| 
 | |
|     async def invoke(self, ctx):
 | |
|         ctx.invoked_subcommand = None
 | |
|         ctx.subcommand_passed = None
 | |
|         early_invoke = not self.invoke_without_command
 | |
|         if early_invoke:
 | |
|             await self.prepare(ctx)
 | |
| 
 | |
|         view = ctx.view
 | |
|         previous = view.index
 | |
|         view.skip_ws()
 | |
|         trigger = view.get_word()
 | |
| 
 | |
|         if trigger:
 | |
|             ctx.subcommand_passed = trigger
 | |
|             ctx.invoked_subcommand = self.all_commands.get(trigger, None)
 | |
| 
 | |
|         if early_invoke:
 | |
|             injected = hooked_wrapped_callback(self, ctx, self.callback)
 | |
|             await injected(*ctx.args, **ctx.kwargs)
 | |
| 
 | |
|         if trigger and ctx.invoked_subcommand:
 | |
|             ctx.invoked_with = trigger
 | |
|             await ctx.invoked_subcommand.invoke(ctx)
 | |
|         elif not early_invoke:
 | |
|             # undo the trigger parsing
 | |
|             view.index = previous
 | |
|             view.previous = previous
 | |
|             await super().invoke(ctx)
 | |
| 
 | |
|     async def reinvoke(self, ctx, *, call_hooks=False):
 | |
|         ctx.invoked_subcommand = None
 | |
|         early_invoke = not self.invoke_without_command
 | |
|         if early_invoke:
 | |
|             ctx.command = self
 | |
|             await self._parse_arguments(ctx)
 | |
| 
 | |
|             if call_hooks:
 | |
|                 await self.call_before_hooks(ctx)
 | |
| 
 | |
|         view = ctx.view
 | |
|         previous = view.index
 | |
|         view.skip_ws()
 | |
|         trigger = view.get_word()
 | |
| 
 | |
|         if trigger:
 | |
|             ctx.subcommand_passed = trigger
 | |
|             ctx.invoked_subcommand = self.all_commands.get(trigger, None)
 | |
| 
 | |
|         if early_invoke:
 | |
|             try:
 | |
|                 await self.callback(*ctx.args, **ctx.kwargs)
 | |
|             except:
 | |
|                 ctx.command_failed = True
 | |
|                 raise
 | |
|             finally:
 | |
|                 if call_hooks:
 | |
|                     await self.call_after_hooks(ctx)
 | |
| 
 | |
|         if trigger and ctx.invoked_subcommand:
 | |
|             ctx.invoked_with = trigger
 | |
|             await ctx.invoked_subcommand.reinvoke(ctx, call_hooks=call_hooks)
 | |
|         elif not early_invoke:
 | |
|             # undo the trigger parsing
 | |
|             view.index = previous
 | |
|             view.previous = previous
 | |
|             await super().reinvoke(ctx, call_hooks=call_hooks)
 | |
| 
 | |
| # Decorators
 | |
| 
 | |
| def command(name=None, cls=None, **attrs):
 | |
|     """A decorator that transforms a function into a :class:`.Command`
 | |
|     or if called with :func:`.group`, :class:`.Group`.
 | |
| 
 | |
|     By default the ``help`` attribute is received automatically from the
 | |
|     docstring of the function and is cleaned up with the use of
 | |
|     ``inspect.cleandoc``. If the docstring is ``bytes``, then it is decoded
 | |
|     into :class:`str` using utf-8 encoding.
 | |
| 
 | |
|     All checks added using the :func:`.check` & co. decorators are added into
 | |
|     the function. There is no way to supply your own checks through this
 | |
|     decorator.
 | |
| 
 | |
|     Parameters
 | |
|     -----------
 | |
|     name: :class:`str`
 | |
|         The name to create the command with. By default this uses the
 | |
|         function name unchanged.
 | |
|     cls
 | |
|         The class to construct with. By default this is :class:`.Command`.
 | |
|         You usually do not change this.
 | |
|     attrs
 | |
|         Keyword arguments to pass into the construction of the class denoted
 | |
|         by ``cls``.
 | |
| 
 | |
|     Raises
 | |
|     -------
 | |
|     TypeError
 | |
|         If the function is not a coroutine or is already a command.
 | |
|     """
 | |
|     if cls is None:
 | |
|         cls = Command
 | |
| 
 | |
|     def decorator(func):
 | |
|         if isinstance(func, Command):
 | |
|             raise TypeError('Callback is already a command.')
 | |
|         return cls(func, name=name, **attrs)
 | |
| 
 | |
|     return decorator
 | |
| 
 | |
| def group(name=None, **attrs):
 | |
|     """A decorator that transforms a function into a :class:`.Group`.
 | |
| 
 | |
|     This is similar to the :func:`.command` decorator but the ``cls``
 | |
|     parameter is set to :class:`Group` by default.
 | |
| 
 | |
|     .. versionchanged:: 1.1
 | |
|         The ``cls`` parameter can now be passed.
 | |
|     """
 | |
| 
 | |
|     attrs.setdefault('cls', Group)
 | |
|     return command(name=name, **attrs)
 | |
| 
 | |
| def check(predicate):
 | |
|     r"""A decorator that adds a check to the :class:`.Command` or its
 | |
|     subclasses. These checks could be accessed via :attr:`.Command.checks`.
 | |
| 
 | |
|     These checks should be predicates that take in a single parameter taking
 | |
|     a :class:`.Context`. If the check returns a ``False``\-like value then
 | |
|     during invocation a :exc:`.CheckFailure` exception is raised and sent to
 | |
|     the :func:`.on_command_error` event.
 | |
| 
 | |
|     If an exception should be thrown in the predicate then it should be a
 | |
|     subclass of :exc:`.CommandError`. Any exception not subclassed from it
 | |
|     will be propagated while those subclassed will be sent to
 | |
|     :func:`.on_command_error`.
 | |
| 
 | |
|     A special attribute named ``predicate`` is bound to the value
 | |
|     returned by this decorator to retrieve the predicate passed to the
 | |
|     decorator. This allows the following introspection and chaining to be done:
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         def owner_or_permissions(**perms):
 | |
|             original = commands.has_permissions(**perms).predicate
 | |
|             async def extended_check(ctx):
 | |
|                 if ctx.guild is None:
 | |
|                     return False
 | |
|                 return ctx.guild.owner_id == ctx.author.id or await original(ctx)
 | |
|             return commands.check(extended_check)
 | |
| 
 | |
|     .. note::
 | |
| 
 | |
|         The function returned by ``predicate`` is **always** a coroutine,
 | |
|         even if the original function was not a coroutine.
 | |
| 
 | |
|     .. versionchanged:: 1.3
 | |
|         The ``predicate`` attribute was added.
 | |
| 
 | |
|     Examples
 | |
|     ---------
 | |
| 
 | |
|     Creating a basic check to see if the command invoker is you.
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         def check_if_it_is_me(ctx):
 | |
|             return ctx.message.author.id == 85309593344815104
 | |
| 
 | |
|         @bot.command()
 | |
|         @commands.check(check_if_it_is_me)
 | |
|         async def only_for_me(ctx):
 | |
|             await ctx.send('I know you!')
 | |
| 
 | |
|     Transforming common checks into its own decorator:
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         def is_me():
 | |
|             def predicate(ctx):
 | |
|                 return ctx.message.author.id == 85309593344815104
 | |
|             return commands.check(predicate)
 | |
| 
 | |
|         @bot.command()
 | |
|         @is_me()
 | |
|         async def only_me(ctx):
 | |
|             await ctx.send('Only you!')
 | |
| 
 | |
|     Parameters
 | |
|     -----------
 | |
|     predicate: Callable[[:class:`Context`], :class:`bool`]
 | |
|         The predicate to check if the command should be invoked.
 | |
|     """
 | |
| 
 | |
|     def decorator(func):
 | |
|         if isinstance(func, Command):
 | |
|             func.checks.append(predicate)
 | |
|         else:
 | |
|             if not hasattr(func, '__commands_checks__'):
 | |
|                 func.__commands_checks__ = []
 | |
| 
 | |
|             func.__commands_checks__.append(predicate)
 | |
| 
 | |
|         return func
 | |
| 
 | |
|     if inspect.iscoroutinefunction(predicate):
 | |
|         decorator.predicate = predicate
 | |
|     else:
 | |
|         @functools.wraps(predicate)
 | |
|         async def wrapper(ctx):
 | |
|             return predicate(ctx)
 | |
|         decorator.predicate = wrapper
 | |
| 
 | |
|     return decorator
 | |
| 
 | |
| def check_any(*checks):
 | |
|     r"""A :func:`check` that is added that checks if any of the checks passed
 | |
|     will pass, i.e. using logical OR.
 | |
| 
 | |
|     If all checks fail then :exc:`.CheckAnyFailure` is raised to signal the failure.
 | |
|     It inherits from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. note::
 | |
| 
 | |
|         The ``predicate`` attribute for this function **is** a coroutine.
 | |
| 
 | |
|     .. versionadded:: 1.3
 | |
| 
 | |
|     Parameters
 | |
|     ------------
 | |
|     \*checks: Callable[[:class:`Context`], :class:`bool`]
 | |
|         An argument list of checks that have been decorated with
 | |
|         the :func:`check` decorator.
 | |
| 
 | |
|     Raises
 | |
|     -------
 | |
|     TypeError
 | |
|         A check passed has not been decorated with the :func:`check`
 | |
|         decorator.
 | |
| 
 | |
|     Examples
 | |
|     ---------
 | |
| 
 | |
|     Creating a basic check to see if it's the bot owner or
 | |
|     the server owner:
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         def is_guild_owner():
 | |
|             def predicate(ctx):
 | |
|                 return ctx.guild is not None and ctx.guild.owner_id == ctx.author.id
 | |
|             return commands.check(predicate)
 | |
| 
 | |
|         @bot.command()
 | |
|         @commands.check_any(commands.is_owner(), is_guild_owner())
 | |
|         async def only_for_owners(ctx):
 | |
|             await ctx.send('Hello mister owner!')
 | |
|     """
 | |
| 
 | |
|     unwrapped = []
 | |
|     for wrapped in checks:
 | |
|         try:
 | |
|             pred = wrapped.predicate
 | |
|         except AttributeError:
 | |
|             raise TypeError('%r must be wrapped by commands.check decorator' % wrapped) from None
 | |
|         else:
 | |
|             unwrapped.append(pred)
 | |
| 
 | |
|     async def predicate(ctx):
 | |
|         errors = []
 | |
|         for func in unwrapped:
 | |
|             try:
 | |
|                 value = await func(ctx)
 | |
|             except CheckFailure as e:
 | |
|                 errors.append(e)
 | |
|             else:
 | |
|                 if value:
 | |
|                     return True
 | |
|         # if we're here, all checks failed
 | |
|         raise CheckAnyFailure(unwrapped, errors)
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def has_role(item):
 | |
|     """A :func:`.check` that is added that checks if the member invoking the
 | |
|     command has the role specified via the name or ID specified.
 | |
| 
 | |
|     If a string is specified, you must give the exact name of the role, including
 | |
|     caps and spelling.
 | |
| 
 | |
|     If an integer is specified, you must give the exact snowflake ID of the role.
 | |
| 
 | |
|     If the message is invoked in a private message context then the check will
 | |
|     return ``False``.
 | |
| 
 | |
|     This check raises one of two special exceptions, :exc:`.MissingRole` if the user
 | |
|     is missing a role, or :exc:`.NoPrivateMessage` if it is used in a private message.
 | |
|     Both inherit from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. versionchanged:: 1.1
 | |
| 
 | |
|         Raise :exc:`.MissingRole` or :exc:`.NoPrivateMessage`
 | |
|         instead of generic :exc:`.CheckFailure`
 | |
| 
 | |
|     Parameters
 | |
|     -----------
 | |
|     item: Union[:class:`int`, :class:`str`]
 | |
|         The name or ID of the role to check.
 | |
|     """
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         if not isinstance(ctx.channel, discord.abc.GuildChannel):
 | |
|             raise NoPrivateMessage()
 | |
| 
 | |
|         if isinstance(item, int):
 | |
|             role = discord.utils.get(ctx.author.roles, id=item)
 | |
|         else:
 | |
|             role = discord.utils.get(ctx.author.roles, name=item)
 | |
|         if role is None:
 | |
|             raise MissingRole(item)
 | |
|         return True
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def has_any_role(*items):
 | |
|     r"""A :func:`.check` that is added that checks if the member invoking the
 | |
|     command has **any** of the roles specified. This means that if they have
 | |
|     one out of the three roles specified, then this check will return `True`.
 | |
| 
 | |
|     Similar to :func:`.has_role`\, the names or IDs passed in must be exact.
 | |
| 
 | |
|     This check raises one of two special exceptions, :exc:`.MissingAnyRole` if the user
 | |
|     is missing all roles, or :exc:`.NoPrivateMessage` if it is used in a private message.
 | |
|     Both inherit from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. versionchanged:: 1.1
 | |
| 
 | |
|         Raise :exc:`.MissingAnyRole` or :exc:`.NoPrivateMessage`
 | |
|         instead of generic :exc:`.CheckFailure`
 | |
| 
 | |
|     Parameters
 | |
|     -----------
 | |
|     items: List[Union[:class:`str`, :class:`int`]]
 | |
|         An argument list of names or IDs to check that the member has roles wise.
 | |
| 
 | |
|     Example
 | |
|     --------
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         @bot.command()
 | |
|         @commands.has_any_role('Library Devs', 'Moderators', 492212595072434186)
 | |
|         async def cool(ctx):
 | |
|             await ctx.send('You are cool indeed')
 | |
|     """
 | |
|     def predicate(ctx):
 | |
|         if not isinstance(ctx.channel, discord.abc.GuildChannel):
 | |
|             raise NoPrivateMessage()
 | |
| 
 | |
|         getter = functools.partial(discord.utils.get, ctx.author.roles)
 | |
|         if any(getter(id=item) is not None if isinstance(item, int) else getter(name=item) is not None for item in items):
 | |
|             return True
 | |
|         raise MissingAnyRole(items)
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def bot_has_role(item):
 | |
|     """Similar to :func:`.has_role` except checks if the bot itself has the
 | |
|     role.
 | |
| 
 | |
|     This check raises one of two special exceptions, :exc:`.BotMissingRole` if the bot
 | |
|     is missing the role, or :exc:`.NoPrivateMessage` if it is used in a private message.
 | |
|     Both inherit from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. versionchanged:: 1.1
 | |
| 
 | |
|         Raise :exc:`.BotMissingRole` or :exc:`.NoPrivateMessage`
 | |
|         instead of generic :exc:`.CheckFailure`
 | |
|     """
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         ch = ctx.channel
 | |
|         if not isinstance(ch, discord.abc.GuildChannel):
 | |
|             raise NoPrivateMessage()
 | |
| 
 | |
|         me = ch.guild.me
 | |
|         if isinstance(item, int):
 | |
|             role = discord.utils.get(me.roles, id=item)
 | |
|         else:
 | |
|             role = discord.utils.get(me.roles, name=item)
 | |
|         if role is None:
 | |
|             raise BotMissingRole(item)
 | |
|         return True
 | |
|     return check(predicate)
 | |
| 
 | |
| def bot_has_any_role(*items):
 | |
|     """Similar to :func:`.has_any_role` except checks if the bot itself has
 | |
|     any of the roles listed.
 | |
| 
 | |
|     This check raises one of two special exceptions, :exc:`.BotMissingAnyRole` if the bot
 | |
|     is missing all roles, or :exc:`.NoPrivateMessage` if it is used in a private message.
 | |
|     Both inherit from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. versionchanged:: 1.1
 | |
| 
 | |
|         Raise :exc:`.BotMissingAnyRole` or :exc:`.NoPrivateMessage`
 | |
|         instead of generic checkfailure
 | |
|     """
 | |
|     def predicate(ctx):
 | |
|         ch = ctx.channel
 | |
|         if not isinstance(ch, discord.abc.GuildChannel):
 | |
|             raise NoPrivateMessage()
 | |
| 
 | |
|         me = ch.guild.me
 | |
|         getter = functools.partial(discord.utils.get, me.roles)
 | |
|         if any(getter(id=item) is not None if isinstance(item, int) else getter(name=item) is not None for item in items):
 | |
|             return True
 | |
|         raise BotMissingAnyRole(items)
 | |
|     return check(predicate)
 | |
| 
 | |
| def has_permissions(**perms):
 | |
|     """A :func:`.check` that is added that checks if the member has all of
 | |
|     the permissions necessary.
 | |
| 
 | |
|     Note that this check operates on the current channel permissions, not the
 | |
|     guild wide permissions.
 | |
| 
 | |
|     The permissions passed in must be exactly like the properties shown under
 | |
|     :class:`.discord.Permissions`.
 | |
| 
 | |
|     This check raises a special exception, :exc:`.MissingPermissions`
 | |
|     that is inherited from :exc:`.CheckFailure`.
 | |
| 
 | |
|     Parameters
 | |
|     ------------
 | |
|     perms
 | |
|         An argument list of permissions to check for.
 | |
| 
 | |
|     Example
 | |
|     ---------
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         @bot.command()
 | |
|         @commands.has_permissions(manage_messages=True)
 | |
|         async def test(ctx):
 | |
|             await ctx.send('You can manage messages.')
 | |
| 
 | |
|     """
 | |
| 
 | |
|     invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
 | |
|     if invalid:
 | |
|         raise TypeError('Invalid permission(s): %s' % (', '.join(invalid)))
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         ch = ctx.channel
 | |
|         permissions = ch.permissions_for(ctx.author)
 | |
| 
 | |
|         missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
 | |
| 
 | |
|         if not missing:
 | |
|             return True
 | |
| 
 | |
|         raise MissingPermissions(missing)
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def bot_has_permissions(**perms):
 | |
|     """Similar to :func:`.has_permissions` except checks if the bot itself has
 | |
|     the permissions listed.
 | |
| 
 | |
|     This check raises a special exception, :exc:`.BotMissingPermissions`
 | |
|     that is inherited from :exc:`.CheckFailure`.
 | |
|     """
 | |
| 
 | |
|     invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
 | |
|     if invalid:
 | |
|         raise TypeError('Invalid permission(s): %s' % (', '.join(invalid)))
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         guild = ctx.guild
 | |
|         me = guild.me if guild is not None else ctx.bot.user
 | |
|         permissions = ctx.channel.permissions_for(me)
 | |
| 
 | |
|         missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
 | |
| 
 | |
|         if not missing:
 | |
|             return True
 | |
| 
 | |
|         raise BotMissingPermissions(missing)
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def has_guild_permissions(**perms):
 | |
|     """Similar to :func:`.has_permissions`, but operates on guild wide
 | |
|     permissions instead of the current channel permissions.
 | |
| 
 | |
|     If this check is called in a DM context, it will raise an
 | |
|     exception, :exc:`.NoPrivateMessage`.
 | |
| 
 | |
|     .. versionadded:: 1.3
 | |
|     """
 | |
| 
 | |
|     invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
 | |
|     if invalid:
 | |
|         raise TypeError('Invalid permission(s): %s' % (', '.join(invalid)))
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         if not ctx.guild:
 | |
|             raise NoPrivateMessage
 | |
| 
 | |
|         permissions = ctx.author.guild_permissions
 | |
|         missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
 | |
| 
 | |
|         if not missing:
 | |
|             return True
 | |
| 
 | |
|         raise MissingPermissions(missing)
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def bot_has_guild_permissions(**perms):
 | |
|     """Similar to :func:`.has_guild_permissions`, but checks the bot
 | |
|     members guild permissions.
 | |
| 
 | |
|     .. versionadded:: 1.3
 | |
|     """
 | |
| 
 | |
|     invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
 | |
|     if invalid:
 | |
|         raise TypeError('Invalid permission(s): %s' % (', '.join(invalid)))
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         if not ctx.guild:
 | |
|             raise NoPrivateMessage
 | |
| 
 | |
|         permissions = ctx.me.guild_permissions
 | |
|         missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
 | |
| 
 | |
|         if not missing:
 | |
|             return True
 | |
| 
 | |
|         raise BotMissingPermissions(missing)
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def dm_only():
 | |
|     """A :func:`.check` that indicates this command must only be used in a
 | |
|     DM context. Only private messages are allowed when
 | |
|     using the command.
 | |
| 
 | |
|     This check raises a special exception, :exc:`.PrivateMessageOnly`
 | |
|     that is inherited from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. versionadded:: 1.1
 | |
|     """
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         if ctx.guild is not None:
 | |
|             raise PrivateMessageOnly()
 | |
|         return True
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def guild_only():
 | |
|     """A :func:`.check` that indicates this command must only be used in a
 | |
|     guild context only. Basically, no private messages are allowed when
 | |
|     using the command.
 | |
| 
 | |
|     This check raises a special exception, :exc:`.NoPrivateMessage`
 | |
|     that is inherited from :exc:`.CheckFailure`.
 | |
|     """
 | |
| 
 | |
|     def predicate(ctx):
 | |
|         if ctx.guild is None:
 | |
|             raise NoPrivateMessage()
 | |
|         return True
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def is_owner():
 | |
|     """A :func:`.check` that checks if the person invoking this command is the
 | |
|     owner of the bot.
 | |
| 
 | |
|     This is powered by :meth:`.Bot.is_owner`.
 | |
| 
 | |
|     This check raises a special exception, :exc:`.NotOwner` that is derived
 | |
|     from :exc:`.CheckFailure`.
 | |
|     """
 | |
| 
 | |
|     async def predicate(ctx):
 | |
|         if not await ctx.bot.is_owner(ctx.author):
 | |
|             raise NotOwner('You do not own this bot.')
 | |
|         return True
 | |
| 
 | |
|     return check(predicate)
 | |
| 
 | |
| def is_nsfw():
 | |
|     """A :func:`.check` that checks if the channel is a NSFW channel.
 | |
| 
 | |
|     This check raises a special exception, :exc:`.NSFWChannelRequired`
 | |
|     that is derived from :exc:`.CheckFailure`.
 | |
| 
 | |
|     .. versionchanged:: 1.1
 | |
| 
 | |
|         Raise :exc:`.NSFWChannelRequired` instead of generic :exc:`.CheckFailure`.
 | |
|         DM channels will also now pass this check.
 | |
|     """
 | |
|     def pred(ctx):
 | |
|         ch = ctx.channel
 | |
|         if ctx.guild is None or (isinstance(ch, discord.TextChannel) and ch.is_nsfw()):
 | |
|             return True
 | |
|         raise NSFWChannelRequired(ch)
 | |
|     return check(pred)
 | |
| 
 | |
| def cooldown(rate, per, type=BucketType.default):
 | |
|     """A decorator that adds a cooldown to a :class:`.Command`
 | |
| 
 | |
|     A cooldown allows a command to only be used a specific amount
 | |
|     of times in a specific time frame. These cooldowns can be based
 | |
|     either on a per-guild, per-channel, per-user, per-role or global basis.
 | |
|     Denoted by the third argument of ``type`` which must be of enum
 | |
|     type :class:`.BucketType`.
 | |
| 
 | |
|     If a cooldown is triggered, then :exc:`.CommandOnCooldown` is triggered in
 | |
|     :func:`.on_command_error` and the local error handler.
 | |
| 
 | |
|     A command can only have a single cooldown.
 | |
| 
 | |
|     Parameters
 | |
|     ------------
 | |
|     rate: :class:`int`
 | |
|         The number of times a command can be used before triggering a cooldown.
 | |
|     per: :class:`float`
 | |
|         The amount of seconds to wait for a cooldown when it's been triggered.
 | |
|     type: :class:`.BucketType`
 | |
|         The type of cooldown to have.
 | |
|     """
 | |
| 
 | |
|     def decorator(func):
 | |
|         if isinstance(func, Command):
 | |
|             func._buckets = CooldownMapping(Cooldown(rate, per, type))
 | |
|         else:
 | |
|             func.__commands_cooldown__ = Cooldown(rate, per, type)
 | |
|         return func
 | |
|     return decorator
 | |
| 
 | |
| def max_concurrency(number, per=BucketType.default, *, wait=False):
 | |
|     """A decorator that adds a maximum concurrency to a :class:`.Command` or its subclasses.
 | |
| 
 | |
|     This enables you to only allow a certain number of command invocations at the same time,
 | |
|     for example if a command takes too long or if only one user can use it at a time. This
 | |
|     differs from a cooldown in that there is no set waiting period or token bucket -- only
 | |
|     a set number of people can run the command.
 | |
| 
 | |
|     .. versionadded:: 1.3
 | |
| 
 | |
|     Parameters
 | |
|     -------------
 | |
|     number: :class:`int`
 | |
|         The maximum number of invocations of this command that can be running at the same time.
 | |
|     per: :class:`.BucketType`
 | |
|         The bucket that this concurrency is based on, e.g. ``BucketType.guild`` would allow
 | |
|         it to be used up to ``number`` times per guild.
 | |
|     wait: :class:`bool`
 | |
|         Whether the command should wait for the queue to be over. If this is set to ``False``
 | |
|         then instead of waiting until the command can run again, the command raises
 | |
|         :exc:`.MaxConcurrencyReached` to its error handler. If this is set to ``True``
 | |
|         then the command waits until it can be executed.
 | |
|     """
 | |
| 
 | |
|     def decorator(func):
 | |
|         value = MaxConcurrency(number, per=per, wait=wait)
 | |
|         if isinstance(func, Command):
 | |
|             func._max_concurrency = value
 | |
|         else:
 | |
|             func.__commands_max_concurrency__ = value
 | |
|         return func
 | |
|     return decorator
 | |
| 
 | |
| def before_invoke(coro):
 | |
|     """A decorator that registers a coroutine as a pre-invoke hook.
 | |
| 
 | |
|     This allows you to refer to one before invoke hook for several commands that
 | |
|     do not have to be within the same cog.
 | |
| 
 | |
|     .. versionadded:: 1.4
 | |
| 
 | |
|     Example
 | |
|     ---------
 | |
| 
 | |
|     .. code-block:: python3
 | |
| 
 | |
|         async def record_usage(ctx):
 | |
|             print(ctx.author, 'used', ctx.command, 'at', ctx.message.created_at)
 | |
| 
 | |
|         @bot.command()
 | |
|         @commands.before_invoke(record_usage)
 | |
|         async def who(ctx): # Output: <User> used who at <Time>
 | |
|             await ctx.send('i am a bot')
 | |
| 
 | |
|         class What(commands.Cog):
 | |
| 
 | |
|             @commands.before_invoke(record_usage)
 | |
|             @commands.command()
 | |
|             async def when(self, ctx): # Output: <User> used when at <Time>
 | |
|                 await ctx.send('and i have existed since {}'.format(ctx.bot.user.created_at))
 | |
| 
 | |
|             @commands.command()
 | |
|             async def where(self, ctx): # Output: <Nothing>
 | |
|                 await ctx.send('on Discord')
 | |
| 
 | |
|             @commands.command()
 | |
|             async def why(self, ctx): # Output: <Nothing>
 | |
|                 await ctx.send('because someone made me')
 | |
| 
 | |
|         bot.add_cog(What())
 | |
|     """
 | |
|     def decorator(func):
 | |
|         if isinstance(func, Command):
 | |
|             func.before_invoke(coro)
 | |
|         else:
 | |
|             func.__before_invoke__ = coro
 | |
|         return func
 | |
|     return decorator
 | |
| 
 | |
| def after_invoke(coro):
 | |
|     """A decorator that registers a coroutine as a post-invoke hook.
 | |
| 
 | |
|     This allows you to refer to one after invoke hook for several commands that
 | |
|     do not have to be within the same cog.
 | |
| 
 | |
|     .. versionadded:: 1.4
 | |
|     """
 | |
|     def decorator(func):
 | |
|         if isinstance(func, Command):
 | |
|             func.after_invoke(coro)
 | |
|         else:
 | |
|             func.__after_invoke__ = coro
 | |
|         return func
 | |
|     return decorator
 |