Implement slash commands

This commit is contained in:
Rapptz
2022-02-25 11:10:47 -05:00
parent 3d0c506124
commit 0d2db90028
10 changed files with 2083 additions and 2 deletions

View File

@@ -43,7 +43,7 @@ from .template import *
from .widget import *
from .object import *
from .reaction import *
from . import utils, opus, abc, ui
from . import utils, opus, abc, ui, app_commands
from .enums import *
from .embeds import *
from .mentions import *

View File

@@ -0,0 +1,16 @@
"""
discord.app_commands
~~~~~~~~~~~~~~~~~~~~~
Application commands support for the Discord API
:copyright: (c) 2015-present Rapptz
:license: MIT, see LICENSE for more details.
"""
from .commands import *
from .enums import *
from .errors import *
from .models import *
from .tree import *

View File

@@ -0,0 +1,743 @@
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import inspect
from typing import (
Any,
Callable,
ClassVar,
Coroutine,
Dict,
Generic,
List,
Optional,
Set,
TYPE_CHECKING,
Tuple,
Type,
TypeVar,
Union,
)
from dataclasses import dataclass
from textwrap import TextWrapper
import sys
import re
from .enums import AppCommandOptionType, AppCommandType
from ..interactions import Interaction
from ..enums import ChannelType, try_enum
from .models import Choice
from .errors import CommandSignatureMismatch, CommandAlreadyRegistered
from ..utils import resolve_annotation, MISSING, is_inside_class
from ..user import User
from ..member import Member
from ..role import Role
from ..mixins import Hashable
from ..permissions import Permissions
if TYPE_CHECKING:
from typing_extensions import ParamSpec, Concatenate
from ..interactions import Interaction
from ..types.interactions import (
ResolvedData,
PartialThread,
PartialChannel,
ApplicationCommandInteractionDataOption,
)
from ..state import ConnectionState
from .namespace import Namespace
__all__ = (
'CommandParameter',
'Command',
'Group',
'command',
'describe',
)
if TYPE_CHECKING:
P = ParamSpec('P')
else:
P = TypeVar('P')
T = TypeVar('T')
GroupT = TypeVar('GroupT', bound='Group')
Coro = Coroutine[Any, Any, T]
if TYPE_CHECKING:
CommandCallback = Union[
Callable[Concatenate[GroupT, Interaction, P], Coro[T]],
Callable[Concatenate[Interaction, P], Coro[T]],
]
else:
CommandCallback = Callable[..., Coro[T]]
VALID_SLASH_COMMAND_NAME = re.compile(r'^[\w-]{1,32}$')
CAMEL_CASE_REGEX = re.compile(r'(?<!^)(?=[A-Z])')
def _shorten(
input: str,
*,
_wrapper: TextWrapper = TextWrapper(width=100, max_lines=1, replace_whitespace=True, placeholder='...'),
) -> str:
return _wrapper.fill(' '.join(input.strip().split()))
def _to_kebab_case(text: str) -> str:
return CAMEL_CASE_REGEX.sub('-', text).lower()
@dataclass
class CommandParameter:
"""Represents a application command parameter.
Attributes
-----------
name: :class:`str`
The name of the parameter.
description: :class:`str`
The description of the parameter
required: :class:`bool`
Whether the parameter is required
choices: List[:class:`~discord.app_commands.Choice`]
A list of choices this parameter takes
type: :class:`~discord.app_commands.AppCommandOptionType`
The underlying type of this parameter.
channel_types: List[:class:`~discord.ChannelType`]
The channel types that are allowed for this parameter.
min_value: Optional[:class:`int`]
The minimum supported value for this parameter.
max_value: Optional[:class:`int`]
The maximum supported value for this parameter.
autocomplete: :class:`bool`
Whether this parameter enables autocomplete.
"""
name: str = MISSING
description: str = MISSING
required: bool = MISSING
default: Any = MISSING
choices: List[Choice] = MISSING
type: AppCommandOptionType = MISSING
channel_types: List[ChannelType] = MISSING
min_value: Optional[int] = None
max_value: Optional[int] = None
autocomplete: bool = MISSING
annotation: Any = MISSING
# restrictor: Optional[RestrictorType] = None
def to_dict(self) -> Dict[str, Any]:
base = {
'type': self.type.value,
'name': self.name,
'description': self.description,
'required': self.required,
}
if self.choices:
base['choices'] = [choice.to_dict() for choice in self.choices]
if self.channel_types:
base['channel_types'] = [t.value for t in self.channel_types]
if self.autocomplete:
base['autocomplete'] = True
if self.min_value is not None:
base['min_value'] = self.min_value
if self.max_value is not None:
base['max_value'] = self.max_value
return base
annotation_to_option_type: Dict[Any, AppCommandOptionType] = {
str: AppCommandOptionType.string,
int: AppCommandOptionType.integer,
float: AppCommandOptionType.number,
bool: AppCommandOptionType.boolean,
User: AppCommandOptionType.user,
Member: AppCommandOptionType.user,
Role: AppCommandOptionType.role,
# StageChannel: AppCommandOptionType.channel,
# StoreChannel: AppCommandOptionType.channel,
# VoiceChannel: AppCommandOptionType.channel,
# TextChannel: AppCommandOptionType.channel,
}
NoneType = type(None)
allowed_default_types: Dict[AppCommandOptionType, Tuple[Type[Any], ...]] = {
AppCommandOptionType.string: (str, NoneType),
AppCommandOptionType.integer: (int, NoneType),
AppCommandOptionType.boolean: (bool, NoneType),
}
# Some sanity checks:
# str => string
# int => int
# User => user
# etc ...
# Optional[str] => string, required: false, default: None
# Optional[int] => integer, required: false, default: None
# Optional[Model] = None => resolved, required: false, default: None
# Optional[Model] can only have (CommandParameter, None) as default
# Optional[int | str | bool] can have (CommandParameter, None, int | str | bool) as a default
# Union[str, Member] => disallowed
# Union[int, str] => disallowed
# Union[Member, User] => user
# Optional[Union[Member, User]] => user, required: false, default: None
# Union[Member, User, Object] => mentionable
# Union[Models] => mentionable
# Optional[Union[Models]] => mentionable, required: false, default: None
def _annotation_to_type(
annotation: Any,
*,
mapping=annotation_to_option_type,
_none=NoneType,
) -> Tuple[AppCommandOptionType, Any]:
# Straight simple case, a regular ol' parameter
try:
option_type = mapping[annotation]
except KeyError:
pass
else:
return (option_type, MISSING)
# Check if there's an origin
origin = getattr(annotation, '__origin__', None)
if origin is not Union: # TODO: Python 3.10
# Only Union/Optional is supported so bail early
raise TypeError(f'unsupported type annotation {annotation!r}')
default = MISSING
if annotation.__args__[-1] is _none:
if len(annotation.__args__) == 2:
underlying = annotation.__args__[0]
option_type = mapping.get(underlying)
if option_type is None:
raise TypeError(f'unsupported inner optional type {underlying!r}')
return (option_type, None)
else:
args = annotation.__args__[:-1]
default = None
else:
args = annotation.__args__
# At this point only models are allowed
# Since Optional[int | bool | str] will be taken care of above
# The only valid transformations here are:
# [Member, User] => user
# [Member, User, Role] => mentionable
# [Member | User, Role] => mentionable
supported_types: Set[Any] = {Role, Member, User}
if not all(arg in supported_types for arg in args):
raise TypeError(f'unsupported types given inside {annotation!r}')
if args == (User, Member) or args == (Member, User):
return (AppCommandOptionType.user, default)
return (AppCommandOptionType.mentionable, default)
def _populate_descriptions(params: Dict[str, CommandParameter], descriptions: Dict[str, Any]) -> None:
for name, param in params.items():
description = descriptions.pop(name, MISSING)
if description is MISSING:
param.description = '...'
continue
if not isinstance(description, str):
raise TypeError('description must be a string')
param.description = description
if descriptions:
first = next(iter(descriptions))
raise TypeError(f'unknown parameter given: {first}')
def _get_parameter(annotation: Any, parameter: inspect.Parameter) -> CommandParameter:
(type, default) = _annotation_to_type(annotation)
if default is MISSING:
default = parameter.default
if default is parameter.empty:
default = MISSING
result = CommandParameter(
type=type,
default=default,
required=default is MISSING,
name=parameter.name,
)
if parameter.kind in (parameter.POSITIONAL_ONLY, parameter.VAR_KEYWORD, parameter.VAR_POSITIONAL):
raise TypeError(f'unsupported parameter kind in callback: {parameter.kind!s}')
# Verify validity of the default parameter
if result.default is not MISSING:
valid_types: Tuple[Any, ...] = allowed_default_types.get(result.type, (NoneType,))
if not isinstance(result.default, valid_types):
raise TypeError(f'invalid default parameter type given ({result.default.__class__}), expected {valid_types}')
result.annotation = annotation
return result
def _extract_parameters_from_callback(func: Callable[..., Any], globalns: Dict[str, Any]) -> Dict[str, CommandParameter]:
params = inspect.signature(func).parameters
cache = {}
required_params = is_inside_class(func) + 1
if len(params) < required_params:
raise TypeError(f'callback must have more than {required_params - 1} parameter(s)')
iterator = iter(params.values())
for _ in range(0, required_params):
next(iterator)
parameters: List[CommandParameter] = []
for parameter in iterator:
if parameter.annotation is parameter.empty:
raise TypeError(f'annotation for {parameter.name} must be given')
resolved = resolve_annotation(parameter.annotation, globalns, globalns, cache)
param = _get_parameter(resolved, parameter)
parameters.append(param)
values = sorted(parameters, key=lambda a: a.required, reverse=True)
result = {v.name: v for v in values}
try:
descriptions = func.__discord_app_commands_param_description__
except AttributeError:
pass
else:
_populate_descriptions(result, descriptions)
return result
class Command(Generic[GroupT, P, T]):
"""A class that implements an application command.
These are usually not created manually, instead they are created using
one of the following decorators:
- :func:`~discord.app_commands.command`
- :meth:`Group.command <discord.app_commands.Group.command>`
.. versionadded:: 2.0
Attributes
------------
name: :class:`str`
The name of the application command.
type: :class:`AppCommandType`
The type of application command.
callback: :ref:`coroutine <coroutine>`
The coroutine that is executed when the command is called.
description: :class:`str`
The description of the application command. This shows up in the UI to describe
the application command.
parent: Optional[:class:`CommandGroup`]
The parent application command. ``None`` if there isn't one.
"""
def __init__(
self,
*,
name: str,
description: str,
callback: CommandCallback[GroupT, P, T],
type: AppCommandType = AppCommandType.chat_input,
parent: Optional[Group] = None,
):
self.name: str = name
self.description: str = description
self._callback: CommandCallback[GroupT, P, T] = callback
self.parent: Optional[Group] = parent
self.binding: Optional[GroupT] = None
self.type: AppCommandType = type
self._params: Dict[str, CommandParameter] = _extract_parameters_from_callback(callback, callback.__globals__)
def _copy_with_binding(self, binding: GroupT) -> Command:
cls = self.__class__
copy = cls.__new__(cls)
copy.name = self.name
copy.description = self.description
copy._callback = self._callback
copy.parent = self.parent
copy.type = self.type
copy._params = self._params.copy()
copy.binding = binding
return copy
def to_dict(self) -> Dict[str, Any]:
# If we have a parent then our type is a subcommand
# Otherwise, the type falls back to the specific command type (e.g. slash command or context menu)
option_type = self.type.value if self.parent is None else AppCommandOptionType.subcommand.value
return {
'name': self.name,
'description': self.description,
'type': option_type,
'options': [param.to_dict() for param in self._params.values()],
}
async def _invoke_with_namespace(self, interaction: Interaction, namespace: Namespace) -> T:
defaults = ((name, param.default) for name, param in self._params.items() if not param.required)
namespace._update_with_defaults(defaults)
# These type ignores are because the type checker doesn't quite understand the narrowing here
# Likewise, it thinks we're missing positional arguments when there aren't any.
try:
if self.binding is not None:
return await self._callback(self.binding, interaction, **namespace.__dict__) # type: ignore
return await self._callback(interaction, **namespace.__dict__) # type: ignore
except TypeError:
# In order to detect mismatch from the provided signature and the Discord data,
# there are many ways it can go wrong yet all of them eventually lead to a TypeError
# from the Python compiler showcasing that the signature is incorrect. This lovely
# piece of code essentially checks the last frame of the caller and checks if the
# locals contains our `self` reference.
#
# This is because there is a possibility that a TypeError is raised within the body
# of the function, and in that case the locals wouldn't contain a reference to
# the command object under the name `self`.
frame = inspect.trace()[-1].frame
if frame.f_locals.get('self') is self:
raise CommandSignatureMismatch(self) from None
raise
def get_parameter(self, name: str) -> Optional[CommandParameter]:
"""Returns the :class:`CommandParameter` with the given name.
Parameters
-----------
name: :class:`str`
The parameter name to get.
Returns
--------
Optional[:class:`CommandParameter`]
The command parameter, if found.
"""
return self._params.get(name)
@property
def root_parent(self) -> Optional[Group]:
"""Optional[:class:`Group`]: The root parent of this command."""
if self.parent is None:
return None
parent = self.parent
return parent.parent or parent
def _get_internal_command(self, name: str) -> Optional[Union[Command, Group]]:
return None
class Group:
"""A class that implements an application command group.
These are usually inherited rather than created manually.
.. versionadded:: 2.0
Attributes
------------
name: :class:`str`
The name of the group. If not given, it defaults to a lower-case
kebab-case version of the class name.
description: :class:`str`
The description of the group. This shows up in the UI to describe
the group. If not given, it defaults to the docstring of the
class shortened to 100 characters.
parent: Optional[:class:`CommandGroup`]
The parent group. ``None`` if there isn't one.
"""
__discord_app_commands_group_children__: ClassVar[List[Union[Command, Group]]] = []
__discord_app_commands_group_name__: str = MISSING
__discord_app_commands_group_description__: str = MISSING
def __init_subclass__(cls, *, name: str = MISSING, description: str = MISSING) -> None:
cls.__discord_app_commands_group_children__ = children = [
member for member in cls.__dict__.values() if isinstance(member, (Group, Command)) and member.parent is None
]
found = set()
for child in children:
if child.name in found:
raise TypeError(f'Command {child.name} is a duplicate')
found.add(child.name)
if name is MISSING:
cls.__discord_app_commands_group_name__ = _to_kebab_case(cls.__name__)
else:
cls.__discord_app_commands_group_name__ = name
if description is MISSING:
if cls.__doc__ is None:
cls.__discord_app_commands_group_description__ = '...'
else:
cls.__discord_app_commands_group_description__ = _shorten(cls.__doc__)
else:
cls.__discord_app_commands_group_description__ = description
if len(children) > 25:
raise TypeError('groups cannot have more than 25 commands')
def __init__(
self,
*,
name: str = MISSING,
description: str = MISSING,
parent: Optional[Group] = None,
):
cls = self.__class__
self.name: str = name if name is not MISSING else cls.__discord_app_commands_group_name__
self.description: str = description or cls.__discord_app_commands_group_description__
if not self.description:
raise TypeError('groups must have a description')
self.parent: Optional[Group] = parent
self._children: Dict[str, Union[Command, Group]] = {
child.name: child._copy_with_binding(self) for child in self.__discord_app_commands_group_children__
}
for child in self._children.values():
child.parent = self
if parent is not None and parent.parent is not None:
raise ValueError('groups can only be nested at most one level')
def _copy_with_binding(self, binding: Group) -> Group:
cls = self.__class__
copy = cls.__new__(cls)
copy.name = self.name
copy.description = self.description
copy.parent = self.parent
copy._children = {child.name: child._copy_with_binding(binding) for child in self._children.values()}
return copy
def to_dict(self) -> Dict[str, Any]:
# If this has a parent command then it's part of a subcommand group
# Otherwise, it's just a regular command
option_type = 1 if self.parent is None else AppCommandOptionType.subcommand_group.value
return {
'name': self.name,
'description': self.description,
'type': option_type,
'options': [child.to_dict() for child in self._children.values()],
}
@property
def root_parent(self) -> Optional[Group]:
"""Optional[:class:`Group`]: The parent of this group."""
return self.parent
def _get_internal_command(self, name: str) -> Optional[Union[Command, Group]]:
return self._children.get(name)
def add_command(self, command: Union[Command, Group], /, *, override: bool = False):
"""Adds a command or group to this group's internal list of commands.
Parameters
-----------
command: Union[:class:`Command`, :class:`Group`]
The command or group to add.
override: :class:`bool`
Whether to override a pre-existing command or group with the same name.
If ``False`` then an exception is raised.
Raises
-------
CommandAlreadyRegistered
The command or group is already registered. Note that the :attr:`CommandAlreadyRegistered.guild_id`
attribute will always be ``None`` in this case.
ValueError
There are too many commands already registered.
"""
if not override and command.name in self._children:
raise CommandAlreadyRegistered(command.name, guild_id=None)
self._children[command.name] = command
if len(self._children) > 25:
raise ValueError('maximum number of child commands exceeded')
def remove_command(self, name: str, /) -> Optional[Union[Command, Group]]:
"""Remove a command or group from the internal list of commands.
Parameters
-----------
name: :class:`str`
The name of the command or group to remove.
Returns
--------
Optional[Union[:class:`~discord.app_commands.Command`, :class:`~discord.app_commands.Group`]]
The command that was removed. If nothing was removed
then ``None`` is returned instead.
"""
self._children.pop(name, None)
def get_command(self, name: str, /) -> Optional[Union[Command, Group]]:
"""Retrieves a command or group from its name.
Parameters
-----------
name: :class:`str`
The name of the command or group to retrieve.
Returns
--------
Optional[Union[:class:`~discord.app_commands.Command`, :class:`~discord.app_commands.Group`]]
The command or group that was retrieved. If nothing was found
then ``None`` is returned instead.
"""
return self._children.get(name)
def command(
self,
*,
name: str = MISSING,
description: str = MISSING,
) -> Callable[[CommandCallback[GroupT, P, T]], Command[GroupT, P, T]]:
"""Creates an application command under this group.
Parameters
------------
name: :class:`str`
The name of the application command. If not given, it defaults to a lower-case
version of the callback name.
description: :class:`str`
The description of the application command. This shows up in the UI to describe
the application command. If not given, it defaults to the first line of the docstring
of the callback shortened to 100 characters.
"""
def decorator(func: CommandCallback[GroupT, P, T]) -> Command[GroupT, P, T]:
if not inspect.iscoroutinefunction(func):
raise TypeError('command function must be a coroutine function')
if description is MISSING:
if func.__doc__ is None:
desc = '...'
else:
desc = _shorten(func.__doc__)
else:
desc = description
command = Command(
name=name if name is not MISSING else func.__name__,
description=desc,
callback=func,
type=AppCommandType.chat_input,
parent=self,
)
self.add_command(command)
return command
return decorator
def command(
*,
name: str = MISSING,
description: str = MISSING,
) -> Callable[[CommandCallback[GroupT, P, T]], Command[GroupT, P, T]]:
"""Creates an application command from a regular function.
Parameters
------------
name: :class:`str`
The name of the application command. If not given, it defaults to a lower-case
version of the callback name.
description: :class:`str`
The description of the application command. This shows up in the UI to describe
the application command. If not given, it defaults to the first line of the docstring
of the callback shortened to 100 characters.
"""
def decorator(func: CommandCallback[GroupT, P, T]) -> Command[GroupT, P, T]:
if not inspect.iscoroutinefunction(func):
raise TypeError('command function must be a coroutine function')
if description is MISSING:
if func.__doc__ is None:
desc = '...'
else:
desc = _shorten(func.__doc__)
else:
desc = description
return Command(
name=name if name is not MISSING else func.__name__,
description=desc,
callback=func,
type=AppCommandType.chat_input,
parent=None,
)
return decorator
def describe(**parameters: str) -> Callable[[T], T]:
r"""Describes the given parameters by their name using the key of the keyword argument
as the name.
Example:
.. code-block:: python3
@app_commands.command()
@app_commads.describe(member='the member to ban')
async def ban(interaction: discord.Interaction, member: discord.Member):
await interaction.response.send_message(f'Banned {member}')
Parameters
-----------
\*\*parameters
The description of the parameters.
Raises
--------
TypeError
The parameter name is not found.
"""
def decorator(inner: T) -> T:
if isinstance(inner, Command):
_populate_descriptions(inner._params, parameters)
else:
inner.__discord_app_commands_param_description__ = parameters # type: ignore - Runtime attribute assignment
return inner
return decorator

View File

@@ -0,0 +1,53 @@
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from ..enums import Enum
__all__ = (
'AppCommandOptionType',
'AppCommandType',
)
class AppCommandOptionType(Enum):
subcommand = 1
subcommand_group = 2
string = 3
integer = 4
boolean = 5
user = 6
channel = 7
role = 8
mentionable = 9
number = 10
attachment = 11
def is_argument(self) -> bool:
return 11 >= self.value >= 3
class AppCommandType(Enum):
chat_input = 1
user = 2
message = 3

View File

@@ -0,0 +1,96 @@
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional, Union
from ..errors import DiscordException
__all__ = (
'CommandAlreadyRegistered',
'CommandSignatureMismatch',
'CommandNotFound',
)
if TYPE_CHECKING:
from .commands import Command, Group
class CommandAlreadyRegistered(DiscordException):
"""An exception raised when a command is already registered.
Attributes
-----------
name: :class:`str`
The name of the command already registered.
guild_id: Optional[:class:`int`]
The guild ID this command was already registered at.
If ``None`` then it was a global command.
"""
def __init__(self, name: str, guild_id: Optional[int]):
self.name = name
self.guild_id = guild_id
super().__init__(f'Command {name!r} already registered.')
class CommandNotFound(DiscordException):
"""An exception raised when an application command could not be found.
Attributes
------------
name: :class:`str`
The name of the application command not found.
parents: List[:class:`str`]
A list of parent command names that were previously found
prior to the application command not being found.
"""
def __init__(self, name: str, parents: List[str]):
self.name = name
self.parents = parents
super().__init__(f'Application command {name!r} not found')
class CommandSignatureMismatch(DiscordException):
"""An exception raised when an application command from Discord has a different signature
from the one provided in the code. This happens because your command definition differs
from the command definition you provided Discord. Either your code is out of date or the
data from Discord is out of sync.
Attributes
------------
command: Union[:class:`~discord.app_commands.Command`, :class:`~discord.app_commands.Group`]
The command that had the signature mismatch.
"""
def __init__(self, command: Union[Command, Group]):
self.command: Union[Command, Group] = command
msg = (
f'The signature for command {command!r} is different from the one provided by Discord. '
'This can happen because either your code is out of date or you have not synced the '
'commands with Discord, causing the mismatch in data. It is recommended to sync the '
'command tree to fix this issue.'
)
super().__init__(msg)

View File

@@ -0,0 +1,592 @@
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
from datetime import datetime
from ..permissions import Permissions
from ..enums import ChannelType, try_enum
from ..mixins import Hashable
from ..utils import _get_as_snowflake, parse_time, snowflake_time
from .enums import AppCommandOptionType, AppCommandType
from typing import List, NamedTuple, TYPE_CHECKING, Optional, Union
__all__ = (
'AppCommand',
'AppCommandGroup',
'AppCommandChannel',
'AppCommandThread',
'Argument',
'Choice',
)
def is_app_command_argument_type(value: int) -> bool:
return 11 >= value >= 3
if TYPE_CHECKING:
from ..types.command import (
ApplicationCommand as ApplicationCommandPayload,
ApplicationCommandOptionChoice,
ApplicationCommandOption,
)
from ..types.interactions import (
PartialChannel,
PartialThread,
)
from ..types.threads import ThreadMetadata
from ..state import ConnectionState
from ..guild import GuildChannel, Guild
from ..channel import TextChannel
from ..threads import Thread
ApplicationCommandParent = Union['AppCommand', 'AppCommandGroup']
class AppCommand(Hashable):
"""Represents a application command.
In common parlance this is referred to as a "Slash Command" or a
"Context Menu Command".
.. versionadded:: 2.0
.. container:: operations
.. describe:: x == y
Checks if two application commands are equal.
.. describe:: x != y
Checks if two application commands are not equal.
.. describe:: hash(x)
Returns the application command's hash.
.. describe:: str(x)
Returns the application command's name.
Attributes
-----------
id: :class:`int`
The application command's ID.
application_id: :class:`int`
The application command's application's ID.
type: :class:`ApplicationCommandType`
The application command's type.
name: :class:`str`
The application command's name.
description: :class:`str`
The application command's description.
"""
__slots__ = (
'id',
'type',
'application_id',
'name',
'description',
'options',
'_state',
)
def __init__(self, *, data: ApplicationCommandPayload, state=None):
self._state = state
self._from_data(data)
def _from_data(self, data: ApplicationCommandPayload):
self.id: int = int(data['id'])
self.application_id: int = int(data['application_id'])
self.name: str = data['name']
self.description: str = data['description']
self.type: AppCommandType = try_enum(AppCommandType, data.get('type', 1))
self.options = [app_command_option_factory(data=d, parent=self, state=self._state) for d in data.get('options', [])]
def to_dict(self) -> ApplicationCommandPayload:
return {
'id': self.id,
'type': self.type.value,
'application_id': self.application_id,
'name': self.name,
'description': self.description,
'options': [opt.to_dict() for opt in self.options],
} # type: ignore -- Type checker does not understand this literal.
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f'<{self.__class__.__name__} id={self.id!r} name={self.name!r} type={self.type!r}>'
class Choice(NamedTuple):
"""Represents an application command argument choice.
.. versionadded:: 2.0
.. container:: operations
.. describe:: x == y
Checks if two choices are equal.
.. describe:: x != y
Checks if two choices are not equal.
Parameters
-----------
name: :class:`str`
The name of the choice. Used for display purposes.
value: Union[:class:`int`, :class:`str`, :class:`float`]
The value of the choice.
"""
name: str
value: Union[int, str, float]
def to_dict(self) -> ApplicationCommandOptionChoice:
return {
'name': self.name,
'value': self.value,
} # type: ignore -- Type checker does not understand this literal.
class AppCommandChannel(Hashable):
"""Represents an application command partially resolved channel object.
.. versionadded:: 2.0
.. container:: operations
.. describe:: x == y
Checks if two channels are equal.
.. describe:: x != y
Checks if two channels are not equal.
.. describe:: hash(x)
Returns the channel's hash.
.. describe:: str(x)
Returns the channel's name.
Attributes
-----------
id: :class:`int`
The ID of the channel.
type: :class:`~discord.ChannelType`
The type of channel.
name: :class:`str`
The name of the channel.
permissions: :class:`~discord.Permissions`
The resolved permissions of the user who invoked
the application command in that channel.
guild_id: :class:`int`
The guild ID this channel belongs to.
"""
__slots__ = (
'id',
'type',
'name',
'permissions',
'guild_id',
'_state',
)
def __init__(
self,
*,
state: ConnectionState,
data: PartialChannel,
guild_id: int,
):
self._state = state
self.guild_id = guild_id
self.id = int(data['id'])
self.type = try_enum(ChannelType, data['type'])
self.name = data['name']
self.permissions = Permissions(int(data['permissions']))
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f'<{self.__class__.__name__} id={self.id!r} name={self.name!r} type={self.type!r}>'
@property
def guild(self) -> Optional[Guild]:
"""Optional[:class:`~discord.Guild`]: The channel's guild, from cache, if found."""
return self._state._get_guild(self.guild_id)
def resolve(self) -> Optional[GuildChannel]:
"""Resolves the application command channel to the appropriate channel
from cache if found.
Returns
--------
Optional[:class:`.abc.GuildChannel`]
The resolved guild channel or ``None`` if not found in cache.
"""
guild = self._state._get_guild(self.guild_id)
if guild is not None:
return guild.get_channel(self.id)
return None
async def fetch(self) -> GuildChannel:
"""|coro|
Fetches the partial channel to a full :class:`.abc.GuildChannel`.
Raises
--------
NotFound
The channel was not found.
Forbidden
You do not have the permissions required to get a channel.
HTTPException
Retrieving the channel failed.
Returns
--------
:class:`.abc.GuildChannel`
The full channel.
"""
client = self._state._get_client()
return await client.fetch_channel(self.id) # type: ignore -- This is explicit narrowing
@property
def mention(self) -> str:
""":class:`str`: The string that allows you to mention the channel."""
return f'<#{self.id}>'
@property
def created_at(self) -> datetime:
""":class:`datetime.datetime`: An aware timestamp of when this channel was created in UTC."""
return snowflake_time(self.id)
class AppCommandThread(Hashable):
"""Represents an application command partially resolved thread object.
.. versionadded:: 2.0
.. container:: operations
.. describe:: x == y
Checks if two thread are equal.
.. describe:: x != y
Checks if two thread are not equal.
.. describe:: hash(x)
Returns the thread's hash.
.. describe:: str(x)
Returns the thread's name.
Attributes
-----------
id: :class:`int`
The ID of the thread.
type: :class:`~discord.ChannelType`
The type of thread.
name: :class:`str`
The name of the thread.
parent_id: :class:`int`
The parent text channel ID this thread belongs to.
permissions: :class:`~discord.Permissions`
The resolved permissions of the user who invoked
the application command in that thread.
guild_id: :class:`int`
The guild ID this thread belongs to.
archived: :class:`bool`
Whether the thread is archived.
locked: :class:`bool`
Whether the thread is locked.
invitable: :class:`bool`
Whether non-moderators can add other non-moderators to this thread.
This is always ``True`` for public threads.
archiver_id: Optional[:class:`int`]
The user's ID that archived this thread.
auto_archive_duration: :class:`int`
The duration in minutes until the thread is automatically archived due to inactivity.
Usually a value of 60, 1440, 4320 and 10080.
archive_timestamp: :class:`datetime.datetime`
An aware timestamp of when the thread's archived status was last updated in UTC.
"""
__slots__ = (
'id',
'type',
'name',
'permissions',
'guild_id',
'parent_id',
'archived',
'archiver_id',
'auto_archive_duration',
'archive_timestamp',
'locked',
'invitable',
'_created_at',
'_state',
)
def __init__(
self,
*,
state: ConnectionState,
data: PartialThread,
guild_id: int,
):
self._state = state
self.guild_id = guild_id
self.id = int(data['id'])
self.parent_id = int(data['parent_id'])
self.type = try_enum(ChannelType, data['type'])
self.name = data['name']
self.permissions = Permissions(int(data['permissions']))
self._unroll_metadata(data['thread_metadata'])
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f'<{self.__class__.__name__} id={self.id!r} name={self.name!r} archived={self.archived} type={self.type!r}>'
@property
def guild(self) -> Optional[Guild]:
"""Optional[:class:`~discord.Guild`]: The channel's guild, from cache, if found."""
return self._state._get_guild(self.guild_id)
def _unroll_metadata(self, data: ThreadMetadata):
self.archived = data['archived']
self.archiver_id = _get_as_snowflake(data, 'archiver_id')
self.auto_archive_duration = data['auto_archive_duration']
self.archive_timestamp = parse_time(data['archive_timestamp'])
self.locked = data.get('locked', False)
self.invitable = data.get('invitable', True)
self._created_at = parse_time(data.get('create_timestamp'))
@property
def parent(self) -> Optional[TextChannel]:
"""Optional[:class:`TextChannel`]: The parent channel this thread belongs to."""
return self.guild.get_channel(self.parent_id) # type: ignore
@property
def mention(self) -> str:
""":class:`str`: The string that allows you to mention the thread."""
return f'<#{self.id}>'
@property
def created_at(self) -> Optional[datetime]:
"""An aware timestamp of when the thread was created in UTC.
.. note::
This timestamp only exists for threads created after 9 January 2022, otherwise returns ``None``.
"""
return self._created_at
def resolve(self) -> Optional[Thread]:
"""Resolves the application command channel to the appropriate channel
from cache if found.
Returns
--------
Optional[:class:`.abc.GuildChannel`]
The resolved guild channel or ``None`` if not found in cache.
"""
guild = self._state._get_guild(self.guild_id)
if guild is not None:
return guild.get_thread(self.id)
return None
async def fetch(self) -> Thread:
"""|coro|
Fetches the partial channel to a full :class:`~discord.Thread`.
Raises
--------
NotFound
The thread was not found.
Forbidden
You do not have the permissions required to get a thread.
HTTPException
Retrieving the thread failed.
Returns
--------
:class:`~discord.Thread`
The full thread.
"""
client = self._state._get_client()
return await client.fetch_channel(self.id) # type: ignore -- This is explicit narrowing
class Argument:
"""Represents a application command argument.
.. versionadded:: 2.0
Attributes
------------
type: :class:`AppCommandOptionType`
The type of argument.
name: :class:`str`
The name of the argument.
description: :class:`str`
The description of the argument.
required: :class:`bool`
Whether the argument is required.
choices: List[:class:`Choice`]
A list of choices for the command to choose from for this argument.
parent: Union[:class:`AppCommand`, :class:`AppCommandGroup`]
The parent application command that has this argument.
"""
__slots__ = (
'type',
'name',
'description',
'required',
'choices',
'parent',
'_state',
)
def __init__(self, *, parent: ApplicationCommandParent, data: ApplicationCommandOption, state=None):
self._state = state
self.parent = parent
self._from_data(data)
def __repr__(self) -> str:
return f'<{self.__class__.__name__} name={self.name!r} type={self.type!r} required={self.required}>'
def _from_data(self, data: ApplicationCommandOption):
self.type: AppCommandOptionType = try_enum(AppCommandOptionType, data['type'])
self.name: str = data['name']
self.description: str = data['description']
self.required: bool = data.get('required', False)
self.choices: List[Choice] = [Choice(name=d['name'], value=d['value']) for d in data.get('choices', [])]
def to_dict(self) -> ApplicationCommandOption:
return {
'name': self.name,
'type': self.type.value,
'description': self.description,
'required': self.required,
'choices': [choice.to_dict() for choice in self.choices],
'options': [],
} # type: ignore -- Type checker does not understand this literal.
class AppCommandGroup:
"""Represents a application command subcommand.
.. versionadded:: 2.0
Attributes
------------
type: :class:`ApplicationCommandOptionType`
The type of subcommand.
name: :class:`str`
The name of the subcommand.
description: :class:`str`
The description of the subcommand.
required: :class:`bool`
Whether the subcommand is required.
choices: List[:class:`Choice`]
A list of choices for the command to choose from for this subcommand.
arguments: List[:class:`Argument`]
A list of arguments.
parent: Union[:class:`AppCommand`, :class:`AppCommandGroup`]
The parent application command.
"""
__slots__ = (
'type',
'name',
'description',
'required',
'choices',
'arguments',
'parent',
'_state',
)
def __init__(self, *, parent: ApplicationCommandParent, data: ApplicationCommandOption, state=None):
self.parent = parent
self._state = state
self._from_data(data)
def __repr__(self) -> str:
return f'<{self.__class__.__name__} name={self.name!r} type={self.type!r} required={self.required}>'
def _from_data(self, data: ApplicationCommandOption):
self.type: AppCommandOptionType = try_enum(AppCommandOptionType, data['type'])
self.name: str = data['name']
self.description: str = data['description']
self.required: bool = data.get('required', False)
self.choices: List[Choice] = [Choice(name=d['name'], value=d['value']) for d in data.get('choices', [])]
self.arguments: List[Argument] = [
Argument(parent=self, state=self._state, data=d)
for d in data.get('options', [])
if is_app_command_argument_type(d['type'])
]
def to_dict(self) -> 'ApplicationCommandOption':
return {
'name': self.name,
'type': self.type.value,
'description': self.description,
'required': self.required,
'choices': [choice.to_dict() for choice in self.choices],
'options': [arg.to_dict() for arg in self.arguments],
} # type: ignore -- Type checker does not understand this literal.
def app_command_option_factory(
parent: ApplicationCommandParent, data: ApplicationCommandOption, *, state=None
) -> Union[Argument, AppCommandGroup]:
if is_app_command_argument_type(data['type']):
return Argument(parent=parent, data=data, state=state)
else:
return AppCommandGroup(parent=parent, data=data, state=state)

View File

@@ -0,0 +1,160 @@
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple
from ..interactions import Interaction
from ..member import Member
from ..object import Object
from ..role import Role
from ..message import Message, Attachment
from .models import AppCommandChannel, AppCommandThread
if TYPE_CHECKING:
from ..types.interactions import ResolvedData, ApplicationCommandInteractionDataOption
class Namespace:
"""An object that holds the parameters being passed to a command in a mostly raw state.
This class is deliberately simple and just holds the option name and resolved value as a simple
key-pair mapping. These attributes can be accessed using dot notation. For example, an option
with the name of ``example`` can be accessed using ``ns.example``.
.. versionadded:: 2.0
.. container:: operations
.. describe:: x == y
Checks if two namespaces are equal by checking if all attributes are equal.
.. describe:: x != y
Checks if two namespaces are not equal.
This namespace object converts resolved objects into their appropriate form depending on their
type. Consult the table below for conversion information.
+------------------------------------------+-------------------------------------------------------------------------------+
| Option Type | Resolved Type |
+==========================================+===============================================================================+
| :attr:`AppCommandOptionType.string` | :class:`str` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.integer` | :class:`int` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.boolean` | :class:`bool` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.number` | :class:`float` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.user` | :class:`~discord.User` or :class:`~discord.Member` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.channel` | :class:`.AppCommandChannel` or :class:`.AppCommandThread` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.role` | :class:`~discord.Role` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.mentionable` | :class:`~discord.User` or :class:`~discord.Member`, or :class:`~discord.Role` |
+------------------------------------------+-------------------------------------------------------------------------------+
| :attr:`AppCommandOptionType.attachment` | :class:`~discord.Attachment` |
+------------------------------------------+-------------------------------------------------------------------------------+
"""
def __init__(
self,
interaction: Interaction,
resolved: ResolvedData,
options: List[ApplicationCommandInteractionDataOption],
):
completed: Dict[str, Any] = {}
state = interaction._state
members = resolved.get('members', {})
guild_id = interaction.guild_id
guild = (state._get_guild(guild_id) or Object(id=guild_id)) if guild_id is not None else None
for (user_id, user_data) in resolved.get('users', {}).items():
try:
member_data = members[user_id]
except KeyError:
completed[user_id] = state.create_user(user_data)
else:
member_data['user'] = user_data
# Guild ID can't be None in this case.
# There's a type mismatch here that I don't actually care about
member = Member(state=state, guild=guild, data=member_data) # type: ignore
completed[user_id] = member
completed.update(
{
# The guild ID can't be None in this case.
role_id: Role(guild=guild, state=state, data=role_data) # type: ignore
for role_id, role_data in resolved.get('roles', {}).items()
}
)
for (channel_id, channel_data) in resolved.get('channels', {}).items():
if channel_data['type'] in (10, 11, 12):
# The guild ID can't be none in this case
completed[channel_id] = AppCommandThread(state=state, data=channel_data, guild_id=guild_id) # type: ignore
else:
# The guild ID can't be none in this case
completed[channel_id] = AppCommandChannel(state=state, data=channel_data, guild_id=guild_id) # type: ignore
completed.update(
{
attachment_id: Attachment(data=attachment_data, state=state)
for attachment_id, attachment_data in resolved.get('attachments', {}).items()
}
)
# TODO: messages
for option in options:
opt_type = option['type']
name = option['name']
if opt_type in (3, 4, 5): # string, integer, boolean
value = option['value'] # type: ignore -- Key is there
self.__dict__[name] = value
elif opt_type == 10: # number
value = option['value'] # type: ignore -- Key is there
if value is None:
self.__dict__[name] = float('nan')
else:
self.__dict__[name] = float(value)
elif opt_type in (6, 7, 8, 9, 11):
# Remaining ones should be snowflake based ones with resolved data
snowflake: str = option['value'] # type: ignore -- Key is there
value = completed.get(snowflake)
self.__dict__[name] = value
def __repr__(self) -> str:
items = (f'{k}={v!r}' for k, v in self.__dict__.items())
return '<{} {}>'.format(self.__class__.__name__, ' '.join(items))
def __eq__(self, other: object) -> bool:
if isinstance(self, Namespace) and isinstance(other, Namespace):
return self.__dict__ == other.__dict__
return NotImplemented
def _update_with_defaults(self, defaults: Iterable[Tuple[str, Any]]) -> None:
for key, value in defaults:
self.__dict__.setdefault(key, value)

View File

@@ -0,0 +1,416 @@
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import inspect
from typing import Callable, Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
from .namespace import Namespace
from .models import AppCommand
from .commands import Command, Group, _shorten
from .enums import AppCommandType
from .errors import CommandAlreadyRegistered, CommandNotFound, CommandSignatureMismatch
from ..errors import ClientException
from ..utils import MISSING
if TYPE_CHECKING:
from ..types.interactions import ApplicationCommandInteractionData, ApplicationCommandInteractionDataOption
from ..interactions import Interaction
from ..client import Client
from ..abc import Snowflake
from .commands import CommandCallback, P, T
__all__ = ('CommandTree',)
class CommandTree:
"""Represents a container that holds application command information.
Parameters
-----------
client: :class:`Client`
The client instance to get application command information from.
"""
def __init__(self, client: Client):
self.client = client
self._http = client.http
self._state = client._connection
self._state._command_tree = self
self._guild_commands: Dict[int, Dict[str, Union[Command, Group]]] = {}
self._global_commands: Dict[str, Union[Command, Group]] = {}
# (name, guild_id, command_type): Command
# The above two mappings can use this structure too but we need fast retrieval
# by name and guild_id in the above case while here it isn't as important since
# it's uncommon and N=5 anyway.
self._context_menus: Dict[Tuple[str, Optional[int], int], Command] = {}
async def fetch_commands(self, *, guild: Optional[Snowflake] = None) -> List[AppCommand]:
"""|coro|
Fetches the application's current commands.
If no guild is passed then global commands are fetched, otherwise
the guild's commands are fetched instead.
Parameters
-----------
guild: Optional[:class:`abc.Snowflake`]
The guild to fetch the commands from. If not passed then global commands
are fetched instead.
Raises
-------
HTTPException
Fetching the commands failed.
ClientException
The application ID could not be found.
Returns
--------
List[:class:`~discord.app_commands.AppCommand`]
The application's commands.
"""
if self.client.application_id is None:
raise ClientException('Client does not have an application ID set')
if guild is None:
commands = await self._http.get_global_commands(self.client.application_id)
else:
commands = await self._http.get_guild_commands(self.client.application_id, guild.id)
return [AppCommand(data=data, state=self._state) for data in commands]
def add_command(self, command: Union[Command, Group], /, *, guild: Optional[Snowflake] = None, override: bool = False):
"""Adds an application command to the tree.
This only adds the command locally -- in order to sync the commands
and enable them in the client, :meth:`sync` must be called.
The root parent of the command is added regardless of the type passed.
Parameters
-----------
command: Union[:class:`Command`, :class:`Group`]
The application command or group to add.
guild: Optional[:class:`abc.Snowflake`]
The guild to add the command to. If not given then it
becomes a global command instead.
override: :class:`bool`
Whether to override a command with the same name. If ``False``
an exception is raised. Default is ``False``.
Raises
--------
~discord.CommandAlreadyRegistered
The command was already registered and no override was specified.
TypeError
The application command passed is not a valid application command.
ValueError
The maximum number of commands was reached globally or for that guild.
This is currently 100 for slash commands and 5 for context menu commands.
"""
if not isinstance(command, (Command, Group)):
raise TypeError(f'Expected a application command, received {command.__class__!r} instead')
# todo: validate application command groups having children (required)
root = command.root_parent or command
name = root.name
if guild is not None:
commands = self._guild_commands.setdefault(guild.id, {})
found = name in commands
if found and not override:
raise CommandAlreadyRegistered(name, guild.id)
if len(commands) + found > 100:
raise ValueError('maximum number of slash commands exceeded (100)')
commands[name] = root
else:
found = name in self._global_commands
if found and not override:
raise CommandAlreadyRegistered(name, None)
if len(self._global_commands) + found > 100:
raise ValueError('maximum number of slash commands exceeded (100)')
self._global_commands[name] = root
def remove_command(self, command: str, /, *, guild: Optional[Snowflake] = None) -> Optional[Union[Command, Group]]:
"""Removes an application command from the tree.
This only removes the command locally -- in order to sync the commands
and remove them in the client, :meth:`sync` must be called.
Parameters
-----------
command: :class:`str`
The name of the root command to remove.
guild: Optional[:class:`abc.Snowflake`]
The guild to remove the command from. If not given then it
removes a global command instead.
Returns
---------
Optional[Union[:class:`Command`, :class:`Group`]]
The application command that got removed.
If nothing was removed then ``None`` is returned instead.
"""
if guild is None:
return self._global_commands.pop(command, None)
else:
try:
commands = self._guild_commands[guild.id]
except KeyError:
return None
else:
return commands.pop(command, None)
def get_command(self, command: str, /, *, guild: Optional[Snowflake] = None) -> Optional[Union[Command, Group]]:
"""Gets a application command from the tree.
.. note::
This does *not* include context menu commands.
Parameters
-----------
command: :class:`str`
The name of the root command to get.
guild: Optional[:class:`abc.Snowflake`]
The guild to get the command from. If not given then it
gets a global command instead.
Returns
---------
Optional[Union[:class:`Command`, :class:`Group`]]
The application command that was found.
If nothing was found then ``None`` is returned instead.
"""
if guild is None:
return self._global_commands.get(command)
else:
try:
commands = self._guild_commands[guild.id]
except KeyError:
return None
else:
return commands.get(command)
def get_commands(self, *, guild: Optional[Snowflake] = None) -> List[Union[Command, Group]]:
"""Gets all application commands from the tree.
.. note::
This does *not* retrieve context menu commands.
Parameters
-----------
guild: Optional[:class:`~discord.abc.Snowflake`]
The guild to get the commands from. If not given then it
gets all global commands instead.
Returns
---------
List[Union[:class:`Command`, :class:`Group`]]
The application commands from the tree.
"""
if guild is None:
return list(self._global_commands.values())
else:
try:
commands = self._guild_commands[guild.id]
except KeyError:
return []
else:
return list(commands.values())
def command(
self,
*,
name: str = MISSING,
description: str = MISSING,
guild: Optional[Snowflake] = None,
) -> Callable[[CommandCallback[Group, P, T]], Command[Group, P, T]]:
"""Creates an application command directly under this tree.
Parameters
------------
name: :class:`str`
The name of the application command. If not given, it defaults to a lower-case
version of the callback name.
description: :class:`str`
The description of the application command. This shows up in the UI to describe
the application command. If not given, it defaults to the first line of the docstring
of the callback shortened to 100 characters.
guild: Optional[:class:`Snowflake`]
The guild to add the command to. If not given then it
becomes a global command instead.
"""
def decorator(func: CommandCallback[Group, P, T]) -> Command[Group, P, T]:
if not inspect.iscoroutinefunction(func):
raise TypeError('command function must be a coroutine function')
if description is MISSING:
if func.__doc__ is None:
desc = '...'
else:
desc = _shorten(func.__doc__)
else:
desc = description
command = Command(
name=name if name is not MISSING else func.__name__,
description=desc,
callback=func,
type=AppCommandType.chat_input,
parent=None,
)
self.add_command(command, guild=guild)
return command
return decorator
async def sync(self, *, guild: Optional[Snowflake]) -> List[AppCommand]:
"""|coro|
Syncs the application commands to Discord.
This must be called for the application commands to show up.
Global commands take up to 1-hour to propagate but guild
commands propagate instantly.
Parameters
-----------
guild: Optional[:class:`~discord.abc.Snowflake`]
The guild to sync the commands to. If ``None`` then it
syncs all global commands instead.
Raises
-------
HTTPException
Syncing the commands failed.
ClientException
The client does not have an application ID.
Returns
--------
List[:class:`~discord.AppCommand`]
The application's commands that got synced.
"""
if self.client.application_id is None:
raise ClientException('Client does not have an application ID set')
commands = self.get_commands(guild=guild)
payload = [command.to_dict() for command in commands]
if guild is None:
data = await self._http.bulk_upsert_global_commands(self.client.application_id, payload=payload)
else:
data = await self._http.bulk_upsert_guild_commands(self.client.application_id, guild.id, payload=payload)
return [AppCommand(data=d, state=self._state) for d in data]
def _from_interaction(self, interaction: Interaction):
async def wrapper():
try:
await self.call(interaction)
except Exception as e:
print(f'Error:', e)
self.client.loop.create_task(wrapper(), name='CommandTree-invoker')
async def call(self, interaction: Interaction):
"""|coro|
Given an :class:`~discord.Interaction`, calls the matching
application command that's being invoked.
This is usually called automatically by the library.
Parameters
-----------
interaction: :class:`~discord.Interaction`
The interaction to dispatch from.
Raises
--------
CommandNotFound
The application command referred to could not be found.
CommandSignatureMismatch
The interaction data referred to a parameter that was not found in the
application command definition.
"""
data: ApplicationCommandInteractionData = interaction.data # type: ignore
parents: List[str] = []
name = data['name']
command = self._global_commands.get(name)
if interaction.guild_id:
try:
guild_commands = self._guild_commands[interaction.guild_id]
except KeyError:
pass
else:
command = guild_commands.get(name) or command
# If it's not found at this point then it's not gonna be found at any point
if command is None:
raise CommandNotFound(name, parents)
# This could be done recursively but it'd be a bother due to the state needed
# to be tracked above like the parents, the actual command type, and the
# resulting options we care about
searching = True
options: List[ApplicationCommandInteractionDataOption] = data.get('options', [])
while searching:
for option in options:
# Find subcommands
if option.get('type', 0) in (1, 2):
parents.append(name)
name = option['name']
command = command._get_internal_command(name)
if command is None:
raise CommandNotFound(name, parents)
options = option.get('options', [])
break
else:
searching = False
break
else:
break
if isinstance(command, Group):
# Right now, groups can't be invoked. This is a Discord limitation in how they
# do slash commands. So if we're here and we have a Group rather than a Command instance
# then something in the code is out of date from the data that Discord has.
raise CommandSignatureMismatch(command)
# At this point options refers to the arguments of the command
# and command refers to the class type we care about
namespace = Namespace(interaction, data.get('resolved', {}), options)
await command._invoke_with_namespace(interaction, namespace)

View File

@@ -69,6 +69,7 @@ if TYPE_CHECKING:
from .voice_client import VoiceProtocol
from .client import Client
from .gateway import DiscordWebSocket
from .app_commands import CommandTree
from .types.snowflake import Snowflake
from .types.activity import Activity as ActivityPayload
@@ -227,6 +228,7 @@ class ConnectionState:
self._activity: Optional[ActivityPayload] = activity
self._status: Optional[str] = status
self._intents: Intents = intents
self._command_tree: Optional[CommandTree] = None
if not intents.members or cache_flags._empty:
self.store_user = self.store_user_no_intents # type: ignore - This reassignment is on purpose
@@ -690,7 +692,9 @@ class ConnectionState:
def parse_interaction_create(self, data: gw.InteractionCreateEvent) -> None:
interaction = Interaction(data=data, state=self)
if data['type'] == 3: # interaction component
if data['type'] == 2 and self._command_tree: # application command
self._command_tree._from_interaction(interaction)
elif data['type'] == 3: # interaction component
# These keys are always there for this interaction type
custom_id = interaction.data['custom_id'] # type: ignore
component_type = interaction.data['component_type'] # type: ignore