import asyncio import os import random import re import time from typing import Optional, List, Dict, Iterable import discord import gtts from discord.ext import commands from discord.ext.commands import MemberConverter from discord_slash import SlashCommand, SlashContext, SlashCommandOptionType from discord_slash.utils.manage_commands import create_option, create_choice from dotenv import load_dotenv from lib.config import config, config_load, config_save, config_get, config_set, config_get_descriptions, \ config_set_raw, config_meta from lib.utils import async_filter, find_category, find_role_case_insensitive, link_channel, connect_and_play, \ text_to_speech VERSION = "1.2.0-rc.2" load_dotenv() TOKEN = os.getenv('DISCORD_TOKEN') PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE) TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE) LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE) OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE) TOPFIT_WORDS = ( "äh", "ähm", "ääh", "äääh", "ä", "..", "...", "Mi", "Mic", "Micro", "Microsoft", "Microservices", "top", "vortual reality" ) # LOEH_ID = 327126546970312739 LOEH_ID = 254265844928872448 OWNER_ID = 327126546970312739 OCH_LOEH_SOUND = "assets/och_loeh.mp3" config_load() intents = discord.Intents.default() intents.members = True intents.voice_states = True bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents) slash = SlashCommand(bot, sync_commands=True) if 'LIBOPUS' in os.environ and not len(os.environ['LIBOPUS']) == 0: discord.opus.load_opus(os.environ['LIBOPUS']) async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]: try: return await guild.fetch_member(LOEH_ID) except discord.NotFound: return None def _get_last_och_time(guild: discord.Guild) -> int: return config_get('last-och-time', guild.id) def _set_last_och_time(guild: discord.Guild): config_set_raw('last-och-time', time.time(), guild.id) @bot.event async def on_ready(): print(f'{bot.user} has connected to Discord!') @bot.event async def on_message(message: discord.Message): if message.guild is not None and not message.author.bot: if config_get('loeh-enable', message.guild.id) and LOEH_REGEX.match(message.content): if message.author.id == LOEH_ID: await message.channel.send("https://stuff.siphalor.de/img/spidy-is-that-you.jpg") else: cooldown = config_get('och-cooldown', message.guild.id) t = time.time() - _get_last_och_time(message.guild) if t <= cooldown: await message.channel.send( '429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!') return _set_last_och_time(message.guild) loeh = await _get_loeh(message.guild) if loeh is None: await message.channel.send('404: Löh not found!') elif loeh.voice is None: await message.channel.send('400: Löh not connected!') else: voice: discord.VoiceState = loeh.voice try: voice_channel: discord.VoiceChannel = voice.channel source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND)) voice_protocol = await connect_and_play(voice_channel, source=source) await loeh.edit(mute=True) sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id)) message: Optional[discord.Message] = await message.channel.send('Zu Befehl!') await sleeper await loeh.edit(mute=False) if message is not None: await message.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.") if type(voice_protocol) is discord.VoiceClient: await voice_protocol.disconnect(force=True) except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException): await message.channel.send('Failed to complete your command, Sir') return elif config_get('och-anyone-enable', message.guild.id) and ( match := OCH_ANYONE_REGEX.search(message.content)) is not None: cooldown = config_get('och-cooldown', message.guild.id) t = time.time() - _get_last_och_time(message.guild) if t <= cooldown: await message.channel.send('429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!') return _set_last_och_time(message.guild) guild: discord.Guild = message.guild bot_member: discord.Member = guild.get_member(bot.user.id) search: str = match.group(1).lower() matches: list = [] voice: Optional[discord.VoiceChannel] = None async for member in guild.fetch_members(limit=100): if not member.bot and member.voice is not None and search in member.display_name.lower().split(' '): perms: discord.Permissions = member.voice.channel.permissions_for(bot_member) if perms.connect and perms.speak: voice = member.voice.channel matches.append(member) if matches and voice is not None: source, destroy_tts = text_to_speech(message.content) voice_protocol: Optional[discord.VoiceProtocol] = await connect_and_play(voice, source) async def _mute(m: discord.Member): await m.edit(mute=True) async def _unmute(m: discord.Member): await m.edit(mute=False) await asyncio.gather(*map(_mute, matches)) sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id)) message: Optional[discord.Message] = await message.channel.send('Auf gehts!') await sleeper await asyncio.gather(*map(_unmute, matches)) waiter = None if message is not None: waiter = message.edit(content='~~Auf gehts!~~\nGeschafft!') if type(voice_protocol) is discord.VoiceClient: await voice_protocol.disconnect(force=True) if waiter is not None: await waiter destroy_tts() else: await message.channel.send('404: No users found!') elif config_get('inf19x-insiders-enable', message.guild.id): if PING_REGEX.search(message.content): embed = discord.Embed( title="*pinken, schwaches Verb*", description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en", color=16636435 ) embed.add_field(name='ich', value='pinke') embed.add_field(name='du', value='pinkst') embed.add_field(name='er|sie|es', value='pinkt') embed.add_field(name='wir', value='pinken') embed.add_field(name='ihr', value='pinkt') embed.add_field(name='sie', value='pinken') embed.add_field(name='Partizip 2', value='gepinkt') await message.channel.send(embed=embed) else: match = TOPFIT_REGEX.search(message.content) if match is not None: text = '' for i in range(0, random.randint(7, 13)): text += random.choice(TOPFIT_WORDS) + " " text += match.group(1) await message.channel.send(text) await bot.process_commands(message) @bot.event async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): if after.channel is None and before.channel is not None: channel: discord.VoiceChannel = before.channel if not channel.voice_states: if channel.category is not None and channel.name.endswith("_vc"): if channel.category.name.lower() == config_get("groups-category", channel.guild.id).lower(): name: str = channel.name[:-3] role = await find_role_case_insensitive(channel.guild, name, config_get("groups-role-prefix", channel.guild.id)) if role is not None: await channel.delete(reason="Delete temporary group channel when last person left") def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool: if message.clean_content.strip() == '': return False for reaction in message.reactions: if reaction.emoji == '❌': return False if reaction_filter is not None: for reaction in message.reactions: if type(reaction.emoji) is str: if reaction.emoji == reaction_filter: return True elif type(reaction.emoji) is discord.Emoji or type(reaction.emoji) is discord.PartialEmoji: if ':' + reaction.emoji.name + ':' == reaction_filter: return True return False return True @bot.command(name='config', brief='Change the configuration of this bot') async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''): if ctx.guild is None: await ctx.send('You can\'t run config commands in private messages!') return if cmd == '': await ctx.send('Use `' + bot.command_prefix + 'config get` or `' + bot.command_prefix + 'config set` to query for or update config values. Use `' + bot.command_prefix + 'config list` to see all config options') elif cmd == 'list': msg = 'Available config options:' for (key, value) in config_get_descriptions(): msg += '\n - `' + key + '`: ' + value[1] await ctx.send(msg) elif cmd == 'get': if key in config: await ctx.send('`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`') else: await ctx.send('Unknown config option `' + key + '`') elif cmd == 'set': if ctx.author.guild_permissions.administrator: await ctx.send(config_set(key, val, ctx.guild.id)) else: await ctx.send('You\'re not allowed to change the configuration on this server!') elif cmd == 'get-global': if key in config: await ctx.send('`' + key + '` is globally set to `' + str(config_get(key)) + '`') else: await ctx.send('Unknown config option `' + key + '`') elif cmd == 'set-global': if ctx.author.id == OWNER_ID: msg = config_set(key, val) if key == 'prefix': bot.command_prefix = config_get('prefix') await ctx.send(msg) else: await ctx.send('You\'re not allowed to change the global configuration!') else: await ctx.send('Unknown command!') # ------------------- The slashy way of life ------------------- # slash_guild_ids = None config_choices = [create_choice(name=entry[1][1], value=entry[0]) for entry in config_meta.items() if entry[1][0]] config_option = create_option("config_key", "A key identifying the config entry to target", str, True, config_choices) async def check_slash_context(ctx: SlashContext, require_op: bool = False, require_manage_roles: bool = False) -> bool: if ctx.guild is None: await ctx.send('You can\'t run config commands in private messages!') return False if require_op and not ctx.author.guild_permissions.administrator: await ctx.send('You\'re not allowed to run this command on this server!', hidden=True) return False if require_manage_roles and not ctx.author.guild_permissions.manage_roles: await ctx.send('You\'re not allowed to run this command on this server!', hidden=True) return False return True async def check_text_channel(ctx: SlashContext, channel) -> bool: if not isinstance(channel, discord.TextChannel): await ctx.send('The given channel is not a text channel!', hidden=True) return False return True # __ __ ___ ____ ____ # | \/ |_ _/ ___| / ___| # | |\/| || |\___ \| | # | | | || | ___) | |___ # |_| |_|___|____/ \____| @slash.slash( name="about", description="Shows information about this bot", guild_ids=slash_guild_ids ) async def about_slash(ctx: SlashContext): embed = discord.Embed( color=discord.Colour.from_rgb(200, 0, 0), title="About Och bot", description="The Och bot is a special bot created by Siphalor filled with features and inside jokes of his " "university group.", ) embed.add_field(name="version", value=VERSION) await ctx.send(embed=embed, hidden=True) @slash.slash( name="random-message", description="Select a random message from a channel", options=[ create_option( name="channel", description="The channel to pull the messages from", option_type=SlashCommandOptionType.CHANNEL, required=False ), create_option( name="max_messages", description="The maximum amount of messages to index (max 1024, default 100)", option_type=SlashCommandOptionType.INTEGER, required=False ), create_option( name="reaction", description="Only pull from messages that have this reaction", option_type=str, required=False ) ], guild_ids=slash_guild_ids ) async def random_message_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None, max_messages: int = 100, reaction: Optional[str] = None): await ctx.defer() if channel is None: channel = ctx.channel messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction), channel.history(limit=max_messages)) messages = [item async for item in messages] if not messages: await ctx.send("No valid messages found!") msg: discord.Message = random.choice(messages) author: discord.abc.User = msg.author embed = discord.Embed( description=msg.content + "\n\n[Jump to message](" + msg.jump_url + ")" ) embed.set_author(name=author.display_name, icon_url=author.avatar_url) embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + "messages") await ctx.send(embed=embed) @slash.slash( name="list-messages", description="Lists all messages with the given filtes", options=[ create_option( name="channel", description="The channel to pull the messages from", option_type=SlashCommandOptionType.CHANNEL, required=False ), create_option( name="max_messages", description="The maximum amount of messages to index (max 1024, default 100)", option_type=SlashCommandOptionType.INTEGER, required=False ), create_option( name="reaction", description="Only pull from messages that have this reaction", option_type=str, required=False ) ], guild_ids=slash_guild_ids ) async def list_messages_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None, max_messages: int = 100, reaction: Optional[str] = None): await ctx.defer() if channel is None: channel = ctx.channel messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction), channel.history(limit=max_messages)) messages = [item async for item in messages] embed = discord.Embed( title="List of matching messages", description="\n".join([m.content for m in messages]) ) await ctx.send(embed=embed) @slash.slash( name="flip-pipelin", description="Toss a pipelin to your favourite teacher and see whether it's schnel or slo" ) async def flip_pipelin_slash(ctx: SlashContext): await ctx.send("Pipelin " + random.choice(['schnel', 'slo'])) # ____ ____ ___ _ _ ____ ____ # / ___| _ \ / _ \| | | | _ \/ ___| # | | _| |_) | | | | | | | |_) \___ \ # | |_| | _ <| |_| | |_| | __/ ___) | # \____|_| \_\\___/ \___/|_| |____/ def get_groups_role_prefix(guild_id: int) -> str: return config_get("groups-role-prefix", guild_id) async def get_groups_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]: category = find_category(ctx.guild, config_get("groups-category", ctx.guild_id)) if category is None: await ctx.send("Failed to locate group category!") return None return category async def get_groups_archive_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]: category = find_category(ctx.guild, config_get("groups-archive-category", ctx.guild_id)) if category is None: await ctx.send("Failed to locate group category!") return None return category async def get_groups_categories(ctx: SlashContext) -> (discord.CategoryChannel, discord.CategoryChannel): groups_cat = await get_groups_category(ctx) if groups_cat is None: return None archive_cat = await get_groups_archive_category(ctx) if archive_cat is None: return None return groups_cat, archive_cat def collect_group_channels(cat: discord.CategoryChannel) -> Dict[str, discord.abc.GuildChannel]: return {channel.name: channel for channel in cat.text_channels} async def collect_group_roles(guild: discord.Guild) -> List[discord.Role]: role_prefix = get_groups_role_prefix(guild.id) return list(filter(lambda role: role.name.startswith(role_prefix), await guild.fetch_roles())) @slash.subcommand( base="groups", name="list", description="Lists all groups on this server", guild_ids=slash_guild_ids ) async def groups_list_slash(ctx: SlashContext): if not await check_slash_context(ctx, require_manage_roles=True): return categories = await get_groups_categories(ctx) if categories is None: return groups_cat, archive_cat = categories active_groups = collect_group_channels(groups_cat) archived_groups = collect_group_channels(archive_cat) role_prefix = get_groups_role_prefix(ctx.guild_id) msg = "" for role in await collect_group_roles(ctx.guild): name = role.name[len(role_prefix):].lower() if name in active_groups: msg += link_channel(active_groups[name]) + "\n" elif name in archived_groups: msg += link_channel(archived_groups[name], True) + "\n" else: msg += "*" + name + "*\n" embed = discord.Embed(title="Groups", description=msg) embed.set_footer(text="Italic groups are archived or unavailable.") await ctx.send(embed=embed) @slash.subcommand( base="groups", name="archive", description="Move a group to the archive", options=[ create_option( name="group_channel", description="The group identified by it's channel", option_type=SlashCommandOptionType.CHANNEL, required=True ) ], guild_ids=slash_guild_ids ) async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel): if not await check_slash_context(ctx, require_manage_roles=True): return if not await check_text_channel(ctx, group_channel): return categories = await get_groups_categories(ctx) if categories is None: return groups_cat, archive_cat = categories group_name = group_channel.name groups = collect_group_channels(groups_cat) if group_channel.category == groups_cat: await group_channel.edit(reason="Archive group " + group_name, category=archive_cat) if group_name + "_vc" in groups: await groups[group_name + "_vc"].delete(reason="Archive group " + group_name) await ctx.send("Group " + group_name + " archived.") elif group_channel.category == archive_cat: await ctx.send("Group " + group_name + " is already archived.", hidden=True) else: await ctx.send(group_name + " is not a group") @slash.subcommand( base="groups", name="unarchive", description="Move a group from the archive to the main category", options=[ create_option( name="group_channel", description="The group identified by it's channel", option_type=SlashCommandOptionType.CHANNEL, required=True ) ], guild_ids=slash_guild_ids ) async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel): if not await check_slash_context(ctx, require_manage_roles=True): return if not await check_text_channel(ctx, group_channel): return categories = await get_groups_categories(ctx) if categories is None: return groups_cat, archive_cat = categories group_name = group_channel.name if group_channel.category == archive_cat: await ctx.defer() await group_channel.edit(reason="Archive group " + group_name, category=archive_cat) await ctx.send("Group " + group_name + " archived.") elif group_channel.category == groups_cat: await ctx.send("Group " + group_name + " is not archived.", hidden=True) else: await ctx.send(group_name + " is not a group") @slash.subcommand( base="groups", name="delete", description="Move a group from the archive to the main category", options=[ create_option( name="group_channel", description="The group identified by it's channel", option_type=SlashCommandOptionType.CHANNEL, required=True ) ], guild_ids=slash_guild_ids ) async def groups_delete_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel): if not await check_slash_context(ctx, require_manage_roles=True): return if not await check_text_channel(ctx, group_channel): return group_channel: discord.TextChannel = group_channel group_name = group_channel.name await ctx.defer() role = await find_role_case_insensitive(ctx.guild, group_channel.name, get_groups_role_prefix(ctx.guild_id)) if role is not None: await role.delete(reason="Delete group " + group_name) if group_channel.category: vcs: List[discord.VoiceChannel] = group_channel.category.voice_channels for vc in vcs: if vc.name == group_name + "_vc": await vc.delete(reason="Delete group " + group_name) break await group_channel.delete(reason="Delete group " + group_name) await ctx.send("Group " + group_name + " deleted.") @slash.subcommand( base="groups", name="create", description="Creates a new group channel and role", options=[ create_option( name="name", description="The name of the group", option_type=str, required=True ), create_option( name="members", description="A comma-separated list of members", option_type=str, required=False ) ], guild_ids=slash_guild_ids ) async def groups_create_slash(ctx: SlashContext, name: str, members: Optional[str] = None): if not await check_slash_context(ctx, require_manage_roles=True): return await ctx.defer() name = name.strip() groups_cat = await get_groups_category(ctx) if not groups_cat: return cor = groups_cat.create_text_channel(name.lower(), reason="Create grooup " + name) cor_role = ctx.guild.create_role(name=config_get("groups-role-prefix", ctx.guild_id) + name.lower(), mentionable=True, reason="Create group " + name) channel: discord.TextChannel = await cor cor = channel.edit(sync_permissions=True) role: discord.Role = await cor_role await cor await channel.set_permissions(role, reason="Create group " + name, read_messages=True, manage_threads=True) extra = "" if members is not None: members = members.split(',') converter = MemberConverter() for member_str in members: member_str = member_str.strip() try: member = await converter.convert(ctx, member_str) await member.add_roles(role, reason="Create group " + name) except discord.ext.commands.MemberNotFound: extra += '\n*' + member_str + '*' if extra: extra = '\nCouldn\'t resolve users:' + extra await asyncio.sleep(3) await channel.send("Hi, " + role.mention) await ctx.send("Group " + name + " created." + extra) @slash.subcommand( base="groups", name="remove-vc", description="Removes a temporary voice channel associated with the current group", guild_ids=slash_guild_ids ) async def groups_remove_vc_slash(ctx: SlashContext): if not await check_slash_context(ctx): return groups_cat = await get_groups_category(ctx) if not groups_cat: return if ctx.channel.category == groups_cat: vc: Optional[discord.VoiceChannel] = discord.utils.find( lambda _vc: _vc.name == ctx.channel.name + '_vc', ctx.channel.category.voice_channels ) if vc is None: await ctx.send("Voice chat is not open!", hidden=True) else: await ctx.defer(hidden=True) await vc.delete(reason="Delete temporary voice chat") await ctx.send("Removed temporary voice chat.", hidden=True) else: await ctx.send("This channel is not an active group!", hidden=True) @slash.subcommand( base="groups", name="vc", description="Create a temporary voice chat. Available to anyone.", guild_ids=slash_guild_ids ) async def groups_vc_slash(ctx: SlashContext): if ctx.channel.category is not None: guild: discord.Guild = ctx.guild channel: discord.TextChannel = ctx.channel if channel.category.name.lower() == config_get("groups-category", guild.id).lower(): role = await find_role_case_insensitive(guild, channel.name, config_get("groups-role-prefix", guild.id)) if role is None: await ctx.send("Couldn't resolve group role!", hidden=True) return category: discord.CategoryChannel = channel.category for vc in category.voice_channels: if vc.name.lower() == channel.name.lower() + "_vc": await ctx.send("Temporary group channel already exists.", hidden=True) return await ctx.defer() reason = "Create temporary vc for group " + channel.name vc = await category.create_voice_channel(channel.name + "_vc", reason=reason) await vc.edit(sync_permissions=True, reason=reason) await vc.set_permissions(role, view_channel=True, connect=True, reason=reason) await ctx.send("Created temporary vc for this group") return await ctx.send("Not an active group channel!", hidden=True) # ___ _ _ ___ _____ _____ ____ # / _ \| | | |/ _ \_ _| ____/ ___| # | | | | | | | | | || | | _| \___ \ # | |_| | |_| | |_| || | | |___ ___) | # \__\_\\___/ \___/ |_| |_____|____/ def get_quotes(guild_id: int) -> Dict[str, List[str]]: quotes: Optional[Dict[str, List[str]]] = config_get("guild_quotes", guild=guild_id) if quotes is None: quotes = {} config_set_raw("guild_quotes", quotes, guild_id) return quotes def get_quotes_rolling_index(guild_id: int, author: str) -> Optional[int]: quotes = get_quotes(guild_id) if author not in quotes: return None quotes = quotes[author] rolling_indeces = config_get("guild_quotes_rolling_indeces", guild=guild_id) if rolling_indeces is None: rolling_indeces = {} if author not in rolling_indeces: rolling_indeces[author] = 0 index = rolling_indeces[author] rolling_indeces[author] = (index + 1) % len(rolling_indeces) config_set_raw("guild_quotes_rolling_indeces", rolling_indeces, guild=guild_id) return index async def _do_quote(ctx: SlashContext, author: str, quote: str, tts: bool): text = '\n'.join(map(lambda line: '> ' + line, quote.splitlines())) await ctx.send(text + '\n *~' + author + '*') if tts: voice: discord.VoiceState = ctx.author.voice if voice.channel is not None: source, tts_destroyer = text_to_speech(author + " sagte: " + quote) vp: discord.VoiceClient = await connect_and_play(voice.channel, source) while vp.is_playing() and vp.channel == voice.channel: await asyncio.sleep(0.5) await vp.disconnect() tts_destroyer() @slash.subcommand( base="quotes", name="random", description="Gets a random quote, optionally for the given author", options=[ create_option( name="author", description="The author to filter by", option_type=str, required=False ), create_option( name="tts", description="Text to speech to voice chat", option_type=SlashCommandOptionType.BOOLEAN, required=False ) ], guild_ids=slash_guild_ids ) async def quote_random_slash(ctx: SlashContext, author: Optional[str] = None, tts: bool = False): if not await check_slash_context(ctx, False): return quotes = get_quotes(ctx.guild_id) if author is None: authors = quotes.keys() if authors: author = random.choice([*authors]) else: await ctx.send('No quotes available') return if author not in quotes: await ctx.send('No such author!', hidden=True) return author_quotes = quotes[author] quote = random.choice(author_quotes) await _do_quote(ctx, author, quote, tts) @slash.subcommand( base="quotes", name="tell", description="Recite a quote", options=[ create_option( name="author", description="The author to recite from", option_type=str, required=True ), create_option( name="prefix", description="Get a quote by prefix", option_type=str, required=False ), create_option( name="index", description="Get a quote with a certain index", option_type=SlashCommandOptionType.INTEGER, required=False ), create_option( name="tts", description="Text to speech to voice chat", option_type=SlashCommandOptionType.BOOLEAN, required=False ) ], guild_ids=slash_guild_ids ) async def quote_tell_slash(ctx: SlashContext, author: str, prefix: Optional[str] = None, index: Optional[int] = None, tts: Optional[bool] = None): if not await check_slash_context(ctx, False): return quotes = get_quotes(ctx.guild_id)[author] if index is not None: await _do_quote(ctx, author, quotes[index], tts) return if prefix is not None: for quote in quotes: if quote.startswith(prefix): await _do_quote(ctx, author, quote, tts) return index = get_quotes_rolling_index(ctx.guild_id, author) if index is not None: await _do_quote(ctx, author, quotes[index], tts) @slash.subcommand( base="quotes", name="list", description="Lists all authors or all quotes by a given author", options=[ create_option( name="author", description="The author to filter by, use * for all authors", option_type=str, required=False ) ], guild_ids=slash_guild_ids ) async def quote_list_slash(ctx: SlashContext, author: Optional[str] = None): if not await check_slash_context(ctx, False): return def create_list_embed(title: str, items: Iterable[str]) -> discord.Embed: embed: discord.Embed = discord.Embed( title=title, description='\n'.join(items) ) return embed quotes = get_quotes(ctx.guild_id) if author is None: await ctx.send(embed=create_list_embed("Available authors", quotes.keys())) elif author == '*': coroutines = [] await ctx.defer() for author, author_quotes in quotes.items(): coroutines.append(ctx.channel.send(embed=create_list_embed(author, author_quotes))) await asyncio.gather(*coroutines) await ctx.send(content="That was all.") else: author = author.strip().title() if author in quotes: await ctx.send(embed=create_list_embed(author, quotes[author])) else: await ctx.send("No such author: \"" + author + "\"", hidden=True) @slash.subcommand( base="quotes", name="add", description="Add a new quote", options=[ create_option( name="author", description="The author of the new quote", option_type=str, required=True ), create_option( name="quote", description="The quote to add", option_type=str, required=True ) ], guild_ids=slash_guild_ids ) async def quote_add_slash(ctx: SlashContext, author: str, quote: str): if not await check_slash_context(ctx, False): return ignored_character_regex = re.compile(r'\W') def prepare_quote_for_compare(_quote: str) -> str: return ignored_character_regex.sub('', _quote).lower() quotes = get_quotes(ctx.guild_id) author = author.strip().title() quote = quote.strip() quote = quote[0].upper() + quote[1::] if author in quotes: author_quotes: List[str] = quotes[author] stripped_quote = prepare_quote_for_compare(quote) for aq in author_quotes: if prepare_quote_for_compare(aq) == stripped_quote: await ctx.send("Quote already exists!", hidden=True) return author_quotes.append(quote) config_save() await ctx.send("Quote added.") else: quotes[author] = [quote] config_save() await ctx.send("Author and quote added") @slash.subcommand( base="quotes", name="remove", description="Removes a quote. Requires administrator permissions.", options=[ create_option( name="author", description="The author to delete from", option_type=str, required=True ), create_option( name="quote", description="The exact quote to remove", option_type=str, required=False ), create_option( name="quote_position", description="The quote position to delete", option_type=SlashCommandOptionType.INTEGER, required=False ) ], guild_ids=slash_guild_ids ) async def quote_remove_slash(ctx: SlashContext, author: str, quote: Optional[str] = None, quote_position: Optional[int] = None): if not await check_slash_context(ctx, True): return quotes = get_quotes(ctx.guild_id) if quotes is None: await ctx.send("No quotes available!", hidden=True) else: author = author.strip().title() if author not in quotes: await ctx.send("No quotes from author \"" + author + "\"!", hidden=True) return author_quotes = quotes[author] if quote is not None: quote = quote.strip().lower() for aq in author_quotes: if aq.lower() == quote: author_quotes.remove(aq) if not author_quotes: quotes.pop(author) await ctx.send("Removed quote from author \"" + author + "\"") config_save() return await ctx.send("No such quote found!", hidden=True) elif quote_position is not None: if 0 <= quote_position < len(author_quotes): author_quotes.pop(quote_position) if not author_quotes: quotes.pop(author) await ctx.send("Removed quote from author \"" + author + "\"") config_save() return await ctx.send( "Invalid quote position - must be within the range of 0 and " + str(len(author_quotes)) + "!", hidden=True) else: await ctx.send("Either quote or quote_position must be used!", hidden=True) # ____ ___ _ _ _____ ___ ____ # / ___/ _ \| \ | | ___|_ _/ ___| # | | | | | | \| | |_ | | | _ # | |__| |_| | |\ | _| | | |_| | # \____\___/|_| \_|_| |___\____| @slash.slash( name="config", description="Change or query the configuration of this bot", guild_ids=slash_guild_ids ) async def config_slash(ctx: SlashContext): await ctx.send("missingno", hidden=True) @slash.subcommand( base="config", name="list", description="List available config options", guild_ids=slash_guild_ids ) async def config_list_slash(ctx: SlashContext): msg = 'Available config options:' for (key, value) in config_get_descriptions(): msg += '\n - `' + key + '`: ' + value[1] await ctx.send(msg, hidden=True) @slash.subcommand( base="config", name="get", description="Queries the configuration for a value", options=[ config_option ], guild_ids=slash_guild_ids ) async def config_get_slash(ctx: SlashContext, config_key: str): if not await check_slash_context(ctx, False): return if config_key in config_meta and config_meta[config_key][0]: await ctx.send('`' + config_key + '` is set to `' + str(config_get(config_key, ctx.guild.id)) + '`', hidden=True) else: await ctx.send('Unknown config option `' + config_key + '`', hidden=True) @slash.subcommand( base="config", name="set", description="Changes a configuration entry", options=[ config_option, create_option( name="value", description="The value to set", option_type=str, required=True ) ], guild_ids=slash_guild_ids ) async def config_set_slash(ctx: SlashContext, config_key: str, value: str): if not await check_slash_context(ctx, True): return if ctx.author.guild_permissions.administrator: await ctx.send(config_set(config_key, value, ctx.guild.id), hidden=True) else: await ctx.send('You\'re not allowed to change the configuration on this server!', hidden=True) bot.run(TOKEN)