mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-12-05 17:02:32 -05:00
Merge branch 'V3/release/3.0.0' into V3/develop
# Conflicts: # redbot/cogs/mod/mod.py
This commit is contained in:
@@ -230,7 +230,6 @@ class Cleanup(commands.Cog):
|
||||
to_delete = await self.get_messages_for_deletion(
|
||||
channel=channel, number=None, after=after, delete_pinned=delete_pinned
|
||||
)
|
||||
to_delete.append(ctx.message)
|
||||
|
||||
reason = "{}({}) deleted {} messages in channel {}.".format(
|
||||
author.name, author.id, len(to_delete), channel.name
|
||||
|
||||
@@ -358,7 +358,6 @@ class CustomCommands(commands.Cog):
|
||||
result = responses
|
||||
else:
|
||||
continue
|
||||
# Replace newlines with spaces
|
||||
# Cut preview to 52 characters max
|
||||
if len(result) > 52:
|
||||
result = result[:49] + "..."
|
||||
@@ -369,7 +368,8 @@ class CustomCommands(commands.Cog):
|
||||
results.append((f"{ctx.clean_prefix}{command}", result))
|
||||
|
||||
if await ctx.embed_requested():
|
||||
content = "\n".join(map("**{0[0]}** {0[1]}".format, results))
|
||||
# We need a space before the newline incase the CC preview ends in link (GH-2295)
|
||||
content = " \n".join(map("**{0[0]}** {0[1]}".format, results))
|
||||
pages = list(pagify(content, page_length=1024))
|
||||
embed_pages = []
|
||||
for idx, page in enumerate(pages, start=1):
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
from redbot.core import commands
|
||||
|
||||
|
||||
def mod_or_voice_permissions(**perms):
|
||||
async def pred(ctx: commands.Context):
|
||||
author = ctx.author
|
||||
guild = ctx.guild
|
||||
if await ctx.bot.is_owner(author) or guild.owner == author:
|
||||
# Author is bot owner or guild owner
|
||||
return True
|
||||
|
||||
admin_role = guild.get_role(await ctx.bot.db.guild(guild).admin_role())
|
||||
mod_role = guild.get_role(await ctx.bot.db.guild(guild).mod_role())
|
||||
|
||||
if admin_role in author.roles or mod_role in author.roles:
|
||||
return True
|
||||
|
||||
for vc in guild.voice_channels:
|
||||
resolved = vc.permissions_for(author)
|
||||
good = resolved.administrator or all(
|
||||
getattr(resolved, name, None) == value for name, value in perms.items()
|
||||
)
|
||||
if not good:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
return commands.permissions_check(pred)
|
||||
|
||||
|
||||
def admin_or_voice_permissions(**perms):
|
||||
async def pred(ctx: commands.Context):
|
||||
author = ctx.author
|
||||
guild = ctx.guild
|
||||
if await ctx.bot.is_owner(author) or guild.owner == author:
|
||||
return True
|
||||
admin_role = guild.get_role(await ctx.bot.db.guild(guild).admin_role())
|
||||
if admin_role in author.roles:
|
||||
return True
|
||||
for vc in guild.voice_channels:
|
||||
resolved = vc.permissions_for(author)
|
||||
good = resolved.administrator or all(
|
||||
getattr(resolved, name, None) == value for name, value in perms.items()
|
||||
)
|
||||
if not good:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
return commands.permissions_check(pred)
|
||||
|
||||
|
||||
def bot_has_voice_permissions(**perms):
|
||||
async def pred(ctx: commands.Context):
|
||||
guild = ctx.guild
|
||||
for vc in guild.voice_channels:
|
||||
resolved = vc.permissions_for(guild.me)
|
||||
good = resolved.administrator or all(
|
||||
getattr(resolved, name, None) == value for name, value in perms.items()
|
||||
)
|
||||
if not good:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
return commands.check(pred)
|
||||
@@ -11,12 +11,11 @@ from discord.ext.commands.errors import BadArgument
|
||||
from redbot.core import checks, Config, modlog, commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.i18n import Translator, cog_i18n
|
||||
from redbot.core.utils.chat_formatting import box, escape, pagify
|
||||
from .checks import mod_or_voice_permissions, admin_or_voice_permissions, bot_has_voice_permissions
|
||||
from redbot.core.utils.chat_formatting import box, escape, pagify, format_perms_list
|
||||
from redbot.core.utils.common_filters import filter_invites, filter_various_mentions
|
||||
from redbot.core.utils.mod import is_mod_or_superior, is_allowed_by_hierarchy, get_audit_reason
|
||||
from .log import log
|
||||
|
||||
from redbot.core.utils.common_filters import filter_invites, filter_various_mentions
|
||||
|
||||
_ = T_ = Translator("Mod", __file__)
|
||||
|
||||
@@ -794,15 +793,60 @@ class Mod(commands.Cog):
|
||||
except discord.HTTPException:
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
async def _voice_perm_check(
|
||||
ctx: commands.Context, user_voice_state: Optional[discord.VoiceState], **perms: bool
|
||||
) -> bool:
|
||||
"""Check if the bot and user have sufficient permissions for voicebans.
|
||||
|
||||
This also verifies that the user's voice state and connected
|
||||
channel are not ``None``.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
``True`` if the permissions are sufficient and the user has
|
||||
a valid voice state.
|
||||
|
||||
"""
|
||||
if user_voice_state is None or user_voice_state.channel is None:
|
||||
await ctx.send(_("That user is not in a voice channel."))
|
||||
return False
|
||||
voice_channel: discord.VoiceChannel = user_voice_state.channel
|
||||
required_perms = discord.Permissions()
|
||||
required_perms.update(**perms)
|
||||
if not voice_channel.permissions_for(ctx.me) >= required_perms:
|
||||
await ctx.send(
|
||||
_("I require the {perms} permission(s) in that user's channel to do that.").format(
|
||||
perms=format_perms_list(required_perms)
|
||||
)
|
||||
)
|
||||
return False
|
||||
if (
|
||||
ctx.permission_state is commands.PermState.NORMAL
|
||||
and not voice_channel.permissions_for(ctx.author) >= required_perms
|
||||
):
|
||||
await ctx.send(
|
||||
_(
|
||||
"You must have the {perms} permission(s) in that user's channel to use this "
|
||||
"command."
|
||||
).format(perms=format_perms_list(required_perms))
|
||||
)
|
||||
return False
|
||||
return True
|
||||
|
||||
@commands.command()
|
||||
@commands.guild_only()
|
||||
@admin_or_voice_permissions(mute_members=True, deafen_members=True)
|
||||
@bot_has_voice_permissions(mute_members=True, deafen_members=True)
|
||||
@checks.admin_or_permissions(mute_members=True, deafen_members=True)
|
||||
async def voiceban(self, ctx: commands.Context, user: discord.Member, *, reason: str = None):
|
||||
"""Ban a user from speaking and listening in the server's voice channels."""
|
||||
user_voice_state = user.voice
|
||||
if user_voice_state is None:
|
||||
await ctx.send(_("No voice state for that user!"))
|
||||
user_voice_state: discord.VoiceState = user.voice
|
||||
if (
|
||||
await self._voice_perm_check(
|
||||
ctx, user_voice_state, deafen_members=True, mute_members=True
|
||||
)
|
||||
is False
|
||||
):
|
||||
return
|
||||
needs_mute = True if user_voice_state.mute is False else False
|
||||
needs_deafen = True if user_voice_state.deaf is False else False
|
||||
@@ -837,13 +881,15 @@ class Mod(commands.Cog):
|
||||
|
||||
@commands.command()
|
||||
@commands.guild_only()
|
||||
@admin_or_voice_permissions(mute_members=True, deafen_members=True)
|
||||
@bot_has_voice_permissions(mute_members=True, deafen_members=True)
|
||||
async def voiceunban(self, ctx: commands.Context, user: discord.Member, *, reason: str = None):
|
||||
"""Unban a user from speaking and listening in the server's voice channels."""
|
||||
user_voice_state = user.voice
|
||||
if user_voice_state is None:
|
||||
await ctx.send(_("No voice state for that user!"))
|
||||
if (
|
||||
await self._voice_perm_check(
|
||||
ctx, user_voice_state, deafen_members=True, mute_members=True
|
||||
)
|
||||
is False
|
||||
):
|
||||
return
|
||||
needs_unmute = True if user_voice_state.mute else False
|
||||
needs_undeafen = True if user_voice_state.deaf else False
|
||||
@@ -925,47 +971,43 @@ class Mod(commands.Cog):
|
||||
|
||||
@mute.command(name="voice")
|
||||
@commands.guild_only()
|
||||
@mod_or_voice_permissions(mute_members=True)
|
||||
@bot_has_voice_permissions(mute_members=True)
|
||||
async def voice_mute(self, ctx: commands.Context, user: discord.Member, *, reason: str = None):
|
||||
"""Mute a user in their current voice channel."""
|
||||
user_voice_state = user.voice
|
||||
if (
|
||||
await self._voice_perm_check(
|
||||
ctx, user_voice_state, mute_members=True, manage_channels=True
|
||||
)
|
||||
is False
|
||||
):
|
||||
return
|
||||
guild = ctx.guild
|
||||
author = ctx.author
|
||||
if user_voice_state:
|
||||
channel = user_voice_state.channel
|
||||
if channel:
|
||||
audit_reason = get_audit_reason(author, reason)
|
||||
channel = user_voice_state.channel
|
||||
audit_reason = get_audit_reason(author, reason)
|
||||
|
||||
success, issue = await self.mute_user(guild, channel, author, user, audit_reason)
|
||||
success, issue = await self.mute_user(guild, channel, author, user, audit_reason)
|
||||
|
||||
if success:
|
||||
await ctx.send(
|
||||
_("Muted {user} in channel {channel.name}").format(
|
||||
user=user, channel=channel
|
||||
)
|
||||
)
|
||||
try:
|
||||
await modlog.create_case(
|
||||
self.bot,
|
||||
guild,
|
||||
ctx.message.created_at,
|
||||
"vmute",
|
||||
user,
|
||||
author,
|
||||
reason,
|
||||
until=None,
|
||||
channel=channel,
|
||||
)
|
||||
except RuntimeError as e:
|
||||
await ctx.send(e)
|
||||
else:
|
||||
await channel.send(issue)
|
||||
else:
|
||||
await ctx.send(_("That user is not in a voice channel right now!"))
|
||||
if success:
|
||||
await ctx.send(
|
||||
_("Muted {user} in channel {channel.name}").format(user=user, channel=channel)
|
||||
)
|
||||
try:
|
||||
await modlog.create_case(
|
||||
self.bot,
|
||||
guild,
|
||||
ctx.message.created_at,
|
||||
"vmute",
|
||||
user,
|
||||
author,
|
||||
reason,
|
||||
until=None,
|
||||
channel=channel,
|
||||
)
|
||||
except RuntimeError as e:
|
||||
await ctx.send(e)
|
||||
else:
|
||||
await ctx.send(_("No voice state for the target!"))
|
||||
return
|
||||
await ctx.send(issue)
|
||||
|
||||
@mute.command(name="channel")
|
||||
@commands.guild_only()
|
||||
@@ -1081,51 +1123,45 @@ class Mod(commands.Cog):
|
||||
|
||||
@unmute.command(name="voice")
|
||||
@commands.guild_only()
|
||||
@mod_or_voice_permissions(mute_members=True)
|
||||
@bot_has_voice_permissions(mute_members=True)
|
||||
async def unmute_voice(
|
||||
self, ctx: commands.Context, user: discord.Member, *, reason: str = None
|
||||
):
|
||||
"""Unmute a user in their current voice channel."""
|
||||
user_voice_state = user.voice
|
||||
if (
|
||||
await self._voice_perm_check(
|
||||
ctx, user_voice_state, mute_members=True, manage_channels=True
|
||||
)
|
||||
is False
|
||||
):
|
||||
return
|
||||
guild = ctx.guild
|
||||
author = ctx.author
|
||||
if user_voice_state:
|
||||
channel = user_voice_state.channel
|
||||
if channel:
|
||||
audit_reason = get_audit_reason(author, reason)
|
||||
channel = user_voice_state.channel
|
||||
audit_reason = get_audit_reason(author, reason)
|
||||
|
||||
success, message = await self.unmute_user(
|
||||
guild, channel, author, user, audit_reason
|
||||
success, message = await self.unmute_user(guild, channel, author, user, audit_reason)
|
||||
|
||||
if success:
|
||||
await ctx.send(
|
||||
_("Unmuted {user} in channel {channel.name}").format(user=user, channel=channel)
|
||||
)
|
||||
try:
|
||||
await modlog.create_case(
|
||||
self.bot,
|
||||
guild,
|
||||
ctx.message.created_at,
|
||||
"vunmute",
|
||||
user,
|
||||
author,
|
||||
reason,
|
||||
until=None,
|
||||
channel=channel,
|
||||
)
|
||||
|
||||
if success:
|
||||
await ctx.send(
|
||||
_("Unmuted {user} in channel {channel.name}").format(
|
||||
user=user, channel=channel
|
||||
)
|
||||
)
|
||||
try:
|
||||
await modlog.create_case(
|
||||
self.bot,
|
||||
guild,
|
||||
ctx.message.created_at,
|
||||
"vunmute",
|
||||
user,
|
||||
author,
|
||||
reason,
|
||||
until=None,
|
||||
channel=channel,
|
||||
)
|
||||
except RuntimeError as e:
|
||||
await ctx.send(e)
|
||||
else:
|
||||
await ctx.send(_("Unmute failed. Reason: {}").format(message))
|
||||
else:
|
||||
await ctx.send(_("That user is not in a voice channel right now!"))
|
||||
except RuntimeError as e:
|
||||
await ctx.send(e)
|
||||
else:
|
||||
await ctx.send(_("No voice state for the target!"))
|
||||
return
|
||||
await ctx.send(_("Unmute failed. Reason: {}").format(message))
|
||||
|
||||
@checks.mod_or_permissions(administrator=True)
|
||||
@unmute.command(name="channel")
|
||||
@@ -1347,8 +1383,8 @@ class Mod(commands.Cog):
|
||||
user = author
|
||||
|
||||
# A special case for a special someone :^)
|
||||
special_date = datetime(2016, 1, 10, 6, 8, 4, 443_000)
|
||||
is_special = user.id == 96_130_341_705_637_888 and guild.id == 133_049_272_517_001_216
|
||||
special_date = datetime(2016, 1, 10, 6, 8, 4, 443000)
|
||||
is_special = user.id == 96130341705637888 and guild.id == 133049272517001216
|
||||
|
||||
roles = sorted(user.roles)[1:]
|
||||
names, nicks = await self.get_names_and_nicks(user)
|
||||
|
||||
@@ -148,5 +148,5 @@ class VersionInfo:
|
||||
)
|
||||
|
||||
|
||||
__version__ = "3.0.0rc2"
|
||||
__version__ = "3.0.0rc3.post1"
|
||||
version_info = VersionInfo.from_str(__version__)
|
||||
|
||||
@@ -111,7 +111,7 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
|
||||
self.main_dir = bot_dir
|
||||
|
||||
self.cog_mgr = CogManager(paths=(str(self.main_dir / "cogs"),))
|
||||
self.cog_mgr = CogManager()
|
||||
|
||||
super().__init__(*args, formatter=Help(), **kwargs)
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ import pkgutil
|
||||
from importlib import import_module, invalidate_caches
|
||||
from importlib.machinery import ModuleSpec
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Union, List, Optional
|
||||
from typing import Union, List, Optional
|
||||
|
||||
import redbot.cogs
|
||||
from redbot.core.utils import deduplicate_iterables
|
||||
@@ -25,8 +25,6 @@ class NoSuchCog(ImportError):
|
||||
Different from ImportError because some ImportErrors can happen inside cogs.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class CogManager:
|
||||
"""Directory manager for Red's cogs.
|
||||
@@ -39,30 +37,27 @@ class CogManager:
|
||||
|
||||
CORE_PATH = Path(redbot.cogs.__path__[0])
|
||||
|
||||
def __init__(self, paths: Tuple[str] = ()):
|
||||
def __init__(self):
|
||||
self.conf = Config.get_conf(self, 2938473984732, True)
|
||||
tmp_cog_install_path = cog_data_path(self) / "cogs"
|
||||
tmp_cog_install_path.mkdir(parents=True, exist_ok=True)
|
||||
self.conf.register_global(paths=[], install_path=str(tmp_cog_install_path))
|
||||
self._paths = [Path(p) for p in paths]
|
||||
|
||||
async def paths(self) -> Tuple[Path, ...]:
|
||||
"""Get all currently valid path directories.
|
||||
async def paths(self) -> List[Path]:
|
||||
"""Get all currently valid path directories, in order of priority
|
||||
|
||||
Returns
|
||||
-------
|
||||
`tuple` of `pathlib.Path`
|
||||
All valid cog paths.
|
||||
List[pathlib.Path]
|
||||
A list of paths where cog packages can be found. The
|
||||
install path is highest priority, followed by the
|
||||
user-defined paths, and the core path has the lowest
|
||||
priority.
|
||||
|
||||
"""
|
||||
conf_paths = [Path(p) for p in await self.conf.paths()]
|
||||
other_paths = self._paths
|
||||
|
||||
all_paths = deduplicate_iterables(conf_paths, other_paths, [self.CORE_PATH])
|
||||
|
||||
if self.install_path not in all_paths:
|
||||
all_paths.insert(0, await self.install_path())
|
||||
return tuple(p.resolve() for p in all_paths if p.is_dir())
|
||||
return deduplicate_iterables(
|
||||
[await self.install_path()], await self.user_defined_paths(), [self.CORE_PATH]
|
||||
)
|
||||
|
||||
async def install_path(self) -> Path:
|
||||
"""Get the install path for 3rd party cogs.
|
||||
@@ -73,8 +68,20 @@ class CogManager:
|
||||
The path to the directory where 3rd party cogs are stored.
|
||||
|
||||
"""
|
||||
p = Path(await self.conf.install_path())
|
||||
return p.resolve()
|
||||
return Path(await self.conf.install_path()).resolve()
|
||||
|
||||
async def user_defined_paths(self) -> List[Path]:
|
||||
"""Get a list of user-defined cog paths.
|
||||
|
||||
All paths will be absolute and unique, in order of priority.
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[pathlib.Path]
|
||||
A list of user-defined paths.
|
||||
|
||||
"""
|
||||
return list(map(Path, deduplicate_iterables(await self.conf.paths())))
|
||||
|
||||
async def set_install_path(self, path: Path) -> Path:
|
||||
"""Set the install path for 3rd party cogs.
|
||||
@@ -125,11 +132,10 @@ class CogManager:
|
||||
path = Path(path)
|
||||
return path
|
||||
|
||||
async def add_path(self, path: Union[Path, str]):
|
||||
async def add_path(self, path: Union[Path, str]) -> None:
|
||||
"""Add a cog path to current list.
|
||||
|
||||
This will ignore duplicates. Does have a side effect of removing all
|
||||
invalid paths from the saved path list.
|
||||
This will ignore duplicates.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
@@ -156,11 +162,12 @@ class CogManager:
|
||||
if path == self.CORE_PATH:
|
||||
raise ValueError("Cannot add the core path as an additional path.")
|
||||
|
||||
async with self.conf.paths() as paths:
|
||||
if not any(Path(p) == path for p in paths):
|
||||
paths.append(str(path))
|
||||
current_paths = await self.user_defined_paths()
|
||||
if path not in current_paths:
|
||||
current_paths.append(path)
|
||||
await self.set_paths(current_paths)
|
||||
|
||||
async def remove_path(self, path: Union[Path, str]) -> Tuple[Path, ...]:
|
||||
async def remove_path(self, path: Union[Path, str]) -> None:
|
||||
"""Remove a path from the current paths list.
|
||||
|
||||
Parameters
|
||||
@@ -168,20 +175,12 @@ class CogManager:
|
||||
path : `pathlib.Path` or `str`
|
||||
Path to remove.
|
||||
|
||||
Returns
|
||||
-------
|
||||
`tuple` of `pathlib.Path`
|
||||
Tuple of new valid paths.
|
||||
|
||||
"""
|
||||
path = self._ensure_path_obj(path).resolve()
|
||||
paths = await self.user_defined_paths()
|
||||
|
||||
paths = [Path(p) for p in await self.conf.paths()]
|
||||
if path in paths:
|
||||
paths.remove(path)
|
||||
await self.set_paths(paths)
|
||||
|
||||
return tuple(paths)
|
||||
paths.remove(path)
|
||||
await self.set_paths(paths)
|
||||
|
||||
async def set_paths(self, paths_: List[Path]):
|
||||
"""Set the current paths list.
|
||||
@@ -192,7 +191,7 @@ class CogManager:
|
||||
List of paths to set.
|
||||
|
||||
"""
|
||||
str_paths = [str(p) for p in paths_]
|
||||
str_paths = list(map(str, paths_))
|
||||
await self.conf.paths.set(str_paths)
|
||||
|
||||
async def _find_ext_cog(self, name: str) -> ModuleSpec:
|
||||
@@ -213,9 +212,9 @@ class CogManager:
|
||||
------
|
||||
NoSuchCog
|
||||
When no cog with the requested name was found.
|
||||
|
||||
"""
|
||||
resolved_paths = await self.paths()
|
||||
real_paths = [str(p) for p in resolved_paths if p != self.CORE_PATH]
|
||||
real_paths = list(map(str, [await self.install_path()] + await self.user_defined_paths()))
|
||||
|
||||
for finder, module_name, _ in pkgutil.iter_modules(real_paths):
|
||||
if name == module_name:
|
||||
@@ -287,10 +286,8 @@ class CogManager:
|
||||
return await self._find_core_cog(name)
|
||||
|
||||
async def available_modules(self) -> List[str]:
|
||||
"""Finds the names of all available modules to load.
|
||||
"""
|
||||
paths = (await self.install_path(),) + await self.paths()
|
||||
paths = [str(p) for p in paths]
|
||||
"""Finds the names of all available modules to load."""
|
||||
paths = list(map(str, await self.paths()))
|
||||
|
||||
ret = []
|
||||
for finder, module_name, _ in pkgutil.iter_modules(paths):
|
||||
@@ -314,13 +311,6 @@ _ = Translator("CogManagerUI", __file__)
|
||||
class CogManagerUI(commands.Cog):
|
||||
"""Commands to interface with Red's cog manager."""
|
||||
|
||||
@staticmethod
|
||||
async def visible_paths(ctx):
|
||||
install_path = await ctx.bot.cog_mgr.install_path()
|
||||
cog_paths = await ctx.bot.cog_mgr.paths()
|
||||
cog_paths = [p for p in cog_paths if p != install_path]
|
||||
return cog_paths
|
||||
|
||||
@commands.command()
|
||||
@checks.is_owner()
|
||||
async def paths(self, ctx: commands.Context):
|
||||
@@ -330,8 +320,7 @@ class CogManagerUI(commands.Cog):
|
||||
cog_mgr = ctx.bot.cog_mgr
|
||||
install_path = await cog_mgr.install_path()
|
||||
core_path = cog_mgr.CORE_PATH
|
||||
cog_paths = await cog_mgr.paths()
|
||||
cog_paths = [p for p in cog_paths if p not in (install_path, core_path)]
|
||||
cog_paths = await cog_mgr.user_defined_paths()
|
||||
|
||||
msg = _("Install Path: {install_path}\nCore Path: {core_path}\n\n").format(
|
||||
install_path=install_path, core_path=core_path
|
||||
@@ -369,7 +358,11 @@ class CogManagerUI(commands.Cog):
|
||||
from !paths
|
||||
"""
|
||||
path_number -= 1
|
||||
cog_paths = await self.visible_paths(ctx)
|
||||
if path_number < 0:
|
||||
await ctx.send(_("Path numbers must be positive."))
|
||||
return
|
||||
|
||||
cog_paths = await ctx.bot.cog_mgr.user_defined_paths()
|
||||
try:
|
||||
to_remove = cog_paths.pop(path_number)
|
||||
except IndexError:
|
||||
@@ -388,8 +381,11 @@ class CogManagerUI(commands.Cog):
|
||||
# Doing this because in the paths command they're 1 indexed
|
||||
from_ -= 1
|
||||
to -= 1
|
||||
if from_ < 0 or to < 0:
|
||||
await ctx.send(_("Path numbers must be positive."))
|
||||
return
|
||||
|
||||
all_paths = await self.visible_paths(ctx)
|
||||
all_paths = await ctx.bot.cog_mgr.user_defined_paths()
|
||||
try:
|
||||
to_move = all_paths.pop(from_)
|
||||
except IndexError:
|
||||
|
||||
@@ -145,7 +145,7 @@ class Command(CogCommandMixin, commands.Command):
|
||||
|
||||
@property
|
||||
def parents(self) -> List["Group"]:
|
||||
"""List[Group] : Returns all parent commands of this command.
|
||||
"""List[commands.Group] : Returns all parent commands of this command.
|
||||
|
||||
This is sorted by the length of :attr:`.qualified_name` from highest to lowest.
|
||||
If the command has no parents, this will be an empty list.
|
||||
|
||||
@@ -1173,6 +1173,9 @@ class Core(commands.Cog, CoreLogic):
|
||||
await ctx.send(
|
||||
_("A backup has been made of this instance. It is at {}.").format(backup_file)
|
||||
)
|
||||
if backup_file.stat().st_size > 8_000_000:
|
||||
await ctx.send(_("This backup is to large to send via DM."))
|
||||
return
|
||||
await ctx.send(_("Would you like to receive a copy via DM? (y/n)"))
|
||||
|
||||
pred = MessagePredicate.yes_or_no(ctx)
|
||||
@@ -1183,10 +1186,18 @@ class Core(commands.Cog, CoreLogic):
|
||||
else:
|
||||
if pred.result is True:
|
||||
await ctx.send(_("OK, it's on its way!"))
|
||||
async with ctx.author.typing():
|
||||
await ctx.author.send(
|
||||
_("Here's a copy of the backup"), file=discord.File(str(backup_file))
|
||||
try:
|
||||
async with ctx.author.typing():
|
||||
await ctx.author.send(
|
||||
_("Here's a copy of the backup"),
|
||||
file=discord.File(str(backup_file)),
|
||||
)
|
||||
except discord.Forbidden:
|
||||
await ctx.send(
|
||||
_("I don't seem to be able to DM you. Do you have closed DMs?")
|
||||
)
|
||||
except discord.HTTPException:
|
||||
await ctx.send(_("I could not send the backup file."))
|
||||
else:
|
||||
await ctx.send(_("OK then."))
|
||||
else:
|
||||
|
||||
@@ -15,7 +15,7 @@ from pkg_resources import DistributionNotFound
|
||||
|
||||
from . import __version__ as red_version, version_info as red_version_info, VersionInfo, commands
|
||||
from .data_manager import storage_type
|
||||
from .utils.chat_formatting import inline, bordered, humanize_list
|
||||
from .utils.chat_formatting import inline, bordered, format_perms_list
|
||||
from .utils import fuzzy_command_search, format_fuzzy_results
|
||||
|
||||
log = logging.getLogger("red")
|
||||
@@ -234,18 +234,13 @@ def init_events(bot, cli_flags):
|
||||
else:
|
||||
await ctx.send(await format_fuzzy_results(ctx, fuzzy_commands, embed=False))
|
||||
elif isinstance(error, commands.BotMissingPermissions):
|
||||
missing_perms: List[str] = []
|
||||
for perm, value in error.missing:
|
||||
if value is True:
|
||||
perm_name = '"' + perm.replace("_", " ").title() + '"'
|
||||
missing_perms.append(perm_name)
|
||||
if len(missing_perms) == 1:
|
||||
if bin(error.missing.value).count("1") == 1: # Only one perm missing
|
||||
plural = ""
|
||||
else:
|
||||
plural = "s"
|
||||
await ctx.send(
|
||||
"I require the {perms} permission{plural} to execute that command.".format(
|
||||
perms=humanize_list(missing_perms), plural=plural
|
||||
perms=format_perms_list(error.missing), plural=plural
|
||||
)
|
||||
)
|
||||
elif isinstance(error, commands.CheckFailure):
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import itertools
|
||||
from typing import Sequence, Iterator, List
|
||||
|
||||
import discord
|
||||
|
||||
from redbot.core.i18n import Translator
|
||||
|
||||
_ = Translator("UtilsChatFormatting", __file__)
|
||||
@@ -329,7 +332,7 @@ def escape(text: str, *, mass_mentions: bool = False, formatting: bool = False)
|
||||
return text
|
||||
|
||||
|
||||
def humanize_list(items: Sequence[str]):
|
||||
def humanize_list(items: Sequence[str]) -> str:
|
||||
"""Get comma-separted list, with the last element joined with *and*.
|
||||
|
||||
This uses an Oxford comma, because without one, items containing
|
||||
@@ -357,3 +360,29 @@ def humanize_list(items: Sequence[str]):
|
||||
if len(items) == 1:
|
||||
return items[0]
|
||||
return ", ".join(items[:-1]) + _(", and ") + items[-1]
|
||||
|
||||
|
||||
def format_perms_list(perms: discord.Permissions) -> str:
|
||||
"""Format a list of permission names.
|
||||
|
||||
This will return a humanized list of the names of all enabled
|
||||
permissions in the provided `discord.Permissions` object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
perms : discord.Permissions
|
||||
The permissions object with the requested permissions to list
|
||||
enabled.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
The humanized list.
|
||||
|
||||
"""
|
||||
perm_names: List[str] = []
|
||||
for perm, value in perms:
|
||||
if value is True:
|
||||
perm_name = '"' + perm.replace("_", " ").title() + '"'
|
||||
perm_names.append(perm_name)
|
||||
return humanize_list(perm_names).replace("Guild", "Server")
|
||||
|
||||
@@ -73,10 +73,13 @@ async def menu(
|
||||
# noinspection PyAsyncCall
|
||||
start_adding_reactions(message, controls.keys(), ctx.bot.loop)
|
||||
else:
|
||||
if isinstance(current_page, discord.Embed):
|
||||
await message.edit(embed=current_page)
|
||||
else:
|
||||
await message.edit(content=current_page)
|
||||
try:
|
||||
if isinstance(current_page, discord.Embed):
|
||||
await message.edit(embed=current_page)
|
||||
else:
|
||||
await message.edit(content=current_page)
|
||||
except discord.NotFound:
|
||||
return
|
||||
|
||||
try:
|
||||
react, user = await ctx.bot.wait_for(
|
||||
@@ -90,9 +93,12 @@ async def menu(
|
||||
except discord.Forbidden: # cannot remove all reactions
|
||||
for key in controls.keys():
|
||||
await message.remove_reaction(key, ctx.bot.user)
|
||||
return None
|
||||
|
||||
return await controls[react.emoji](ctx, pages, controls, message, page, timeout, react.emoji)
|
||||
except discord.NotFound:
|
||||
return
|
||||
else:
|
||||
return await controls[react.emoji](
|
||||
ctx, pages, controls, message, page, timeout, react.emoji
|
||||
)
|
||||
|
||||
|
||||
async def next_page(
|
||||
@@ -106,10 +112,8 @@ async def next_page(
|
||||
):
|
||||
perms = message.channel.permissions_for(ctx.me)
|
||||
if perms.manage_messages: # Can manage messages, so remove react
|
||||
try:
|
||||
with contextlib.suppress(discord.NotFound):
|
||||
await message.remove_reaction(emoji, ctx.author)
|
||||
except discord.NotFound:
|
||||
pass
|
||||
if page == len(pages) - 1:
|
||||
page = 0 # Loop around to the first item
|
||||
else:
|
||||
@@ -128,10 +132,8 @@ async def prev_page(
|
||||
):
|
||||
perms = message.channel.permissions_for(ctx.me)
|
||||
if perms.manage_messages: # Can manage messages, so remove react
|
||||
try:
|
||||
with contextlib.suppress(discord.NotFound):
|
||||
await message.remove_reaction(emoji, ctx.author)
|
||||
except discord.NotFound:
|
||||
pass
|
||||
if page == 0:
|
||||
page = len(pages) - 1 # Loop around to the last item
|
||||
else:
|
||||
@@ -148,9 +150,8 @@ async def close_menu(
|
||||
timeout: float,
|
||||
emoji: str,
|
||||
):
|
||||
if message:
|
||||
with contextlib.suppress(discord.NotFound):
|
||||
await message.delete()
|
||||
return None
|
||||
|
||||
|
||||
def start_adding_reactions(
|
||||
@@ -161,7 +162,7 @@ def start_adding_reactions(
|
||||
"""Start adding reactions to a message.
|
||||
|
||||
This is a non-blocking operation - calling this will schedule the
|
||||
reactions being added, but will the calling code will continue to
|
||||
reactions being added, but the calling code will continue to
|
||||
execute asynchronously. There is no need to await this function.
|
||||
|
||||
This is particularly useful if you wish to start waiting for a
|
||||
@@ -169,7 +170,7 @@ def start_adding_reactions(
|
||||
this is exactly what `menu` uses to do that.
|
||||
|
||||
This spawns a `asyncio.Task` object and schedules it on ``loop``.
|
||||
If ``loop`` omitted, the loop will be retreived with
|
||||
If ``loop`` omitted, the loop will be retrieved with
|
||||
`asyncio.get_event_loop`.
|
||||
|
||||
Parameters
|
||||
|
||||
@@ -2,7 +2,6 @@ import discord
|
||||
from datetime import datetime
|
||||
from redbot.core.utils.chat_formatting import pagify
|
||||
import io
|
||||
import sys
|
||||
import weakref
|
||||
from typing import List, Optional
|
||||
from .common_filters import filter_mass_mentions
|
||||
@@ -151,15 +150,12 @@ class Tunnel(metaclass=TunnelMeta):
|
||||
|
||||
"""
|
||||
files = []
|
||||
size = 0
|
||||
max_size = 8 * 1024 * 1024
|
||||
for a in m.attachments:
|
||||
_fp = io.BytesIO()
|
||||
await a.save(_fp)
|
||||
size += sys.getsizeof(_fp)
|
||||
if size > max_size:
|
||||
return []
|
||||
files.append(discord.File(_fp, filename=a.filename))
|
||||
max_size = 8 * 1000 * 1000
|
||||
if m.attachments and sum(a.size for a in m.attachments) <= max_size:
|
||||
for a in m.attachments:
|
||||
_fp = io.BytesIO()
|
||||
await a.save(_fp)
|
||||
files.append(discord.File(_fp, filename=a.filename))
|
||||
return files
|
||||
|
||||
async def communicate(
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
@@ -177,26 +176,21 @@ def basic_setup():
|
||||
async def json_to_mongo(current_data_dir: Path, storage_details: dict):
|
||||
from redbot.core.drivers.red_mongo import Mongo
|
||||
|
||||
core_data_file = list(current_data_dir.glob("core/settings.json"))[0]
|
||||
m = Mongo("Core", "0", **storage_details)
|
||||
core_data_file = current_data_dir / "core" / "settings.json"
|
||||
driver = Mongo(cog_name="Core", identifier="0", **storage_details)
|
||||
with core_data_file.open(mode="r") as f:
|
||||
core_data = json.loads(f.read())
|
||||
collection = m.get_collection()
|
||||
await collection.update_one(
|
||||
{"_id": m.unique_cog_identifier}, update={"$set": core_data["0"]}, upsert=True
|
||||
)
|
||||
data = core_data.get("0", {})
|
||||
for key, value in data.items():
|
||||
await driver.set(key, value=value)
|
||||
for p in current_data_dir.glob("cogs/**/settings.json"):
|
||||
cog_name = p.parent.stem
|
||||
with p.open(mode="r") as f:
|
||||
cog_data = json.loads(f.read())
|
||||
cog_i = None
|
||||
for ident in list(cog_data.keys()):
|
||||
cog_i = str(hash(ident))
|
||||
cog_m = Mongo(p.parent.stem, cog_i, **storage_details)
|
||||
cog_c = cog_m.get_collection()
|
||||
for ident in list(cog_data.keys()):
|
||||
await cog_c.update_one(
|
||||
{"_id": cog_m.unique_cog_identifier}, update={"$set": cog_data[cog_i]}, upsert=True
|
||||
)
|
||||
cog_data = json.load(f)
|
||||
for identifier, data in cog_data.items():
|
||||
driver = Mongo(cog_name, identifier, **storage_details)
|
||||
for key, value in data.items():
|
||||
await driver.set(key, value=value)
|
||||
|
||||
|
||||
async def mongo_to_json(current_data_dir: Path, storage_details: dict):
|
||||
|
||||
Reference in New Issue
Block a user