2610 lines
88 KiB
Python
2610 lines
88 KiB
Python
"""
|
|
The MIT License (MIT)
|
|
|
|
Copyright (c) 2015-present Rapptz
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a
|
|
copy of this software and associated documentation files (the "Software"),
|
|
to deal in the Software without restriction, including without limitation
|
|
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
and/or sell copies of the Software, and to permit persons to whom the
|
|
Software is furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in
|
|
all copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
DEALINGS IN THE SOFTWARE.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Generator,
|
|
Generic,
|
|
Iterable,
|
|
Literal,
|
|
List,
|
|
Optional,
|
|
Union,
|
|
Set,
|
|
Tuple,
|
|
TypeVar,
|
|
Type,
|
|
TYPE_CHECKING,
|
|
cast,
|
|
overload,
|
|
)
|
|
import asyncio
|
|
import functools
|
|
import inspect
|
|
import datetime
|
|
from collections import defaultdict
|
|
from operator import itemgetter
|
|
|
|
import discord
|
|
|
|
from .errors import *
|
|
from .cooldowns import Cooldown, BucketType, CooldownMapping, MaxConcurrency, DynamicCooldownMapping
|
|
from .converter import (
|
|
CONVERTER_MAPPING,
|
|
Converter,
|
|
MemberConverter,
|
|
RoleConverter,
|
|
run_converters,
|
|
get_converter,
|
|
Greedy,
|
|
Option,
|
|
)
|
|
from ._types import _BaseCommand
|
|
from .cog import Cog
|
|
from .context import Context
|
|
from .flags import FlagConverter
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from typing_extensions import Concatenate, ParamSpec, TypeGuard
|
|
|
|
from discord.message import Message
|
|
from discord.types.interactions import EditApplicationCommand, ApplicationCommandInteractionDataOption
|
|
|
|
from ._types import (
|
|
Coro,
|
|
CoroFunc,
|
|
Check,
|
|
Hook,
|
|
Error,
|
|
)
|
|
|
|
|
|
__all__ = (
|
|
"Command",
|
|
"Group",
|
|
"GroupMixin",
|
|
"command",
|
|
"group",
|
|
"has_role",
|
|
"has_permissions",
|
|
"has_any_role",
|
|
"check",
|
|
"check_any",
|
|
"before_invoke",
|
|
"after_invoke",
|
|
"bot_has_role",
|
|
"bot_has_permissions",
|
|
"bot_has_any_role",
|
|
"cooldown",
|
|
"dynamic_cooldown",
|
|
"max_concurrency",
|
|
"dm_only",
|
|
"guild_only",
|
|
"is_owner",
|
|
"is_nsfw",
|
|
"has_guild_permissions",
|
|
"bot_has_guild_permissions",
|
|
)
|
|
|
|
MISSING: Any = discord.utils.MISSING
|
|
|
|
T = TypeVar("T")
|
|
CogT = TypeVar("CogT", bound="Cog")
|
|
CommandT = TypeVar("CommandT", bound="Command")
|
|
ContextT = TypeVar("ContextT", bound="Context")
|
|
# CHT = TypeVar('CHT', bound='Check')
|
|
GroupT = TypeVar("GroupT", bound="Group")
|
|
HookT = TypeVar("HookT", bound="Hook")
|
|
ErrorT = TypeVar("ErrorT", bound="Error")
|
|
|
|
REVERSED_CONVERTER_MAPPING = {v: k for k, v in CONVERTER_MAPPING.items()}
|
|
application_option_type_lookup = {
|
|
str: 3,
|
|
bool: 5,
|
|
int: 4,
|
|
(
|
|
discord.Member,
|
|
discord.User,
|
|
): 6, # Preferably discord.abc.User, but 'Protocols with non-method members don't support issubclass()'
|
|
(discord.abc.GuildChannel, discord.Thread): 7,
|
|
discord.Role: 8,
|
|
float: 10,
|
|
}
|
|
application_option_channel_types = {
|
|
discord.VoiceChannel: [2],
|
|
discord.TextChannel: [0, 5, 6],
|
|
discord.CategoryChannel: [4],
|
|
discord.Thread: [10, 11, 12],
|
|
discord.StageChannel: [13],
|
|
}
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
P = ParamSpec("P")
|
|
else:
|
|
P = TypeVar("P")
|
|
|
|
|
|
def unwrap_function(function: Callable[..., Any]) -> Callable[..., Any]:
|
|
partial = functools.partial
|
|
while True:
|
|
if hasattr(function, "__wrapped__"):
|
|
function = function.__wrapped__
|
|
elif isinstance(function, partial):
|
|
function = function.func
|
|
else:
|
|
return function
|
|
|
|
|
|
def get_signature_parameters(
|
|
function: Callable[..., Any], globalns: Dict[str, Any]
|
|
) -> Tuple[Dict[str, inspect.Parameter], Dict[str, str]]:
|
|
signature = inspect.signature(function)
|
|
params = {}
|
|
cache: Dict[str, Any] = {}
|
|
descriptions = defaultdict(lambda: "no description")
|
|
eval_annotation = discord.utils.evaluate_annotation
|
|
for name, parameter in signature.parameters.items():
|
|
annotation = parameter.annotation
|
|
if isinstance(parameter.default, Option): # type: ignore
|
|
option = parameter.default
|
|
parameter = parameter.replace(default=option.default)
|
|
if option.name is not MISSING:
|
|
name = option.name
|
|
parameter.replace(name=name)
|
|
|
|
descriptions[name] = option.description
|
|
|
|
if annotation is parameter.empty:
|
|
params[name] = parameter
|
|
continue
|
|
if annotation is None:
|
|
params[name] = parameter.replace(annotation=type(None))
|
|
continue
|
|
|
|
annotation = eval_annotation(annotation, globalns, globalns, cache)
|
|
if annotation is Greedy:
|
|
raise TypeError("Unparameterized Greedy[...] is disallowed in signature.")
|
|
|
|
params[name] = parameter.replace(annotation=annotation)
|
|
|
|
return params, descriptions
|
|
|
|
|
|
def wrap_callback(coro):
|
|
@functools.wraps(coro)
|
|
async def wrapped(*args, **kwargs):
|
|
try:
|
|
ret = await coro(*args, **kwargs)
|
|
except CommandError:
|
|
raise
|
|
except asyncio.CancelledError:
|
|
return
|
|
except Exception as exc:
|
|
raise CommandInvokeError(exc) from exc
|
|
return ret
|
|
|
|
return wrapped
|
|
|
|
|
|
def hooked_wrapped_callback(command, ctx, coro):
|
|
@functools.wraps(coro)
|
|
async def wrapped(*args, **kwargs):
|
|
try:
|
|
ret = await coro(*args, **kwargs)
|
|
except CommandError:
|
|
ctx.command_failed = True
|
|
raise
|
|
except asyncio.CancelledError:
|
|
ctx.command_failed = True
|
|
return
|
|
except Exception as exc:
|
|
ctx.command_failed = True
|
|
raise CommandInvokeError(exc) from exc
|
|
finally:
|
|
if command._max_concurrency is not None:
|
|
await command._max_concurrency.release(ctx)
|
|
|
|
await command.call_after_hooks(ctx)
|
|
return ret
|
|
|
|
return wrapped
|
|
|
|
|
|
class _CaseInsensitiveDict(dict):
|
|
def __contains__(self, k):
|
|
return super().__contains__(k.casefold())
|
|
|
|
def __delitem__(self, k):
|
|
return super().__delitem__(k.casefold())
|
|
|
|
def __getitem__(self, k):
|
|
return super().__getitem__(k.casefold())
|
|
|
|
def get(self, k, default=None):
|
|
return super().get(k.casefold(), default)
|
|
|
|
def pop(self, k, default=None):
|
|
return super().pop(k.casefold(), default)
|
|
|
|
def __setitem__(self, k, v):
|
|
super().__setitem__(k.casefold(), v)
|
|
|
|
|
|
class Command(_BaseCommand, Generic[CogT, P, T]):
|
|
r"""A class that implements the protocol for a bot text command.
|
|
|
|
These are not created manually, instead they are created via the
|
|
decorator or functional interface.
|
|
|
|
Attributes
|
|
-----------
|
|
name: :class:`str`
|
|
The name of the command.
|
|
callback: :ref:`coroutine <coroutine>`
|
|
The coroutine that is executed when the command is called.
|
|
help: Optional[:class:`str`]
|
|
The long help text for the command.
|
|
brief: Optional[:class:`str`]
|
|
The short help text for the command.
|
|
usage: Optional[:class:`str`]
|
|
A replacement for arguments in the default help text.
|
|
aliases: Union[List[:class:`str`], Tuple[:class:`str`]]
|
|
The list of aliases the command can be invoked under.
|
|
enabled: :class:`bool`
|
|
A boolean that indicates if the command is currently enabled.
|
|
If the command is invoked while it is disabled, then
|
|
:exc:`.DisabledCommand` is raised to the :func:`.on_command_error`
|
|
event. Defaults to ``True``.
|
|
parent: Optional[:class:`Group`]
|
|
The parent group that this command belongs to. ``None`` if there
|
|
isn't one.
|
|
cog: Optional[:class:`Cog`]
|
|
The cog that this command belongs to. ``None`` if there isn't one.
|
|
checks: List[Callable[[:class:`.Context`], :class:`bool`]]
|
|
A list of predicates that verifies if the command could be executed
|
|
with the given :class:`.Context` as the sole parameter. If an exception
|
|
is necessary to be thrown to signal failure, then one inherited from
|
|
:exc:`.CommandError` should be used. Note that if the checks fail then
|
|
:exc:`.CheckFailure` exception is raised to the :func:`.on_command_error`
|
|
event.
|
|
description: :class:`str`
|
|
The message prefixed into the default help command.
|
|
hidden: :class:`bool`
|
|
If ``True``\, the default help command does not show this in the
|
|
help output.
|
|
rest_is_raw: :class:`bool`
|
|
If ``False`` and a keyword-only argument is provided then the keyword
|
|
only argument is stripped and handled as if it was a regular argument
|
|
that handles :exc:`.MissingRequiredArgument` and default values in a
|
|
regular matter rather than passing the rest completely raw. If ``True``
|
|
then the keyword-only argument will pass in the rest of the arguments
|
|
in a completely raw matter. Defaults to ``False``.
|
|
invoked_subcommand: Optional[:class:`Command`]
|
|
The subcommand that was invoked, if any.
|
|
require_var_positional: :class:`bool`
|
|
If ``True`` and a variadic positional argument is specified, requires
|
|
the user to specify at least one argument. Defaults to ``False``.
|
|
|
|
.. versionadded:: 1.5
|
|
|
|
ignore_extra: :class:`bool`
|
|
If ``True``\, ignores extraneous strings passed to a command if all its
|
|
requirements are met (e.g. ``?foo a b c`` when only expecting ``a``
|
|
and ``b``). Otherwise :func:`.on_command_error` and local error handlers
|
|
are called with :exc:`.TooManyArguments`. Defaults to ``True``.
|
|
cooldown_after_parsing: :class:`bool`
|
|
If ``True``\, cooldown processing is done after argument parsing,
|
|
which calls converters. If ``False`` then cooldown processing is done
|
|
first and then the converters are called second. Defaults to ``False``.
|
|
extras: :class:`dict`
|
|
A dict of user provided extras to attach to the Command.
|
|
|
|
.. versionadded:: 2.0
|
|
|
|
.. note::
|
|
This object may be copied by the library.
|
|
message_command: Optional[:class:`bool`]
|
|
Whether to process this command based on messages.
|
|
|
|
This overwrites the global ``message_commands`` parameter of :class:`.Bot`.
|
|
|
|
.. versionadded:: 2.0
|
|
slash_command: Optional[:class:`bool`]
|
|
Whether to upload and process this command as a slash command.
|
|
|
|
This overwrites the global ``slash_commands`` parameter of :class:`.Bot`.
|
|
|
|
.. versionadded:: 2.0
|
|
slash_command_guilds: Optional[List[:class:`int`]]
|
|
If this is set, only upload this slash command to these guild IDs.
|
|
|
|
This overwrites the global ``slash_command_guilds`` parameter of :class:`.Bot`.
|
|
|
|
.. versionadded:: 2.0
|
|
|
|
option_descriptions: Dict[:class:`str`, :class:`str`]
|
|
The unpacked option descriptions from :class:`.Option`.
|
|
|
|
.. versionadded:: 2.0
|
|
"""
|
|
__original_kwargs__: Dict[str, Any]
|
|
_max_concurrency: Optional[MaxConcurrency]
|
|
|
|
def __new__(cls: Type[CommandT], *args: Any, **kwargs: Any) -> CommandT:
|
|
# if you're wondering why this is done, it's because we need to ensure
|
|
# we have a complete original copy of **kwargs even for classes that
|
|
# mess with it by popping before delegating to the subclass __init__.
|
|
# In order to do this, we need to control the instance creation and
|
|
# inject the original kwargs through __new__ rather than doing it
|
|
# inside __init__.
|
|
self = super().__new__(cls)
|
|
|
|
# we do a shallow copy because it's probably the most common use case.
|
|
# this could potentially break if someone modifies a list or something
|
|
# while it's in movement, but for now this is the cheapest and
|
|
# fastest way to do what we want.
|
|
self.__original_kwargs__ = kwargs.copy()
|
|
return self
|
|
|
|
def __init__(
|
|
self,
|
|
func: Union[
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[T]],
|
|
Callable[Concatenate[ContextT, P], Coro[T]],
|
|
],
|
|
**kwargs: Any,
|
|
):
|
|
if not asyncio.iscoroutinefunction(func):
|
|
raise TypeError("Callback must be a coroutine.")
|
|
|
|
name = kwargs.get("name") or func.__name__
|
|
if not isinstance(name, str):
|
|
raise TypeError("Name of a command must be a string.")
|
|
self.name: str = name
|
|
|
|
self.callback = func
|
|
self.enabled: bool = kwargs.get("enabled", True)
|
|
|
|
self.slash_command: Optional[bool] = kwargs.get("slash_command", None)
|
|
self.message_command: Optional[bool] = kwargs.get("message_command", None)
|
|
self.slash_command_guilds: Optional[Iterable[int]] = kwargs.get("slash_command_guilds", None)
|
|
|
|
help_doc = kwargs.get("help")
|
|
if help_doc is not None:
|
|
help_doc = inspect.cleandoc(help_doc)
|
|
else:
|
|
help_doc = inspect.getdoc(func)
|
|
if isinstance(help_doc, bytes):
|
|
help_doc = help_doc.decode("utf-8")
|
|
|
|
self.help: Optional[str] = help_doc
|
|
|
|
self.brief: Optional[str] = kwargs.get("brief")
|
|
self.usage: Optional[str] = kwargs.get("usage")
|
|
self.rest_is_raw: bool = kwargs.get("rest_is_raw", False)
|
|
self.aliases: Union[List[str], Tuple[str]] = kwargs.get("aliases", [])
|
|
self.extras: Dict[str, Any] = kwargs.get("extras", {})
|
|
|
|
if not isinstance(self.aliases, (list, tuple)):
|
|
raise TypeError("Aliases of a command must be a list or a tuple of strings.")
|
|
|
|
self.description: str = inspect.cleandoc(kwargs.get("description", ""))
|
|
self.hidden: bool = kwargs.get("hidden", False)
|
|
|
|
if hasattr(func, "__command_attrs__"):
|
|
command_attrs: Dict[str, Any] = func.__command_attrs__
|
|
else:
|
|
command_attrs = {}
|
|
|
|
try:
|
|
checks = func.__commands_checks__
|
|
checks.reverse()
|
|
except AttributeError:
|
|
checks = kwargs.get("checks", [])
|
|
|
|
try:
|
|
cooldown = command_attrs.pop("cooldown")
|
|
except KeyError:
|
|
cooldown = kwargs.get("cooldown")
|
|
|
|
if cooldown is None:
|
|
buckets = CooldownMapping(cooldown, BucketType.default)
|
|
elif isinstance(cooldown, CooldownMapping):
|
|
buckets = cooldown
|
|
else:
|
|
raise TypeError("Cooldown must be a an instance of CooldownMapping or None.")
|
|
|
|
self.checks: List[Check] = checks
|
|
self._buckets: CooldownMapping = buckets
|
|
self._max_concurrency = kwargs.get("max_concurrency")
|
|
|
|
self.require_var_positional: bool = kwargs.get("require_var_positional", False)
|
|
self.ignore_extra: bool = kwargs.get("ignore_extra", True)
|
|
self.cooldown_after_parsing: bool = kwargs.get("cooldown_after_parsing", False)
|
|
self.cog: Optional[CogT] = None
|
|
|
|
# bandaid for the fact that sometimes parent can be the bot instance
|
|
parent = kwargs.get("parent")
|
|
self.parent: Optional[GroupMixin] = parent if isinstance(parent, _BaseCommand) else None # type: ignore
|
|
if self.slash_command_guilds is not None and self.parent is not None:
|
|
raise ValueError(
|
|
"Cannot set specific guilds for a subcommand. They are inherited from the top level group."
|
|
)
|
|
|
|
self._before_invoke: Optional[Hook] = None
|
|
try:
|
|
before_invoke = command_attrs.pop("before_invoke")
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self.before_invoke(before_invoke)
|
|
|
|
self._after_invoke: Optional[Hook] = None
|
|
try:
|
|
after_invoke = command_attrs.pop("after_invoke")
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self.after_invoke(after_invoke)
|
|
|
|
# Handle user provided command attrs
|
|
self._update_attrs(**command_attrs)
|
|
|
|
@property
|
|
def callback(
|
|
self,
|
|
) -> Union[Callable[Concatenate[CogT, Context, P], Coro[T]], Callable[Concatenate[Context, P], Coro[T]],]:
|
|
return self._callback
|
|
|
|
@callback.setter
|
|
def callback(
|
|
self,
|
|
function: Union[
|
|
Callable[Concatenate[CogT, Context, P], Coro[T]],
|
|
Callable[Concatenate[Context, P], Coro[T]],
|
|
],
|
|
) -> None:
|
|
self._callback = function
|
|
unwrap = unwrap_function(function)
|
|
self.module = unwrap.__module__
|
|
|
|
try:
|
|
globalns = unwrap.__globals__
|
|
except AttributeError:
|
|
globalns = {}
|
|
|
|
self.params, self.option_descriptions = get_signature_parameters(function, globalns)
|
|
|
|
def _update_attrs(self, **command_attrs: Any):
|
|
for key, value in command_attrs.items():
|
|
setattr(self, key, value)
|
|
|
|
def add_check(self, func: Check) -> None:
|
|
"""Adds a check to the command.
|
|
|
|
This is the non-decorator interface to :func:`.check`.
|
|
|
|
.. versionadded:: 1.3
|
|
|
|
Parameters
|
|
-----------
|
|
func
|
|
The function that will be used as a check.
|
|
"""
|
|
|
|
self.checks.append(func)
|
|
|
|
def remove_check(self, func: Check) -> None:
|
|
"""Removes a check from the command.
|
|
|
|
This function is idempotent and will not raise an exception
|
|
if the function is not in the command's checks.
|
|
|
|
.. versionadded:: 1.3
|
|
|
|
Parameters
|
|
-----------
|
|
func
|
|
The function to remove from the checks.
|
|
"""
|
|
|
|
try:
|
|
self.checks.remove(func)
|
|
except ValueError:
|
|
pass
|
|
|
|
def update(self, **kwargs: Any) -> None:
|
|
"""Updates :class:`Command` instance with updated attribute.
|
|
|
|
This works similarly to the :func:`.command` decorator in terms
|
|
of parameters in that they are passed to the :class:`Command` or
|
|
subclass constructors, sans the name and callback.
|
|
"""
|
|
self.__init__(self.callback, **dict(self.__original_kwargs__, **kwargs))
|
|
|
|
async def __call__(self, context: Context, *args: P.args, **kwargs: P.kwargs) -> T:
|
|
"""|coro|
|
|
|
|
Calls the internal callback that the command holds.
|
|
|
|
.. note::
|
|
|
|
This bypasses all mechanisms -- including checks, converters,
|
|
invoke hooks, cooldowns, etc. You must take care to pass
|
|
the proper arguments and types to this function.
|
|
|
|
.. versionadded:: 1.3
|
|
"""
|
|
if self.cog is not None:
|
|
return await self.callback(self.cog, context, *args, **kwargs) # type: ignore
|
|
else:
|
|
return await self.callback(context, *args, **kwargs) # type: ignore
|
|
|
|
def _ensure_assignment_on_copy(self, other: CommandT) -> CommandT:
|
|
other._before_invoke = self._before_invoke
|
|
other._after_invoke = self._after_invoke
|
|
if self.checks != other.checks:
|
|
other.checks = self.checks.copy()
|
|
if self._buckets.valid and not other._buckets.valid:
|
|
other._buckets = self._buckets.copy()
|
|
if self._max_concurrency != other._max_concurrency:
|
|
# _max_concurrency won't be None at this point
|
|
other._max_concurrency = self._max_concurrency.copy() # type: ignore
|
|
|
|
try:
|
|
other.on_error = self.on_error
|
|
except AttributeError:
|
|
pass
|
|
return other
|
|
|
|
def copy(self: CommandT) -> CommandT:
|
|
"""Creates a copy of this command.
|
|
|
|
Returns
|
|
--------
|
|
:class:`Command`
|
|
A new instance of this command.
|
|
"""
|
|
ret = self.__class__(self.callback, **self.__original_kwargs__)
|
|
return self._ensure_assignment_on_copy(ret)
|
|
|
|
def _update_copy(self: CommandT, kwargs: Dict[str, Any]) -> CommandT:
|
|
if kwargs:
|
|
kw = kwargs.copy()
|
|
kw.update(self.__original_kwargs__)
|
|
copy = self.__class__(self.callback, **kw)
|
|
return self._ensure_assignment_on_copy(copy)
|
|
else:
|
|
return self.copy()
|
|
|
|
async def dispatch_error(self, ctx: Context, error: Exception) -> None:
|
|
ctx.command_failed = True
|
|
cog = self.cog
|
|
try:
|
|
coro = self.on_error
|
|
except AttributeError:
|
|
pass
|
|
else:
|
|
injected = wrap_callback(coro)
|
|
if cog is not None:
|
|
await injected(cog, ctx, error)
|
|
else:
|
|
await injected(ctx, error)
|
|
|
|
try:
|
|
if cog is not None:
|
|
local = Cog._get_overridden_method(cog.cog_command_error)
|
|
if local is not None:
|
|
wrapped = wrap_callback(local)
|
|
await wrapped(ctx, error)
|
|
finally:
|
|
ctx.bot.dispatch("command_error", ctx, error)
|
|
|
|
async def transform(self, ctx: Context, param: inspect.Parameter) -> Any:
|
|
if param in ctx._ignored_params:
|
|
# in a slash command, we need a way to mark a param as default so ctx._ignored_params is used
|
|
return param.default if param.default is not param.empty else None
|
|
|
|
required = param.default is param.empty
|
|
converter = get_converter(param)
|
|
consume_rest_is_special = param.kind == param.KEYWORD_ONLY and not self.rest_is_raw
|
|
view = ctx.view
|
|
view.skip_ws()
|
|
|
|
# The greedy converter is simple -- it keeps going until it fails in which case,
|
|
# it undos the view ready for the next parameter to use instead
|
|
if isinstance(converter, Greedy):
|
|
if param.kind in (param.POSITIONAL_OR_KEYWORD, param.POSITIONAL_ONLY):
|
|
return await self._transform_greedy_pos(ctx, param, required, converter.converter)
|
|
elif param.kind == param.VAR_POSITIONAL:
|
|
return await self._transform_greedy_var_pos(ctx, param, converter.converter)
|
|
else:
|
|
# if we're here, then it's a KEYWORD_ONLY param type
|
|
# since this is mostly useless, we'll helpfully transform Greedy[X]
|
|
# into just X and do the parsing that way.
|
|
converter = converter.converter
|
|
|
|
if view.eof:
|
|
if param.kind == param.VAR_POSITIONAL:
|
|
raise RuntimeError() # break the loop
|
|
if required:
|
|
if self._is_typing_optional(param.annotation):
|
|
return None
|
|
if hasattr(converter, "__commands_is_flag__") and converter._can_be_constructible():
|
|
return await converter._construct_default(ctx)
|
|
raise MissingRequiredArgument(param)
|
|
return param.default
|
|
|
|
previous = view.index
|
|
if consume_rest_is_special:
|
|
argument = view.read_rest().strip()
|
|
else:
|
|
try:
|
|
argument = view.get_quoted_word()
|
|
except ArgumentParsingError as exc:
|
|
if self._is_typing_optional(param.annotation):
|
|
view.index = previous
|
|
return None
|
|
else:
|
|
raise exc
|
|
view.previous = previous
|
|
|
|
# type-checker fails to narrow argument
|
|
return await run_converters(ctx, converter, argument, param) # type: ignore
|
|
|
|
async def _transform_greedy_pos(
|
|
self, ctx: Context, param: inspect.Parameter, required: bool, converter: Any
|
|
) -> Any:
|
|
view = ctx.view
|
|
result = []
|
|
while not view.eof:
|
|
# for use with a manual undo
|
|
previous = view.index
|
|
|
|
view.skip_ws()
|
|
try:
|
|
argument = view.get_quoted_word()
|
|
value = await run_converters(ctx, converter, argument, param) # type: ignore
|
|
except (CommandError, ArgumentParsingError):
|
|
view.index = previous
|
|
break
|
|
else:
|
|
result.append(value)
|
|
|
|
if not result and not required:
|
|
return param.default
|
|
return result
|
|
|
|
async def _transform_greedy_var_pos(self, ctx: Context, param: inspect.Parameter, converter: Any) -> Any:
|
|
view = ctx.view
|
|
previous = view.index
|
|
try:
|
|
argument = view.get_quoted_word()
|
|
value = await run_converters(ctx, converter, argument, param) # type: ignore
|
|
except (CommandError, ArgumentParsingError):
|
|
view.index = previous
|
|
raise RuntimeError() from None # break loop
|
|
else:
|
|
return value
|
|
|
|
@property
|
|
def clean_params(self) -> Dict[str, inspect.Parameter]:
|
|
"""Dict[:class:`str`, :class:`inspect.Parameter`]:
|
|
Retrieves the parameter dictionary without the context or self parameters.
|
|
|
|
Useful for inspecting signature.
|
|
"""
|
|
result = self.params.copy()
|
|
if self.cog is not None:
|
|
# first parameter is self
|
|
try:
|
|
del result[next(iter(result))]
|
|
except StopIteration:
|
|
raise ValueError("missing 'self' parameter") from None
|
|
|
|
try:
|
|
# first/second parameter is context
|
|
del result[next(iter(result))]
|
|
except StopIteration:
|
|
raise ValueError("missing 'context' parameter") from None
|
|
|
|
return result
|
|
|
|
@property
|
|
def full_parent_name(self) -> str:
|
|
""":class:`str`: Retrieves the fully qualified parent command name.
|
|
|
|
This the base command name required to execute it. For example,
|
|
in ``?one two three`` the parent name would be ``one two``.
|
|
"""
|
|
entries = []
|
|
command = self
|
|
# command.parent is type-hinted as GroupMixin some attributes are resolved via MRO
|
|
while command.parent is not None: # type: ignore
|
|
command = command.parent # type: ignore
|
|
entries.append(command.name) # type: ignore
|
|
|
|
return " ".join(reversed(entries))
|
|
|
|
@property
|
|
def parents(self) -> List[Group]:
|
|
"""List[:class:`Group`]: Retrieves the parents of this command.
|
|
|
|
If the command has no parents then it returns an empty :class:`list`.
|
|
|
|
For example in commands ``?a b c test``, the parents are ``[c, b, a]``.
|
|
|
|
.. versionadded:: 1.1
|
|
"""
|
|
entries = []
|
|
command = self
|
|
while command.parent is not None: # type: ignore
|
|
command = command.parent # type: ignore
|
|
entries.append(command)
|
|
|
|
return entries
|
|
|
|
@property
|
|
def root_parent(self) -> Optional[Group]:
|
|
"""Optional[:class:`Group`]: Retrieves the root parent of this command.
|
|
|
|
If the command has no parents then it returns ``None``.
|
|
|
|
For example in commands ``?a b c test``, the root parent is ``a``.
|
|
"""
|
|
if not self.parent:
|
|
return None
|
|
return self.parents[-1]
|
|
|
|
@property
|
|
def qualified_name(self) -> str:
|
|
""":class:`str`: Retrieves the fully qualified command name.
|
|
|
|
This is the full parent name with the command name as well.
|
|
For example, in ``?one two three`` the qualified name would be
|
|
``one two three``.
|
|
"""
|
|
|
|
parent = self.full_parent_name
|
|
if parent:
|
|
return parent + " " + self.name
|
|
else:
|
|
return self.name
|
|
|
|
def __str__(self) -> str:
|
|
return self.qualified_name
|
|
|
|
async def _parse_arguments(self, ctx: Context) -> None:
|
|
ctx.args = [ctx] if self.cog is None else [self.cog, ctx]
|
|
ctx.kwargs = {}
|
|
args = ctx.args
|
|
kwargs = ctx.kwargs
|
|
|
|
view = ctx.view
|
|
iterator = iter(self.params.items())
|
|
|
|
if self.cog is not None:
|
|
# we have 'self' as the first parameter so just advance
|
|
# the iterator and resume parsing
|
|
try:
|
|
next(iterator)
|
|
except StopIteration:
|
|
raise discord.ClientException(f'Callback for {self.name} command is missing "self" parameter.')
|
|
|
|
# next we have the 'ctx' as the next parameter
|
|
try:
|
|
next(iterator)
|
|
except StopIteration:
|
|
raise discord.ClientException(f'Callback for {self.name} command is missing "ctx" parameter.')
|
|
|
|
for name, param in iterator:
|
|
ctx.current_parameter = param
|
|
if param.kind in (param.POSITIONAL_OR_KEYWORD, param.POSITIONAL_ONLY):
|
|
transformed = await self.transform(ctx, param)
|
|
args.append(transformed)
|
|
elif param.kind == param.KEYWORD_ONLY:
|
|
# kwarg only param denotes "consume rest" semantics
|
|
if self.rest_is_raw:
|
|
converter = get_converter(param)
|
|
argument = view.read_rest()
|
|
kwargs[name] = await run_converters(ctx, converter, argument, param)
|
|
else:
|
|
kwargs[name] = await self.transform(ctx, param)
|
|
break
|
|
elif param.kind == param.VAR_POSITIONAL:
|
|
if view.eof and self.require_var_positional:
|
|
raise MissingRequiredArgument(param)
|
|
while not view.eof:
|
|
try:
|
|
transformed = await self.transform(ctx, param)
|
|
args.append(transformed)
|
|
except RuntimeError:
|
|
break
|
|
|
|
if not self.ignore_extra and not view.eof:
|
|
raise TooManyArguments("Too many arguments passed to " + self.qualified_name)
|
|
|
|
async def call_before_hooks(self, ctx: Context) -> None:
|
|
# now that we're done preparing we can call the pre-command hooks
|
|
# first, call the command local hook:
|
|
cog = self.cog
|
|
if self._before_invoke is not None:
|
|
# should be cog if @commands.before_invoke is used
|
|
instance = getattr(self._before_invoke, "__self__", cog)
|
|
# __self__ only exists for methods, not functions
|
|
# however, if @command.before_invoke is used, it will be a function
|
|
if instance:
|
|
await self._before_invoke(instance, ctx) # type: ignore
|
|
else:
|
|
await self._before_invoke(ctx) # type: ignore
|
|
|
|
# call the cog local hook if applicable:
|
|
if cog is not None:
|
|
hook = Cog._get_overridden_method(cog.cog_before_invoke)
|
|
if hook is not None:
|
|
await hook(ctx)
|
|
|
|
# call the bot global hook if necessary
|
|
hook = ctx.bot._before_invoke
|
|
if hook is not None:
|
|
await hook(ctx)
|
|
|
|
async def call_after_hooks(self, ctx: Context) -> None:
|
|
cog = self.cog
|
|
if self._after_invoke is not None:
|
|
instance = getattr(self._after_invoke, "__self__", cog)
|
|
if instance:
|
|
await self._after_invoke(instance, ctx) # type: ignore
|
|
else:
|
|
await self._after_invoke(ctx) # type: ignore
|
|
|
|
# call the cog local hook if applicable:
|
|
if cog is not None:
|
|
hook = Cog._get_overridden_method(cog.cog_after_invoke)
|
|
if hook is not None:
|
|
await hook(ctx)
|
|
|
|
hook = ctx.bot._after_invoke
|
|
if hook is not None:
|
|
await hook(ctx)
|
|
|
|
def _prepare_cooldowns(self, ctx: Context) -> None:
|
|
if self._buckets.valid:
|
|
dt = ctx.message.edited_at or ctx.message.created_at
|
|
current = dt.replace(tzinfo=datetime.timezone.utc).timestamp()
|
|
bucket = self._buckets.get_bucket(ctx.message, current)
|
|
if bucket is not None:
|
|
retry_after = bucket.update_rate_limit(current)
|
|
if retry_after:
|
|
raise CommandOnCooldown(bucket, retry_after, self._buckets.type) # type: ignore
|
|
|
|
async def prepare(self, ctx: Context) -> None:
|
|
ctx.command = self
|
|
|
|
if not await self.can_run(ctx):
|
|
raise CheckFailure(f"The check functions for command {self.qualified_name} failed.")
|
|
|
|
if self._max_concurrency is not None:
|
|
# For this application, context can be duck-typed as a Message
|
|
await self._max_concurrency.acquire(ctx) # type: ignore
|
|
|
|
try:
|
|
if self.cooldown_after_parsing:
|
|
await self._parse_arguments(ctx)
|
|
self._prepare_cooldowns(ctx)
|
|
else:
|
|
self._prepare_cooldowns(ctx)
|
|
await self._parse_arguments(ctx)
|
|
|
|
await self.call_before_hooks(ctx)
|
|
except:
|
|
if self._max_concurrency is not None:
|
|
await self._max_concurrency.release(ctx) # type: ignore
|
|
raise
|
|
|
|
def is_on_cooldown(self, ctx: Context) -> bool:
|
|
"""Checks whether the command is currently on cooldown.
|
|
|
|
Parameters
|
|
-----------
|
|
ctx: :class:`.Context`
|
|
The invocation context to use when checking the commands cooldown status.
|
|
|
|
Returns
|
|
--------
|
|
:class:`bool`
|
|
A boolean indicating if the command is on cooldown.
|
|
"""
|
|
if not self._buckets.valid:
|
|
return False
|
|
|
|
bucket = self._buckets.get_bucket(ctx.message)
|
|
dt = ctx.message.edited_at or ctx.message.created_at
|
|
current = dt.replace(tzinfo=datetime.timezone.utc).timestamp()
|
|
return bucket.get_tokens(current) == 0
|
|
|
|
def reset_cooldown(self, ctx: Context) -> None:
|
|
"""Resets the cooldown on this command.
|
|
|
|
Parameters
|
|
-----------
|
|
ctx: :class:`.Context`
|
|
The invocation context to reset the cooldown under.
|
|
"""
|
|
if self._buckets.valid:
|
|
bucket = self._buckets.get_bucket(ctx.message)
|
|
bucket.reset()
|
|
|
|
def get_cooldown_retry_after(self, ctx: Context) -> float:
|
|
"""Retrieves the amount of seconds before this command can be tried again.
|
|
|
|
.. versionadded:: 1.4
|
|
|
|
Parameters
|
|
-----------
|
|
ctx: :class:`.Context`
|
|
The invocation context to retrieve the cooldown from.
|
|
|
|
Returns
|
|
--------
|
|
:class:`float`
|
|
The amount of time left on this command's cooldown in seconds.
|
|
If this is ``0.0`` then the command isn't on cooldown.
|
|
"""
|
|
if self._buckets.valid:
|
|
bucket = self._buckets.get_bucket(ctx.message)
|
|
dt = ctx.message.edited_at or ctx.message.created_at
|
|
current = dt.replace(tzinfo=datetime.timezone.utc).timestamp()
|
|
return bucket.get_retry_after(current)
|
|
|
|
return 0.0
|
|
|
|
async def invoke(self, ctx: Context) -> None:
|
|
await self.prepare(ctx)
|
|
|
|
# terminate the invoked_subcommand chain.
|
|
# since we're in a regular command (and not a group) then
|
|
# the invoked subcommand is None.
|
|
ctx.invoked_subcommand = None
|
|
ctx.subcommand_passed = None
|
|
injected = hooked_wrapped_callback(self, ctx, self.callback)
|
|
await injected(*ctx.args, **ctx.kwargs)
|
|
|
|
async def reinvoke(self, ctx: Context, *, call_hooks: bool = False) -> None:
|
|
ctx.command = self
|
|
await self._parse_arguments(ctx)
|
|
|
|
if call_hooks:
|
|
await self.call_before_hooks(ctx)
|
|
|
|
ctx.invoked_subcommand = None
|
|
try:
|
|
await self.callback(*ctx.args, **ctx.kwargs) # type: ignore
|
|
except:
|
|
ctx.command_failed = True
|
|
raise
|
|
finally:
|
|
if call_hooks:
|
|
await self.call_after_hooks(ctx)
|
|
|
|
def error(self, coro: ErrorT) -> ErrorT:
|
|
"""A decorator that registers a coroutine as a local error handler.
|
|
|
|
A local error handler is an :func:`.on_command_error` event limited to
|
|
a single command. However, the :func:`.on_command_error` is still
|
|
invoked afterwards as the catch-all.
|
|
|
|
Parameters
|
|
-----------
|
|
coro: :ref:`coroutine <coroutine>`
|
|
The coroutine to register as the local error handler.
|
|
|
|
Raises
|
|
-------
|
|
TypeError
|
|
The coroutine passed is not actually a coroutine.
|
|
"""
|
|
|
|
if not asyncio.iscoroutinefunction(coro):
|
|
raise TypeError("The error handler must be a coroutine.")
|
|
|
|
self.on_error: Error = coro
|
|
return coro
|
|
|
|
def has_error_handler(self) -> bool:
|
|
""":class:`bool`: Checks whether the command has an error handler registered.
|
|
|
|
.. versionadded:: 1.7
|
|
"""
|
|
return hasattr(self, "on_error")
|
|
|
|
def before_invoke(self, coro: HookT) -> HookT:
|
|
"""A decorator that registers a coroutine as a pre-invoke hook.
|
|
|
|
A pre-invoke hook is called directly before the command is
|
|
called. This makes it a useful function to set up database
|
|
connections or any type of set up required.
|
|
|
|
This pre-invoke hook takes a sole parameter, a :class:`.Context`.
|
|
|
|
See :meth:`.Bot.before_invoke` for more info.
|
|
|
|
Parameters
|
|
-----------
|
|
coro: :ref:`coroutine <coroutine>`
|
|
The coroutine to register as the pre-invoke hook.
|
|
|
|
Raises
|
|
-------
|
|
TypeError
|
|
The coroutine passed is not actually a coroutine.
|
|
"""
|
|
if not asyncio.iscoroutinefunction(coro):
|
|
raise TypeError("The pre-invoke hook must be a coroutine.")
|
|
|
|
self._before_invoke = coro
|
|
return coro
|
|
|
|
def after_invoke(self, coro: HookT) -> HookT:
|
|
"""A decorator that registers a coroutine as a post-invoke hook.
|
|
|
|
A post-invoke hook is called directly after the command is
|
|
called. This makes it a useful function to clean-up database
|
|
connections or any type of clean up required.
|
|
|
|
This post-invoke hook takes a sole parameter, a :class:`.Context`.
|
|
|
|
See :meth:`.Bot.after_invoke` for more info.
|
|
|
|
Parameters
|
|
-----------
|
|
coro: :ref:`coroutine <coroutine>`
|
|
The coroutine to register as the post-invoke hook.
|
|
|
|
Raises
|
|
-------
|
|
TypeError
|
|
The coroutine passed is not actually a coroutine.
|
|
"""
|
|
if not asyncio.iscoroutinefunction(coro):
|
|
raise TypeError("The post-invoke hook must be a coroutine.")
|
|
|
|
self._after_invoke = coro
|
|
return coro
|
|
|
|
@property
|
|
def cog_name(self) -> Optional[str]:
|
|
"""Optional[:class:`str`]: The name of the cog this command belongs to, if any."""
|
|
return type(self.cog).__cog_name__ if self.cog is not None else None
|
|
|
|
@property
|
|
def short_doc(self) -> str:
|
|
""":class:`str`: Gets the "short" documentation of a command.
|
|
|
|
By default, this is the :attr:`.brief` attribute.
|
|
If that lookup leads to an empty string then the first line of the
|
|
:attr:`.help` attribute is used instead.
|
|
"""
|
|
if self.brief is not None:
|
|
return self.brief
|
|
if self.help is not None:
|
|
return self.help.split("\n", 1)[0]
|
|
return ""
|
|
|
|
def _is_typing_optional(self, annotation: Union[T, Optional[T]]) -> TypeGuard[Optional[T]]:
|
|
return getattr(annotation, "__origin__", None) is Union and type(None) in annotation.__args__ # type: ignore
|
|
|
|
@property
|
|
def signature(self) -> str:
|
|
""":class:`str`: Returns a POSIX-like signature useful for help command output."""
|
|
if self.usage is not None:
|
|
return self.usage
|
|
|
|
params = self.clean_params
|
|
if not params:
|
|
return ""
|
|
|
|
result = []
|
|
for name, param in params.items():
|
|
greedy = isinstance(param.annotation, Greedy)
|
|
optional = False # postpone evaluation of if it's an optional argument
|
|
|
|
# for typing.Literal[...], typing.Optional[typing.Literal[...]], and Greedy[typing.Literal[...]], the
|
|
# parameter signature is a literal list of it's values
|
|
annotation = param.annotation.converter if greedy else param.annotation
|
|
origin = getattr(annotation, "__origin__", None)
|
|
if not greedy and origin is Union:
|
|
none_cls = type(None)
|
|
union_args = annotation.__args__
|
|
optional = union_args[-1] is none_cls
|
|
if len(union_args) == 2 and optional:
|
|
annotation = union_args[0]
|
|
origin = getattr(annotation, "__origin__", None)
|
|
|
|
if origin is Literal:
|
|
name = "|".join(f'"{v}"' if isinstance(v, str) else str(v) for v in annotation.__args__)
|
|
if param.default is not param.empty:
|
|
# We don't want None or '' to trigger the [name=value] case and instead it should
|
|
# do [name] since [name=None] or [name=] are not exactly useful for the user.
|
|
should_print = param.default if isinstance(param.default, str) else param.default is not None
|
|
if should_print:
|
|
result.append(f"[{name}={param.default}]" if not greedy else f"[{name}={param.default}]...")
|
|
continue
|
|
else:
|
|
result.append(f"[{name}]")
|
|
|
|
elif param.kind == param.VAR_POSITIONAL:
|
|
if self.require_var_positional:
|
|
result.append(f"<{name}...>")
|
|
else:
|
|
result.append(f"[{name}...]")
|
|
elif greedy:
|
|
result.append(f"[{name}]...")
|
|
elif optional:
|
|
result.append(f"[{name}]")
|
|
else:
|
|
result.append(f"<{name}>")
|
|
|
|
return " ".join(result)
|
|
|
|
async def can_run(self, ctx: Context) -> bool:
|
|
"""|coro|
|
|
|
|
Checks if the command can be executed by checking all the predicates
|
|
inside the :attr:`~Command.checks` attribute. This also checks whether the
|
|
command is disabled.
|
|
|
|
.. versionchanged:: 1.3
|
|
Checks whether the command is disabled or not
|
|
|
|
Parameters
|
|
-----------
|
|
ctx: :class:`.Context`
|
|
The ctx of the command currently being invoked.
|
|
|
|
Raises
|
|
-------
|
|
:class:`CommandError`
|
|
Any command error that was raised during a check call will be propagated
|
|
by this function.
|
|
|
|
Returns
|
|
--------
|
|
:class:`bool`
|
|
A boolean indicating if the command can be invoked.
|
|
"""
|
|
if not self.enabled:
|
|
raise DisabledCommand(f"{self.name} command is disabled")
|
|
|
|
if ctx.interaction is None and (
|
|
self.message_command is False or (self.message_command is None and not ctx.bot.message_commands)
|
|
):
|
|
raise DisabledCommand(f"{self.name} command cannot be run as a message command")
|
|
|
|
if ctx.interaction is not None and (
|
|
self.slash_command is False or (self.slash_command is None and not ctx.bot.slash_commands)
|
|
):
|
|
raise DisabledCommand(f"{self.name} command cannot be run as a slash command")
|
|
|
|
original = ctx.command
|
|
ctx.command = self
|
|
|
|
try:
|
|
if not await ctx.bot.can_run(ctx):
|
|
raise CheckFailure(f"The global check functions for command {self.qualified_name} failed.")
|
|
|
|
cog = self.cog
|
|
if cog is not None:
|
|
local_check = Cog._get_overridden_method(cog.cog_check)
|
|
if local_check is not None:
|
|
ret = await discord.utils.maybe_coroutine(local_check, ctx)
|
|
if not ret:
|
|
return False
|
|
|
|
predicates = self.checks
|
|
if not predicates:
|
|
# since we have no checks, then we just return True.
|
|
return True
|
|
|
|
return await discord.utils.async_all(predicate(ctx) for predicate in predicates) # type: ignore
|
|
finally:
|
|
ctx.command = original
|
|
|
|
def _param_to_options(
|
|
self, name: str, annotation: Any, required: bool, varadic: bool, description: Optional[str] = None
|
|
) -> List[Optional[ApplicationCommandInteractionDataOption]]:
|
|
|
|
if description is not None:
|
|
self.option_descriptions[name] = description
|
|
|
|
description = self.option_descriptions[name]
|
|
origin = getattr(annotation, "__origin__", None)
|
|
|
|
if inspect.isclass(annotation) and issubclass(annotation, FlagConverter):
|
|
return [
|
|
param
|
|
for name, flag in annotation.get_flags().items()
|
|
for param in self._param_to_options(
|
|
name,
|
|
flag.annotation,
|
|
required=flag.required,
|
|
varadic=flag.annotation is tuple,
|
|
description=flag.description if flag.description is not MISSING else None,
|
|
)
|
|
]
|
|
|
|
if varadic:
|
|
annotation = str
|
|
origin = None
|
|
|
|
if not required and origin is Union and annotation.__args__[-1] is type(None):
|
|
# Unpack Optional[T] (Union[T, None]) into just T
|
|
annotation = annotation.__args__[0]
|
|
origin = getattr(annotation, "__origin__", None)
|
|
|
|
option: Dict[str, Any] = {
|
|
"type": 3,
|
|
"name": name,
|
|
"required": required,
|
|
"description": description,
|
|
}
|
|
|
|
if origin is None:
|
|
if not inspect.isclass(annotation):
|
|
annotation = type(annotation)
|
|
|
|
if issubclass(annotation, Converter):
|
|
# If this is a converter, we want to check if it is a native
|
|
# one, in which we can get the original type, eg, (MemberConverter -> Member)
|
|
annotation = REVERSED_CONVERTER_MAPPING.get(annotation, annotation)
|
|
|
|
for python_type, discord_type in application_option_type_lookup.items():
|
|
if issubclass(annotation, python_type):
|
|
option["type"] = discord_type
|
|
# Set channel types
|
|
if discord_type == 7:
|
|
option["channel_types"] = application_option_channel_types[annotation]
|
|
break
|
|
|
|
elif origin is Union:
|
|
if annotation in {Union[discord.Member, discord.Role], Union[MemberConverter, RoleConverter]}:
|
|
option["type"] = 9
|
|
|
|
elif all([arg in application_option_channel_types for arg in annotation.__args__]):
|
|
option["type"] = 7
|
|
option["channel_types"] = [
|
|
discord_value
|
|
for arg in annotation.__args__
|
|
for discord_value in application_option_channel_types[arg]
|
|
]
|
|
|
|
elif origin is Literal:
|
|
literal_values = annotation.__args__
|
|
python_type = type(literal_values[0])
|
|
if (
|
|
all(type(value) == python_type for value in literal_values)
|
|
and python_type in application_option_type_lookup.keys()
|
|
):
|
|
|
|
option["type"] = application_option_type_lookup[python_type]
|
|
option["choices"] = [
|
|
{"name": literal_value, "value": literal_value} for literal_value in annotation.__args__
|
|
]
|
|
|
|
return [option] # type: ignore
|
|
|
|
def to_application_command(self, nested: int = 0) -> Optional[EditApplicationCommand]:
|
|
if self.slash_command is False:
|
|
return
|
|
elif nested == 3:
|
|
raise ApplicationCommandRegistrationError(self, f"{self.qualified_name} is too deeply nested!")
|
|
|
|
payload = {"name": self.name, "description": self.short_doc or "no description", "options": []}
|
|
if nested != 0:
|
|
payload["type"] = 1
|
|
|
|
for name, param in self.clean_params.items():
|
|
options = self._param_to_options(
|
|
name,
|
|
param.annotation if param.annotation is not param.empty else str,
|
|
varadic=param.kind == param.KEYWORD_ONLY or isinstance(param.annotation, Greedy),
|
|
required=(param.default is param.empty and not self._is_typing_optional(param.annotation))
|
|
or param.kind == param.VAR_POSITIONAL,
|
|
)
|
|
if options is not None:
|
|
payload["options"].extend(option for option in options if option is not None)
|
|
|
|
# Now we have all options, make sure required is before optional.
|
|
payload["options"] = sorted(payload["options"], key=itemgetter("required"), reverse=True)
|
|
return payload # type: ignore
|
|
|
|
|
|
class GroupMixin(Generic[CogT]):
|
|
"""A mixin that implements common functionality for classes that behave
|
|
similar to :class:`.Group` and are allowed to register commands.
|
|
|
|
Attributes
|
|
-----------
|
|
all_commands: :class:`dict`
|
|
A mapping of command name to :class:`.Command`
|
|
objects.
|
|
case_insensitive: :class:`bool`
|
|
Whether the commands should be case insensitive. Defaults to ``True``.
|
|
"""
|
|
|
|
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
case_insensitive = kwargs.get("case_insensitive", True)
|
|
self.all_commands: Dict[str, Command[CogT, Any, Any]] = _CaseInsensitiveDict() if case_insensitive else {}
|
|
self.case_insensitive: bool = case_insensitive
|
|
super().__init__(*args, **kwargs)
|
|
|
|
@property
|
|
def commands(self) -> Set[Command[CogT, Any, Any]]:
|
|
"""Set[:class:`.Command`]: A unique set of commands without aliases that are registered."""
|
|
return set(self.all_commands.values())
|
|
|
|
def recursively_remove_all_commands(self) -> None:
|
|
for command in self.all_commands.copy().values():
|
|
if isinstance(command, GroupMixin):
|
|
command.recursively_remove_all_commands()
|
|
self.remove_command(command.name)
|
|
|
|
def add_command(self, command: Command[CogT, Any, Any]) -> None:
|
|
"""Adds a :class:`.Command` into the internal list of commands.
|
|
|
|
This is usually not called, instead the :meth:`~.GroupMixin.command` or
|
|
:meth:`~.GroupMixin.group` shortcut decorators are used instead.
|
|
|
|
.. versionchanged:: 1.4
|
|
Raise :exc:`.CommandRegistrationError` instead of generic :exc:`.ClientException`
|
|
|
|
Parameters
|
|
-----------
|
|
command: :class:`Command`
|
|
The command to add.
|
|
|
|
Raises
|
|
-------
|
|
:exc:`.CommandRegistrationError`
|
|
If the command or its alias is already registered by different command.
|
|
TypeError
|
|
If the command passed is not a subclass of :class:`.Command`.
|
|
"""
|
|
|
|
if not isinstance(command, Command):
|
|
raise TypeError("The command passed must be a subclass of Command")
|
|
|
|
if isinstance(self, Command):
|
|
command.parent = self
|
|
|
|
if command.name in self.all_commands:
|
|
raise CommandRegistrationError(command.name)
|
|
|
|
self.all_commands[command.name] = command
|
|
for alias in command.aliases:
|
|
if alias in self.all_commands:
|
|
self.remove_command(command.name)
|
|
raise CommandRegistrationError(alias, alias_conflict=True)
|
|
self.all_commands[alias] = command
|
|
|
|
def remove_command(self, name: str) -> Optional[Command[CogT, Any, Any]]:
|
|
"""Remove a :class:`.Command` from the internal list
|
|
of commands.
|
|
|
|
This could also be used as a way to remove aliases.
|
|
|
|
Parameters
|
|
-----------
|
|
name: :class:`str`
|
|
The name of the command to remove.
|
|
|
|
Returns
|
|
--------
|
|
Optional[:class:`.Command`]
|
|
The command that was removed. If the name is not valid then
|
|
``None`` is returned instead.
|
|
"""
|
|
command = self.all_commands.pop(name, None)
|
|
|
|
# does not exist
|
|
if command is None:
|
|
return None
|
|
|
|
if name in command.aliases:
|
|
# we're removing an alias so we don't want to remove the rest
|
|
return command
|
|
|
|
# we're not removing the alias so let's delete the rest of them.
|
|
for alias in command.aliases:
|
|
cmd = self.all_commands.pop(alias, None)
|
|
# in the case of a CommandRegistrationError, an alias might conflict
|
|
# with an already existing command. If this is the case, we want to
|
|
# make sure the pre-existing command is not removed.
|
|
if cmd is not None and cmd != command:
|
|
self.all_commands[alias] = cmd
|
|
return command
|
|
|
|
def walk_commands(self) -> Generator[Command[CogT, Any, Any], None, None]:
|
|
"""An iterator that recursively walks through all commands and subcommands.
|
|
|
|
.. versionchanged:: 1.4
|
|
Duplicates due to aliases are no longer returned
|
|
|
|
Yields
|
|
------
|
|
Union[:class:`.Command`, :class:`.Group`]
|
|
A command or group from the internal list of commands.
|
|
"""
|
|
for command in self.commands:
|
|
yield command
|
|
if isinstance(command, GroupMixin):
|
|
yield from command.walk_commands()
|
|
|
|
def get_command(self, name: str) -> Optional[Command[CogT, Any, Any]]:
|
|
"""Get a :class:`.Command` from the internal list
|
|
of commands.
|
|
|
|
This could also be used as a way to get aliases.
|
|
|
|
The name could be fully qualified (e.g. ``'foo bar'``) will get
|
|
the subcommand ``bar`` of the group command ``foo``. If a
|
|
subcommand is not found then ``None`` is returned just as usual.
|
|
|
|
Parameters
|
|
-----------
|
|
name: :class:`str`
|
|
The name of the command to get.
|
|
|
|
Returns
|
|
--------
|
|
Optional[:class:`Command`]
|
|
The command that was requested. If not found, returns ``None``.
|
|
"""
|
|
|
|
# fast path, no space in name.
|
|
if " " not in name:
|
|
return self.all_commands.get(name)
|
|
|
|
names = name.split()
|
|
if not names:
|
|
return None
|
|
obj = self.all_commands.get(names[0])
|
|
if not isinstance(obj, GroupMixin):
|
|
return obj
|
|
|
|
for name in names[1:]:
|
|
try:
|
|
obj = obj.all_commands[name] # type: ignore
|
|
except (AttributeError, KeyError):
|
|
return None
|
|
|
|
return obj
|
|
|
|
@overload
|
|
def command(
|
|
self,
|
|
name: str = ...,
|
|
cls: Type[Command[CogT, P, T]] = ...,
|
|
*args: Any,
|
|
**kwargs: Any,
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[T]],
|
|
Callable[Concatenate[ContextT, P], Coro[T]],
|
|
]
|
|
],
|
|
Command[CogT, P, T],
|
|
]:
|
|
...
|
|
|
|
@overload
|
|
def command(
|
|
self,
|
|
name: str = ...,
|
|
cls: Type[CommandT] = ...,
|
|
*args: Any,
|
|
**kwargs: Any,
|
|
) -> Callable[[Callable[Concatenate[ContextT, P], Coro[Any]]], CommandT]:
|
|
...
|
|
|
|
def command(
|
|
self,
|
|
name: str = MISSING,
|
|
cls: Type[CommandT] = MISSING,
|
|
*args: Any,
|
|
**kwargs: Any,
|
|
) -> Callable[[Callable[Concatenate[ContextT, P], Coro[Any]]], CommandT]:
|
|
"""A shortcut decorator that invokes :func:`.command` and adds it to
|
|
the internal command list via :meth:`~.GroupMixin.add_command`.
|
|
|
|
Returns
|
|
--------
|
|
Callable[..., :class:`Command`]
|
|
A decorator that converts the provided method into a Command, adds it to the bot, then returns it.
|
|
"""
|
|
|
|
def decorator(func: Callable[Concatenate[ContextT, P], Coro[Any]]) -> CommandT:
|
|
kwargs.setdefault("parent", self)
|
|
result = command(name=name, cls=cls, *args, **kwargs)(func)
|
|
self.add_command(result)
|
|
return result
|
|
|
|
return decorator
|
|
|
|
@overload
|
|
def group(
|
|
self,
|
|
name: str = ...,
|
|
cls: Type[Group[CogT, P, T]] = ...,
|
|
*args: Any,
|
|
**kwargs: Any,
|
|
) -> Callable[
|
|
[Union[Callable[Concatenate[CogT, ContextT, P], Coro[T]], Callable[Concatenate[ContextT, P], Coro[T]]]],
|
|
Group[CogT, P, T],
|
|
]:
|
|
...
|
|
|
|
@overload
|
|
def group(
|
|
self,
|
|
name: str = ...,
|
|
cls: Type[GroupT] = ...,
|
|
*args: Any,
|
|
**kwargs: Any,
|
|
) -> Callable[[Callable[Concatenate[ContextT, P], Coro[Any]]], GroupT]:
|
|
...
|
|
|
|
def group(
|
|
self,
|
|
name: str = MISSING,
|
|
cls: Type[GroupT] = MISSING,
|
|
*args: Any,
|
|
**kwargs: Any,
|
|
) -> Callable[[Callable[Concatenate[ContextT, P], Coro[Any]]], GroupT]:
|
|
"""A shortcut decorator that invokes :func:`.group` and adds it to
|
|
the internal command list via :meth:`~.GroupMixin.add_command`.
|
|
|
|
Returns
|
|
--------
|
|
Callable[..., :class:`Group`]
|
|
A decorator that converts the provided method into a Group, adds it to the bot, then returns it.
|
|
"""
|
|
|
|
def decorator(func: Callable[Concatenate[ContextT, P], Coro[Any]]) -> GroupT:
|
|
kwargs.setdefault("parent", self)
|
|
result = group(name=name, cls=cls, *args, **kwargs)(func)
|
|
self.add_command(result)
|
|
return result
|
|
|
|
return decorator
|
|
|
|
|
|
class Group(GroupMixin[CogT], Command[CogT, P, T]):
|
|
"""A class that implements a grouping protocol for commands to be
|
|
executed as subcommands.
|
|
|
|
This class is a subclass of :class:`.Command` and thus all options
|
|
valid in :class:`.Command` are valid in here as well.
|
|
|
|
Attributes
|
|
-----------
|
|
invoke_without_command: :class:`bool`
|
|
Indicates if the group callback should begin parsing and
|
|
invocation only if no subcommand was found. Useful for
|
|
making it an error handling function to tell the user that
|
|
no subcommand was found or to have different functionality
|
|
in case no subcommand was found. If this is ``False``, then
|
|
the group callback will always be invoked first. This means
|
|
that the checks and the parsing dictated by its parameters
|
|
will be executed. Defaults to ``False``.
|
|
case_insensitive: :class:`bool`
|
|
Indicates if the group's commands should be case insensitive.
|
|
Defaults to ``False``.
|
|
"""
|
|
|
|
def __init__(self, *args: Any, **attrs: Any) -> None:
|
|
self.invoke_without_command: bool = attrs.pop("invoke_without_command", False)
|
|
super().__init__(*args, **attrs)
|
|
|
|
def copy(self: GroupT) -> GroupT:
|
|
"""Creates a copy of this :class:`Group`.
|
|
|
|
Returns
|
|
--------
|
|
:class:`Group`
|
|
A new instance of this group.
|
|
"""
|
|
ret = super().copy()
|
|
for cmd in self.commands:
|
|
ret.add_command(cmd.copy())
|
|
return ret # type: ignore
|
|
|
|
async def invoke(self, ctx: Context) -> None:
|
|
ctx.invoked_subcommand = None
|
|
ctx.subcommand_passed = None
|
|
early_invoke = not self.invoke_without_command
|
|
if early_invoke:
|
|
await self.prepare(ctx)
|
|
|
|
view = ctx.view
|
|
previous = view.index
|
|
view.skip_ws()
|
|
trigger = view.get_word()
|
|
|
|
if trigger:
|
|
ctx.subcommand_passed = trigger
|
|
ctx.invoked_subcommand = self.all_commands.get(trigger, None)
|
|
|
|
if early_invoke:
|
|
injected = hooked_wrapped_callback(self, ctx, self.callback)
|
|
await injected(*ctx.args, **ctx.kwargs)
|
|
|
|
ctx.invoked_parents.append(ctx.invoked_with) # type: ignore
|
|
|
|
if trigger and ctx.invoked_subcommand:
|
|
ctx.invoked_with = trigger
|
|
await ctx.invoked_subcommand.invoke(ctx)
|
|
elif not early_invoke:
|
|
# undo the trigger parsing
|
|
view.index = previous
|
|
view.previous = previous
|
|
await super().invoke(ctx)
|
|
|
|
async def reinvoke(self, ctx: Context, *, call_hooks: bool = False) -> None:
|
|
ctx.invoked_subcommand = None
|
|
early_invoke = not self.invoke_without_command
|
|
if early_invoke:
|
|
ctx.command = self
|
|
await self._parse_arguments(ctx)
|
|
|
|
if call_hooks:
|
|
await self.call_before_hooks(ctx)
|
|
|
|
view = ctx.view
|
|
previous = view.index
|
|
view.skip_ws()
|
|
trigger = view.get_word()
|
|
|
|
if trigger:
|
|
ctx.subcommand_passed = trigger
|
|
ctx.invoked_subcommand = self.all_commands.get(trigger, None)
|
|
|
|
if early_invoke:
|
|
try:
|
|
await self.callback(*ctx.args, **ctx.kwargs) # type: ignore
|
|
except:
|
|
ctx.command_failed = True
|
|
raise
|
|
finally:
|
|
if call_hooks:
|
|
await self.call_after_hooks(ctx)
|
|
|
|
ctx.invoked_parents.append(ctx.invoked_with) # type: ignore
|
|
|
|
if trigger and ctx.invoked_subcommand:
|
|
ctx.invoked_with = trigger
|
|
await ctx.invoked_subcommand.reinvoke(ctx, call_hooks=call_hooks)
|
|
elif not early_invoke:
|
|
# undo the trigger parsing
|
|
view.index = previous
|
|
view.previous = previous
|
|
await super().reinvoke(ctx, call_hooks=call_hooks)
|
|
|
|
def to_application_command(self, nested: int = 0) -> Optional[EditApplicationCommand]:
|
|
if self.slash_command is False:
|
|
return
|
|
elif nested == 2:
|
|
raise ApplicationCommandRegistrationError(self, f"{self.qualified_name} is too deeply nested!")
|
|
|
|
return { # type: ignore
|
|
"name": self.name,
|
|
"type": int(not (nested - 1)) + 1,
|
|
"description": self.short_doc or "no description",
|
|
"options": [
|
|
cmd.to_application_command(nested=nested + 1) for cmd in sorted(self.commands, key=lambda x: x.name)
|
|
],
|
|
}
|
|
|
|
|
|
# Decorators
|
|
|
|
|
|
@overload
|
|
def command(
|
|
name: str = ...,
|
|
cls: Type[Command[CogT, P, T]] = ...,
|
|
**attrs: Any,
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[T]],
|
|
Callable[Concatenate[ContextT, P], Coro[T]],
|
|
]
|
|
],
|
|
Command[CogT, P, T],
|
|
]:
|
|
...
|
|
|
|
|
|
@overload
|
|
def command(
|
|
name: str = ...,
|
|
cls: Type[CommandT] = ...,
|
|
**attrs: Any,
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[Any]],
|
|
Callable[Concatenate[ContextT, P], Coro[Any]],
|
|
]
|
|
],
|
|
CommandT,
|
|
]:
|
|
...
|
|
|
|
|
|
def command(
|
|
name: str = MISSING, cls: Type[CommandT] = MISSING, **attrs: Any
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[ContextT, P], Coro[Any]],
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[T]],
|
|
]
|
|
],
|
|
Union[Command[CogT, P, T], CommandT],
|
|
]:
|
|
"""A decorator that transforms a function into a :class:`.Command`
|
|
or if called with :func:`.group`, :class:`.Group`.
|
|
|
|
By default the ``help`` attribute is received automatically from the
|
|
docstring of the function and is cleaned up with the use of
|
|
``inspect.cleandoc``. If the docstring is ``bytes``, then it is decoded
|
|
into :class:`str` using utf-8 encoding.
|
|
|
|
All checks added using the :func:`.check` & co. decorators are added into
|
|
the function. There is no way to supply your own checks through this
|
|
decorator.
|
|
|
|
Parameters
|
|
-----------
|
|
name: :class:`str`
|
|
The name to create the command with. By default this uses the
|
|
function name unchanged.
|
|
cls
|
|
The class to construct with. By default this is :class:`.Command`.
|
|
You usually do not change this.
|
|
attrs
|
|
Keyword arguments to pass into the construction of the class denoted
|
|
by ``cls``.
|
|
|
|
Raises
|
|
-------
|
|
TypeError
|
|
If the function is not a coroutine or is already a command.
|
|
"""
|
|
if cls is MISSING:
|
|
cls = Command # type: ignore
|
|
|
|
def decorator(
|
|
func: Union[
|
|
Callable[Concatenate[ContextT, P], Coro[Any]],
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[Any]],
|
|
]
|
|
) -> CommandT:
|
|
if isinstance(func, Command):
|
|
raise TypeError("Callback is already a command.")
|
|
return cls(func, name=name, **attrs)
|
|
|
|
return decorator
|
|
|
|
|
|
@overload
|
|
def group(
|
|
name: str = ...,
|
|
cls: Type[Group[CogT, P, T]] = ...,
|
|
**attrs: Any,
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[T]],
|
|
Callable[Concatenate[ContextT, P], Coro[T]],
|
|
]
|
|
],
|
|
Group[CogT, P, T],
|
|
]:
|
|
...
|
|
|
|
|
|
@overload
|
|
def group(
|
|
name: str = ...,
|
|
cls: Type[GroupT] = ...,
|
|
**attrs: Any,
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[Any]],
|
|
Callable[Concatenate[ContextT, P], Coro[Any]],
|
|
]
|
|
],
|
|
GroupT,
|
|
]:
|
|
...
|
|
|
|
|
|
def group(
|
|
name: str = MISSING,
|
|
cls: Type[GroupT] = MISSING,
|
|
**attrs: Any,
|
|
) -> Callable[
|
|
[
|
|
Union[
|
|
Callable[Concatenate[ContextT, P], Coro[Any]],
|
|
Callable[Concatenate[CogT, ContextT, P], Coro[T]],
|
|
]
|
|
],
|
|
Union[Group[CogT, P, T], GroupT],
|
|
]:
|
|
"""A decorator that transforms a function into a :class:`.Group`.
|
|
|
|
This is similar to the :func:`.command` decorator but the ``cls``
|
|
parameter is set to :class:`Group` by default.
|
|
|
|
.. versionchanged:: 1.1
|
|
The ``cls`` parameter can now be passed.
|
|
"""
|
|
if cls is MISSING:
|
|
cls = Group # type: ignore
|
|
return command(name=name, cls=cls, **attrs) # type: ignore
|
|
|
|
|
|
def check(predicate: Check, **command_attrs: Any) -> Callable[[T], T]:
|
|
r"""A decorator that adds a check to the :class:`.Command` or its
|
|
subclasses. These checks could be accessed via :attr:`.Command.checks`.
|
|
|
|
These checks should be predicates that take in a single parameter taking
|
|
a :class:`.Context`. If the check returns a ``False``\-like value then
|
|
during invocation a :exc:`.CheckFailure` exception is raised and sent to
|
|
the :func:`.on_command_error` event.
|
|
|
|
If an exception should be thrown in the predicate then it should be a
|
|
subclass of :exc:`.CommandError`. Any exception not subclassed from it
|
|
will be propagated while those subclassed will be sent to
|
|
:func:`.on_command_error`.
|
|
|
|
A special attribute named ``predicate`` is bound to the value
|
|
returned by this decorator to retrieve the predicate passed to the
|
|
decorator. This allows the following introspection and chaining to be done:
|
|
|
|
.. code-block:: python3
|
|
|
|
def owner_or_permissions(**perms):
|
|
original = commands.has_permissions(**perms).predicate
|
|
async def extended_check(ctx):
|
|
if ctx.guild is None:
|
|
return False
|
|
return ctx.guild.owner_id == ctx.author.id or await original(ctx)
|
|
return commands.check(extended_check)
|
|
|
|
.. note::
|
|
|
|
The function returned by ``predicate`` is **always** a coroutine,
|
|
even if the original function was not a coroutine.
|
|
|
|
.. versionchanged:: 1.3
|
|
The ``predicate`` attribute was added.
|
|
|
|
Examples
|
|
---------
|
|
|
|
Creating a basic check to see if the command invoker is you.
|
|
|
|
.. code-block:: python3
|
|
|
|
def check_if_it_is_me(ctx):
|
|
return ctx.message.author.id == 85309593344815104
|
|
|
|
@bot.command()
|
|
@commands.check(check_if_it_is_me)
|
|
async def only_for_me(ctx):
|
|
await ctx.send('I know you!')
|
|
|
|
Transforming common checks into its own decorator:
|
|
|
|
.. code-block:: python3
|
|
|
|
def is_me():
|
|
def predicate(ctx):
|
|
return ctx.message.author.id == 85309593344815104
|
|
return commands.check(predicate)
|
|
|
|
@bot.command()
|
|
@is_me()
|
|
async def only_me(ctx):
|
|
await ctx.send('Only you!')
|
|
|
|
Parameters
|
|
-----------
|
|
predicate: Callable[[:class:`Context`], :class:`bool`]
|
|
The predicate to check if the command should be invoked.
|
|
**command_attrs: Dict[:class:`str`, Any]
|
|
key: value pairs to be added to the command's attributes.
|
|
"""
|
|
|
|
def decorator(func: Union[Command, CoroFunc]) -> Union[Command, CoroFunc]:
|
|
if isinstance(func, Command):
|
|
func.checks.append(predicate)
|
|
func._update_attrs(**command_attrs)
|
|
else:
|
|
if not hasattr(func, "__commands_checks__"):
|
|
func.__commands_checks__ = []
|
|
if not hasattr(func, "__command_attrs__"):
|
|
func.__command_attrs__ = {}
|
|
|
|
func.__commands_checks__.append(predicate)
|
|
func.__command_attrs__.update(command_attrs)
|
|
|
|
return func
|
|
|
|
if inspect.iscoroutinefunction(predicate):
|
|
decorator.predicate = predicate
|
|
else:
|
|
|
|
@functools.wraps(predicate)
|
|
async def wrapper(ctx):
|
|
return predicate(ctx) # type: ignore
|
|
|
|
decorator.predicate = wrapper
|
|
|
|
return decorator # type: ignore
|
|
|
|
|
|
def check_any(*checks: Check) -> Callable[[T], T]:
|
|
r"""A :func:`check` that is added that checks if any of the checks passed
|
|
will pass, i.e. using logical OR.
|
|
|
|
If all checks fail then :exc:`.CheckAnyFailure` is raised to signal the failure.
|
|
It inherits from :exc:`.CheckFailure`.
|
|
|
|
.. note::
|
|
|
|
The ``predicate`` attribute for this function **is** a coroutine.
|
|
|
|
.. versionadded:: 1.3
|
|
|
|
Parameters
|
|
------------
|
|
\*checks: Callable[[:class:`Context`], :class:`bool`]
|
|
An argument list of checks that have been decorated with
|
|
the :func:`check` decorator.
|
|
|
|
Raises
|
|
-------
|
|
TypeError
|
|
A check passed has not been decorated with the :func:`check`
|
|
decorator.
|
|
|
|
Examples
|
|
---------
|
|
|
|
Creating a basic check to see if it's the bot owner or
|
|
the server owner:
|
|
|
|
.. code-block:: python3
|
|
|
|
def is_guild_owner():
|
|
def predicate(ctx):
|
|
return ctx.guild is not None and ctx.guild.owner_id == ctx.author.id
|
|
return commands.check(predicate)
|
|
|
|
@bot.command()
|
|
@commands.check_any(commands.is_owner(), is_guild_owner())
|
|
async def only_for_owners(ctx):
|
|
await ctx.send('Hello mister owner!')
|
|
"""
|
|
|
|
unwrapped = []
|
|
for wrapped in checks:
|
|
try:
|
|
pred = wrapped.predicate
|
|
except AttributeError:
|
|
raise TypeError(f"{wrapped!r} must be wrapped by commands.check decorator") from None
|
|
else:
|
|
unwrapped.append(pred)
|
|
|
|
async def predicate(ctx: Context) -> bool:
|
|
errors = []
|
|
for func in unwrapped:
|
|
try:
|
|
value = await func(ctx)
|
|
except CheckFailure as e:
|
|
errors.append(e)
|
|
else:
|
|
if value:
|
|
return True
|
|
# if we're here, all checks failed
|
|
raise CheckAnyFailure(unwrapped, errors)
|
|
|
|
return check(predicate)
|
|
|
|
|
|
def has_role(item: Union[int, str]) -> Callable[[T], T]:
|
|
"""A :func:`.check` that is added that checks if the member invoking the
|
|
command has the role specified via the name or ID specified.
|
|
|
|
If a string is specified, you must give the exact name of the role, including
|
|
caps and spelling.
|
|
|
|
If an integer is specified, you must give the exact snowflake ID of the role.
|
|
|
|
If the message is invoked in a private message context then the check will
|
|
return ``False``.
|
|
|
|
This check raises one of two special exceptions, :exc:`.MissingRole` if the user
|
|
is missing a role, or :exc:`.NoPrivateMessage` if it is used in a private message.
|
|
Both inherit from :exc:`.CheckFailure`.
|
|
|
|
.. versionchanged:: 1.1
|
|
|
|
Raise :exc:`.MissingRole` or :exc:`.NoPrivateMessage`
|
|
instead of generic :exc:`.CheckFailure`
|
|
|
|
Parameters
|
|
-----------
|
|
item: Union[:class:`int`, :class:`str`]
|
|
The name or ID of the role to check.
|
|
"""
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
if ctx.guild is None:
|
|
raise NoPrivateMessage()
|
|
|
|
# ctx.guild is None doesn't narrow ctx.author to Member
|
|
if isinstance(item, int):
|
|
role = discord.utils.get(ctx.author.roles, id=item) # type: ignore
|
|
else:
|
|
role = discord.utils.get(ctx.author.roles, name=item) # type: ignore
|
|
if role is None:
|
|
raise MissingRole(item)
|
|
return True
|
|
|
|
return check(predicate)
|
|
|
|
|
|
def has_any_role(*items: Union[int, str]) -> Callable[[T], T]:
|
|
r"""A :func:`.check` that is added that checks if the member invoking the
|
|
command has **any** of the roles specified. This means that if they have
|
|
one out of the three roles specified, then this check will return `True`.
|
|
|
|
Similar to :func:`.has_role`\, the names or IDs passed in must be exact.
|
|
|
|
This check raises one of two special exceptions, :exc:`.MissingAnyRole` if the user
|
|
is missing all roles, or :exc:`.NoPrivateMessage` if it is used in a private message.
|
|
Both inherit from :exc:`.CheckFailure`.
|
|
|
|
.. versionchanged:: 1.1
|
|
|
|
Raise :exc:`.MissingAnyRole` or :exc:`.NoPrivateMessage`
|
|
instead of generic :exc:`.CheckFailure`
|
|
|
|
Parameters
|
|
-----------
|
|
items: List[Union[:class:`str`, :class:`int`]]
|
|
An argument list of names or IDs to check that the member has roles wise.
|
|
|
|
Example
|
|
--------
|
|
|
|
.. code-block:: python3
|
|
|
|
@bot.command()
|
|
@commands.has_any_role('Library Devs', 'Moderators', 492212595072434186)
|
|
async def cool(ctx):
|
|
await ctx.send('You are cool indeed')
|
|
"""
|
|
|
|
def predicate(ctx):
|
|
if ctx.guild is None:
|
|
raise NoPrivateMessage()
|
|
|
|
# ctx.guild is None doesn't narrow ctx.author to Member
|
|
getter = functools.partial(discord.utils.get, ctx.author.roles) # type: ignore
|
|
if any(
|
|
getter(id=item) is not None if isinstance(item, int) else getter(name=item) is not None for item in items
|
|
):
|
|
return True
|
|
raise MissingAnyRole(list(items))
|
|
|
|
return check(predicate, required_roles=items)
|
|
|
|
|
|
def bot_has_role(item: int) -> Callable[[T], T]:
|
|
"""Similar to :func:`.has_role` except checks if the bot itself has the
|
|
role.
|
|
|
|
This check raises one of two special exceptions, :exc:`.BotMissingRole` if the bot
|
|
is missing the role, or :exc:`.NoPrivateMessage` if it is used in a private message.
|
|
Both inherit from :exc:`.CheckFailure`.
|
|
|
|
.. versionchanged:: 1.1
|
|
|
|
Raise :exc:`.BotMissingRole` or :exc:`.NoPrivateMessage`
|
|
instead of generic :exc:`.CheckFailure`
|
|
"""
|
|
|
|
def predicate(ctx):
|
|
if ctx.guild is None:
|
|
raise NoPrivateMessage()
|
|
|
|
me = ctx.me
|
|
if isinstance(item, int):
|
|
role = discord.utils.get(me.roles, id=item)
|
|
else:
|
|
role = discord.utils.get(me.roles, name=item)
|
|
if role is None:
|
|
raise BotMissingRole(item)
|
|
return True
|
|
|
|
return check(predicate, bot_required_role=item)
|
|
|
|
|
|
def bot_has_any_role(*items: int) -> Callable[[T], T]:
|
|
"""Similar to :func:`.has_any_role` except checks if the bot itself has
|
|
any of the roles listed.
|
|
|
|
This check raises one of two special exceptions, :exc:`.BotMissingAnyRole` if the bot
|
|
is missing all roles, or :exc:`.NoPrivateMessage` if it is used in a private message.
|
|
Both inherit from :exc:`.CheckFailure`.
|
|
|
|
.. versionchanged:: 1.1
|
|
|
|
Raise :exc:`.BotMissingAnyRole` or :exc:`.NoPrivateMessage`
|
|
instead of generic checkfailure
|
|
"""
|
|
|
|
def predicate(ctx):
|
|
if ctx.guild is None:
|
|
raise NoPrivateMessage()
|
|
|
|
me = ctx.me
|
|
getter = functools.partial(discord.utils.get, me.roles)
|
|
if any(
|
|
getter(id=item) is not None if isinstance(item, int) else getter(name=item) is not None for item in items
|
|
):
|
|
return True
|
|
raise BotMissingAnyRole(list(items))
|
|
|
|
return check(predicate, bot_required_roles=items)
|
|
|
|
|
|
def has_permissions(**perms: bool) -> Callable[[T], T]:
|
|
"""A :func:`.check` that is added that checks if the member has all of
|
|
the permissions necessary.
|
|
|
|
Note that this check operates on the current channel permissions, not the
|
|
guild wide permissions.
|
|
|
|
The permissions passed in must be exactly like the properties shown under
|
|
:class:`.discord.Permissions`.
|
|
|
|
This check raises a special exception, :exc:`.MissingPermissions`
|
|
that is inherited from :exc:`.CheckFailure`.
|
|
|
|
Parameters
|
|
------------
|
|
perms
|
|
An argument list of permissions to check for.
|
|
|
|
Example
|
|
---------
|
|
|
|
.. code-block:: python3
|
|
|
|
@bot.command()
|
|
@commands.has_permissions(manage_messages=True)
|
|
async def test(ctx):
|
|
await ctx.send('You can manage messages.')
|
|
|
|
"""
|
|
|
|
invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
|
|
if invalid:
|
|
raise TypeError(f"Invalid permission(s): {', '.join(invalid)}")
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
ch = ctx.channel
|
|
permissions = ch.permissions_for(ctx.author) # type: ignore
|
|
|
|
missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
|
|
|
|
if not missing:
|
|
return True
|
|
|
|
raise MissingPermissions(missing)
|
|
|
|
return check(predicate, required_permissions=perms)
|
|
|
|
|
|
def bot_has_permissions(**perms: bool) -> Callable[[T], T]:
|
|
"""Similar to :func:`.has_permissions` except checks if the bot itself has
|
|
the permissions listed.
|
|
|
|
This check raises a special exception, :exc:`.BotMissingPermissions`
|
|
that is inherited from :exc:`.CheckFailure`.
|
|
"""
|
|
|
|
invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
|
|
if invalid:
|
|
raise TypeError(f"Invalid permission(s): {', '.join(invalid)}")
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
guild = ctx.guild
|
|
me = guild.me if guild is not None else ctx.bot.user
|
|
permissions = ctx.channel.permissions_for(me) # type: ignore
|
|
|
|
missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
|
|
|
|
if not missing:
|
|
return True
|
|
|
|
raise BotMissingPermissions(missing)
|
|
|
|
return check(predicate, bot_required_permissions=perms)
|
|
|
|
|
|
def has_guild_permissions(**perms: bool) -> Callable[[T], T]:
|
|
"""Similar to :func:`.has_permissions`, but operates on guild wide
|
|
permissions instead of the current channel permissions.
|
|
|
|
If this check is called in a DM context, it will raise an
|
|
exception, :exc:`.NoPrivateMessage`.
|
|
|
|
.. versionadded:: 1.3
|
|
"""
|
|
|
|
invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
|
|
if invalid:
|
|
raise TypeError(f"Invalid permission(s): {', '.join(invalid)}")
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
if not ctx.guild:
|
|
raise NoPrivateMessage
|
|
|
|
permissions = ctx.author.guild_permissions # type: ignore
|
|
missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
|
|
|
|
if not missing:
|
|
return True
|
|
|
|
raise MissingPermissions(missing)
|
|
|
|
return check(predicate, required_guild_permissions=perms)
|
|
|
|
|
|
def bot_has_guild_permissions(**perms: bool) -> Callable[[T], T]:
|
|
"""Similar to :func:`.has_guild_permissions`, but checks the bot
|
|
members guild permissions.
|
|
|
|
.. versionadded:: 1.3
|
|
"""
|
|
|
|
invalid = set(perms) - set(discord.Permissions.VALID_FLAGS)
|
|
if invalid:
|
|
raise TypeError(f"Invalid permission(s): {', '.join(invalid)}")
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
if not ctx.guild:
|
|
raise NoPrivateMessage
|
|
|
|
permissions = ctx.me.guild_permissions # type: ignore
|
|
missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value]
|
|
|
|
if not missing:
|
|
return True
|
|
|
|
raise BotMissingPermissions(missing)
|
|
|
|
return check(predicate, bot_required_guild_permissions=perms)
|
|
|
|
|
|
def dm_only() -> Callable[[T], T]:
|
|
"""A :func:`.check` that indicates this command must only be used in a
|
|
DM context. Only private messages are allowed when
|
|
using the command.
|
|
|
|
This check raises a special exception, :exc:`.PrivateMessageOnly`
|
|
that is inherited from :exc:`.CheckFailure`.
|
|
|
|
.. versionadded:: 1.1
|
|
"""
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
if ctx.guild is not None:
|
|
raise PrivateMessageOnly()
|
|
return True
|
|
|
|
return check(predicate)
|
|
|
|
|
|
def guild_only() -> Callable[[T], T]:
|
|
"""A :func:`.check` that indicates this command must only be used in a
|
|
guild context only. Basically, no private messages are allowed when
|
|
using the command.
|
|
|
|
This check raises a special exception, :exc:`.NoPrivateMessage`
|
|
that is inherited from :exc:`.CheckFailure`.
|
|
"""
|
|
|
|
def predicate(ctx: Context) -> bool:
|
|
if ctx.guild is None:
|
|
raise NoPrivateMessage()
|
|
return True
|
|
|
|
return check(predicate)
|
|
|
|
|
|
def is_owner() -> Callable[[T], T]:
|
|
"""A :func:`.check` that checks if the person invoking this command is the
|
|
owner of the bot.
|
|
|
|
This is powered by :meth:`.Bot.is_owner`.
|
|
|
|
This check raises a special exception, :exc:`.NotOwner` that is derived
|
|
from :exc:`.CheckFailure`.
|
|
"""
|
|
|
|
async def predicate(ctx: Context) -> bool:
|
|
if not await ctx.bot.is_owner(ctx.author):
|
|
raise NotOwner("You do not own this bot.")
|
|
return True
|
|
|
|
return check(predicate)
|
|
|
|
|
|
def is_nsfw() -> Callable[[T], T]:
|
|
"""A :func:`.check` that checks if the channel is a NSFW channel.
|
|
|
|
This check raises a special exception, :exc:`.NSFWChannelRequired`
|
|
that is derived from :exc:`.CheckFailure`.
|
|
|
|
.. versionchanged:: 1.1
|
|
|
|
Raise :exc:`.NSFWChannelRequired` instead of generic :exc:`.CheckFailure`.
|
|
DM channels will also now pass this check.
|
|
"""
|
|
|
|
def pred(ctx: Context) -> bool:
|
|
ch = ctx.channel
|
|
if ctx.guild is None or (isinstance(ch, (discord.TextChannel, discord.Thread)) and ch.is_nsfw()):
|
|
return True
|
|
raise NSFWChannelRequired(ch) # type: ignore
|
|
|
|
return check(pred)
|
|
|
|
|
|
def cooldown(
|
|
rate: int, per: float, type: Union[BucketType, Callable[[Message], Any]] = BucketType.default
|
|
) -> Callable[[T], T]:
|
|
"""A decorator that adds a cooldown to a :class:`.Command`
|
|
|
|
A cooldown allows a command to only be used a specific amount
|
|
of times in a specific time frame. These cooldowns can be based
|
|
either on a per-guild, per-channel, per-user, per-role or global basis.
|
|
Denoted by the third argument of ``type`` which must be of enum
|
|
type :class:`.BucketType`.
|
|
|
|
If a cooldown is triggered, then :exc:`.CommandOnCooldown` is triggered in
|
|
:func:`.on_command_error` and the local error handler.
|
|
|
|
A command can only have a single cooldown.
|
|
|
|
Parameters
|
|
------------
|
|
rate: :class:`int`
|
|
The number of times a command can be used before triggering a cooldown.
|
|
per: :class:`float`
|
|
The amount of seconds to wait for a cooldown when it's been triggered.
|
|
type: Union[:class:`.BucketType`, Callable[[:class:`.Message`], Any]]
|
|
The type of cooldown to have. If callable, should return a key for the mapping.
|
|
|
|
.. versionchanged:: 1.7
|
|
Callables are now supported for custom bucket types.
|
|
"""
|
|
|
|
def decorator(func: Union[Command, CoroFunc]) -> Union[Command, CoroFunc]:
|
|
if isinstance(func, Command):
|
|
func._buckets = CooldownMapping(Cooldown(rate, per), type)
|
|
else:
|
|
if not hasattr(func, "__command_attrs__"):
|
|
func.__command_attrs__ = {}
|
|
|
|
func.__command_attrs__["cooldown"] = CooldownMapping(Cooldown(rate, per), type)
|
|
return func
|
|
|
|
return decorator # type: ignore
|
|
|
|
|
|
def dynamic_cooldown(
|
|
cooldown: Union[BucketType, Callable[[Message], Any]], type: BucketType = BucketType.default
|
|
) -> Callable[[T], T]:
|
|
"""A decorator that adds a dynamic cooldown to a :class:`.Command`
|
|
|
|
This differs from :func:`.cooldown` in that it takes a function that
|
|
accepts a single parameter of type :class:`.discord.Message` and must
|
|
return a :class:`.Cooldown` or ``None``. If ``None`` is returned then
|
|
that cooldown is effectively bypassed.
|
|
|
|
A cooldown allows a command to only be used a specific amount
|
|
of times in a specific time frame. These cooldowns can be based
|
|
either on a per-guild, per-channel, per-user, per-role or global basis.
|
|
Denoted by the third argument of ``type`` which must be of enum
|
|
type :class:`.BucketType`.
|
|
|
|
If a cooldown is triggered, then :exc:`.CommandOnCooldown` is triggered in
|
|
:func:`.on_command_error` and the local error handler.
|
|
|
|
A command can only have a single cooldown.
|
|
|
|
.. versionadded:: 2.0
|
|
|
|
Parameters
|
|
------------
|
|
cooldown: Callable[[:class:`.discord.Message`], Optional[:class:`.Cooldown`]]
|
|
A function that takes a message and returns a cooldown that will
|
|
apply to this invocation or ``None`` if the cooldown should be bypassed.
|
|
type: :class:`.BucketType`
|
|
The type of cooldown to have.
|
|
"""
|
|
if not callable(cooldown):
|
|
raise TypeError("A callable must be provided")
|
|
|
|
def decorator(func: Union[Command, CoroFunc]) -> Union[Command, CoroFunc]:
|
|
if isinstance(func, Command):
|
|
func._buckets = DynamicCooldownMapping(cooldown, type)
|
|
else:
|
|
if not hasattr(func, "__command_attrs__"):
|
|
func.__command_attrs__ = {}
|
|
|
|
func.__command_attrs__["cooldown"] = DynamicCooldownMapping(cooldown, type)
|
|
return func
|
|
|
|
return decorator # type: ignore
|
|
|
|
|
|
def max_concurrency(number: int, per: BucketType = BucketType.default, *, wait: bool = False) -> Callable[[T], T]:
|
|
"""A decorator that adds a maximum concurrency to a :class:`.Command` or its subclasses.
|
|
|
|
This enables you to only allow a certain number of command invocations at the same time,
|
|
for example if a command takes too long or if only one user can use it at a time. This
|
|
differs from a cooldown in that there is no set waiting period or token bucket -- only
|
|
a set number of people can run the command.
|
|
|
|
.. versionadded:: 1.3
|
|
|
|
Parameters
|
|
-------------
|
|
number: :class:`int`
|
|
The maximum number of invocations of this command that can be running at the same time.
|
|
per: :class:`.BucketType`
|
|
The bucket that this concurrency is based on, e.g. ``BucketType.guild`` would allow
|
|
it to be used up to ``number`` times per guild.
|
|
wait: :class:`bool`
|
|
Whether the command should wait for the queue to be over. If this is set to ``False``
|
|
then instead of waiting until the command can run again, the command raises
|
|
:exc:`.MaxConcurrencyReached` to its error handler. If this is set to ``True``
|
|
then the command waits until it can be executed.
|
|
"""
|
|
|
|
def decorator(func: Union[Command, CoroFunc]) -> Union[Command, CoroFunc]:
|
|
value = MaxConcurrency(number, per=per, wait=wait)
|
|
if isinstance(func, Command):
|
|
func._max_concurrency = value
|
|
else:
|
|
if not hasattr(func, "__command_attrs__"):
|
|
func.__command_attrs__ = {}
|
|
|
|
func.__command_attrs__["_max_concurrency"] = value
|
|
return func
|
|
|
|
return decorator # type: ignore
|
|
|
|
|
|
def before_invoke(coro) -> Callable[[T], T]:
|
|
"""A decorator that registers a coroutine as a pre-invoke hook.
|
|
|
|
This allows you to refer to one before invoke hook for several commands that
|
|
do not have to be within the same cog.
|
|
|
|
.. versionadded:: 1.4
|
|
|
|
Example
|
|
---------
|
|
|
|
.. code-block:: python3
|
|
|
|
async def record_usage(ctx):
|
|
print(ctx.author, 'used', ctx.command, 'at', ctx.message.created_at)
|
|
|
|
@bot.command()
|
|
@commands.before_invoke(record_usage)
|
|
async def who(ctx): # Output: <User> used who at <Time>
|
|
await ctx.send('i am a bot')
|
|
|
|
class What(commands.Cog):
|
|
|
|
@commands.before_invoke(record_usage)
|
|
@commands.command()
|
|
async def when(self, ctx): # Output: <User> used when at <Time>
|
|
await ctx.send(f'and i have existed since {ctx.bot.user.created_at}')
|
|
|
|
@commands.command()
|
|
async def where(self, ctx): # Output: <Nothing>
|
|
await ctx.send('on Discord')
|
|
|
|
@commands.command()
|
|
async def why(self, ctx): # Output: <Nothing>
|
|
await ctx.send('because someone made me')
|
|
|
|
bot.add_cog(What())
|
|
"""
|
|
|
|
def decorator(func: Union[Command, CoroFunc]) -> Union[Command, CoroFunc]:
|
|
if isinstance(func, Command):
|
|
func.before_invoke(coro)
|
|
else:
|
|
if not hasattr(func, "__command_attrs__"):
|
|
func.__command_attrs__ = {}
|
|
|
|
func.__command_attrs__["before_invoke"] = coro
|
|
return func
|
|
|
|
return decorator # type: ignore
|
|
|
|
|
|
def after_invoke(coro) -> Callable[[T], T]:
|
|
"""A decorator that registers a coroutine as a post-invoke hook.
|
|
|
|
This allows you to refer to one after invoke hook for several commands that
|
|
do not have to be within the same cog.
|
|
|
|
.. versionadded:: 1.4
|
|
"""
|
|
|
|
def decorator(func: Union[Command, CoroFunc]) -> Union[Command, CoroFunc]:
|
|
if isinstance(func, Command):
|
|
func.after_invoke(coro)
|
|
else:
|
|
func.__command_attrs__["after_invoke"] = coro
|
|
return func
|
|
|
|
return decorator # type: ignore
|