import asyncio
import os
import random
import re
import time
from typing import Optional, List, Dict, Iterable, Tuple

import discord
from discord.ext import commands
from discord.ext.commands import MemberConverter
from discord_slash import SlashCommand, SlashContext, SlashCommandOptionType
from discord_slash.utils.manage_commands import create_option, create_choice
from dotenv import load_dotenv

from lib.config import config, config_load, config_save, config_get, config_set, config_get_descriptions, \
    config_set_raw, config_meta
from lib.utils import async_filter, find_category, find_role_case_insensitive, link_channel, connect_and_play, \
    text_to_speech

VERSION = "1.2.2"

load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')

# Message triggers evaluated by on_message.
PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE)
TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE)
LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE)
OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE)
TOPFIT_WORDS = (
    "äh", "ähm", "ääh", "äääh", "ä", "..", "...", "Mi", "Mic", "Micro", "Microsoft", "Microservices", "top",
    "vortual reality"
)
# LOEH_ID = 327126546970312739
LOEH_ID = 254265844928872448
OWNER_ID = 327126546970312739
OCH_LOEH_SOUND = "assets/och_loeh.mp3"

# Characters matching this pattern are ignored when two quotes are compared for equality.
_QUOTE_COMPARE_IGNORED_REGEX = re.compile(r'\W')

# Upper bound advertised by the "max_messages" slash option.
_MAX_INDEXED_MESSAGES = 1024

config_load()

intents = discord.Intents.default()
intents.members = True
intents.voice_states = True
bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents)
slash = SlashCommand(bot, sync_commands=True)

# Load a custom opus library only when the variable is set to a non-empty value.
_libopus_path = os.environ.get('LIBOPUS')
if _libopus_path:
    discord.opus.load_opus(_libopus_path)


async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]:
    """Fetch the member with LOEH_ID from the guild, or None when Discord reports it as not found."""
    try:
        return await guild.fetch_member(LOEH_ID)
    except discord.NotFound:
        return None


def _get_last_och_time(guild: discord.Guild) -> float:
    """Return the stored timestamp of the guild's last "Och" trigger (0 when nothing is stored)."""
    # NOTE(review): config_get may already supply a default; `or 0` only guards against a None value.
    return config_get('last-och-time', guild.id) or 0


def _set_last_och_time(guild: discord.Guild):
    """Store the current time as the guild's last "Och" trigger."""
    config_set_raw('last-och-time', time.time(), guild.id)


async def _och_on_cooldown(message: discord.Message) -> bool:
    """Enforce the per-guild "Och" cooldown.

    Returns True after sending a 429 notice when the cooldown is still running.
    Otherwise records the current time as the last trigger and returns False.
    """
    guild = message.guild
    cooldown = config_get('och-cooldown', guild.id)
    elapsed = time.time() - _get_last_och_time(guild)
    if elapsed <= cooldown:
        await message.channel.send(f'429: Too many requests - Wait {int(cooldown - elapsed)} more seconds!')
        return True
    _set_last_och_time(guild)
    return False


async def _handle_och_loeh(message: discord.Message) -> bool:
    """Handle an "Och Löh" message: play the sound in Löh's voice channel and mute him for a while.

    Returns True when on_message should continue with command processing, False when it should stop.
    """
    channel = message.channel
    guild = message.guild
    if message.author.id == LOEH_ID:
        await channel.send("https://stuff.siphalor.de/img/spidy-is-that-you.jpg")
        return True
    if await _och_on_cooldown(message):
        return False

    loeh = await _get_loeh(guild)
    if loeh is None:
        await channel.send('404: Löh not found!')
        return True
    if loeh.voice is None:
        await channel.send('400: Löh not connected!')
        return True

    try:
        voice_client = None
        try:
            source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND))
            voice_client = await connect_and_play(loeh.voice.channel, source=source)
            await loeh.edit(mute=True)
            try:
                timeout = config_get('och-timeout', guild.id)
                reply: Optional[discord.Message] = await channel.send('Zu Befehl!')
                await asyncio.sleep(timeout)
            finally:
                # Never leave the member server-muted when something in between fails.
                await loeh.edit(mute=False)
            if reply is not None:
                await reply.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.")
        finally:
            # Leave the voice channel on success and on failure alike.
            if isinstance(voice_client, discord.VoiceClient):
                await voice_client.disconnect(force=True)
    except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
        await channel.send('Failed to complete your command, Sir')
    return False


async def _handle_och_anyone(message: discord.Message, search: str) -> bool:
    """Handle an "Och <name>" message: read it aloud and temporarily mute every matching member.

    A member matches when they are not a bot, are connected to voice, and `search` equals one of the
    space-separated words of their lower-cased display name.

    Returns True when on_message should continue with command processing, False when it should stop.
    """
    if await _och_on_cooldown(message):
        return False

    guild: discord.Guild = message.guild
    channel = message.channel
    bot_member: discord.Member = guild.get_member(bot.user.id)
    targets: List[discord.Member] = []
    voice_channel: Optional[discord.VoiceChannel] = None
    async for member in guild.fetch_members(limit=100):
        if member.bot or member.voice is None:
            continue
        if search not in member.display_name.lower().split(' '):
            continue
        perms: discord.Permissions = member.voice.channel.permissions_for(bot_member)
        if perms.connect and perms.speak:
            # The last matching channel the bot may join and speak in wins.
            voice_channel = member.voice.channel
        targets.append(member)

    if not targets or voice_channel is None:
        await channel.send('404: No users found!')
        return True

    source, destroy_tts = text_to_speech(message.content)
    voice_client = None
    try:
        voice_client = await connect_and_play(voice_channel, source)
        timeout = config_get('och-timeout', guild.id)
        try:
            await asyncio.gather(*(target.edit(mute=True) for target in targets))
            reply: Optional[discord.Message] = await channel.send('Auf gehts!')
            await asyncio.sleep(timeout)
        finally:
            # Undo the mutes even when sending or sleeping failed.
            await asyncio.gather(*(target.edit(mute=False) for target in targets))
        if reply is not None:
            await reply.edit(content='~~Auf gehts!~~\nGeschafft!')
    finally:
        try:
            if isinstance(voice_client, discord.VoiceClient):
                await voice_client.disconnect(force=True)
        finally:
            destroy_tts()
    return True


async def _handle_insiders(message: discord.Message):
    """Answer "ping" with the conjugation of "pinken", or a TOPFIT keyword with random stammering."""
    content = message.content
    if PING_REGEX.search(content):
        embed = discord.Embed(
            title="*pinken, schwaches Verb*",
            description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en",
            color=16636435
        )
        conjugation = (
            ('ich', 'pinke'),
            ('du', 'pinkst'),
            ('er|sie|es', 'pinkt'),
            ('wir', 'pinken'),
            ('ihr', 'pinkt'),
            ('sie', 'pinken'),
            ('Partizip 2', 'gepinkt'),
        )
        for pronoun, form in conjugation:
            embed.add_field(name=pronoun, value=form)
        await message.channel.send(embed=embed)
        return

    match = TOPFIT_REGEX.search(content)
    if match is not None:
        filler = ' '.join(random.choice(TOPFIT_WORDS) for _ in range(random.randint(7, 13)))
        await message.channel.send(filler + ' ' + match.group(1))


@bot.event
async def on_ready():
    """Log that the bot has connected."""
    print(f'{bot.user} has connected to Discord!')


@bot.event
async def on_message(message: discord.Message):
    """Dispatch guild messages from non-bots to the enabled triggers, then process prefix commands.

    The triggers are mutually exclusive and checked in this order: "Och Löh", "Och <anyone>", insiders.
    Command processing is skipped when a trigger handler returns False.
    """
    guild = message.guild
    if guild is not None and not message.author.bot:
        content = message.content
        if config_get('loeh-enable', guild.id) and LOEH_REGEX.match(content):
            if not await _handle_och_loeh(message):
                return
        elif config_get('och-anyone-enable', guild.id) and (
                match := OCH_ANYONE_REGEX.search(content)) is not None:
            if not await _handle_och_anyone(message, match.group(1).lower()):
                return
        elif config_get('inf19x-insiders-enable', guild.id):
            await _handle_insiders(message)
    await bot.process_commands(message)


@bot.event
async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Delete a temporary "<group>_vc" voice channel once its last member has left it.

    The channel is only deleted when it lies in the configured groups category and a matching
    group role exists.
    """
    channel = before.channel
    # Moving to another channel empties the old one just like disconnecting does.
    if channel is None or channel == after.channel:
        return
    if channel.voice_states:
        return
    if channel.category is None or not channel.name.endswith("_vc"):
        return
    guild = channel.guild
    if channel.category.name.lower() != config_get("groups-category", guild.id).lower():
        return
    group_name: str = channel.name[:-3]
    role = await find_role_case_insensitive(guild, group_name, config_get("groups-role-prefix", guild.id))
    if role is not None:
        await channel.delete(reason="Delete temporary group channel when last person left")


def _reaction_matches(emoji, reaction_filter: str) -> bool:
    """Compare one reaction emoji with the filter text.

    Unicode emojis are compared directly; custom emojis match either ":name:" or their
    string form.
    """
    if isinstance(emoji, str):
        return emoji == reaction_filter
    if isinstance(emoji, (discord.Emoji, discord.PartialEmoji)):
        return reaction_filter in (f':{emoji.name}:', str(emoji))
    return False


def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool:
    """Decide whether a message may be picked by the message commands.

    Messages without visible text or with a '❌' reaction are rejected. When `reaction_filter`
    is given, the message must additionally carry a matching reaction.
    """
    if message.clean_content.strip() == '':
        return False
    reactions = message.reactions
    if any(reaction.emoji == '❌' for reaction in reactions):
        return False
    if reaction_filter is None:
        return True
    return any(_reaction_matches(reaction.emoji, reaction_filter) for reaction in reactions)


@bot.command(name='config', brief='Change the configuration of this bot')
async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''):
    """Prefix command to list, query and change config values per guild or globally.

    `set` requires guild administrator permissions; `set-global` is reserved to OWNER_ID.
    """
    if ctx.guild is None:
        await ctx.send("You can't run config commands in private messages!")
        return

    if cmd == '':
        prefix = bot.command_prefix
        await ctx.send(f'Use `{prefix}config get` or `{prefix}config set` to query for or update config values. '
                       f'Use `{prefix}config list` to see all config options')
    elif cmd == 'list':
        lines = ['Available config options:']
        lines.extend(f' - `{option}`: {meta[1]}' for option, meta in config_get_descriptions())
        await ctx.send('\n'.join(lines))
    elif cmd == 'get':
        if key in config:
            await ctx.send(f'`{key}` is set to `{config_get(key, ctx.guild.id)}`')
        else:
            await ctx.send(f'Unknown config option `{key}`')
    elif cmd == 'set':
        if ctx.author.guild_permissions.administrator:
            await ctx.send(config_set(key, val, ctx.guild.id))
        else:
            await ctx.send("You're not allowed to change the configuration on this server!")
    elif cmd == 'get-global':
        if key in config:
            await ctx.send(f'`{key}` is globally set to `{config_get(key)}`')
        else:
            await ctx.send(f'Unknown config option `{key}`')
    elif cmd == 'set-global':
        if ctx.author.id == OWNER_ID:
            msg = config_set(key, val)
            if key == 'prefix':
                # Keep the running bot in sync with the new global prefix.
                bot.command_prefix = config_get('prefix')
            await ctx.send(msg)
        else:
            await ctx.send("You're not allowed to change the global configuration!")
    else:
        await ctx.send('Unknown command!')


# ------------------- The slashy way of life ------------------- #
slash_guild_ids = None

# One choice per config_meta entry whose first element is truthy; the second element is the label.
config_choices = [create_choice(name=meta[1], value=key) for key, meta in config_meta.items() if meta[0]]
config_option = create_option("config_key", "A key identifying the config entry to target", str, True, config_choices)


async def check_slash_context(ctx: SlashContext, require_op: bool = False,
                              require_manage_roles: bool = False) -> bool:
    """Check that a slash command runs in a guild and that the author has the required permissions.

    Sends an explanatory message and returns False when a check fails, True otherwise.
    """
    if ctx.guild is None:
        await ctx.send("You can't run config commands in private messages!")
        return False
    permissions = ctx.author.guild_permissions
    if require_op and not permissions.administrator:
        await ctx.send("You're not allowed to run this command on this server!", hidden=True)
        return False
    if require_manage_roles and not permissions.manage_roles:
        await ctx.send("You're not allowed to run this command on this server!", hidden=True)
        return False
    return True


async def check_text_channel(ctx: SlashContext, channel) -> bool:
    """Return True for text channels; otherwise tell the user and return False."""
    if isinstance(channel, discord.TextChannel):
        return True
    await ctx.send('The given channel is not a text channel!', hidden=True)
    return False


#  __  __ ___ ____   ____
# |  \/  |_ _/ ___| / ___|
# | |\/| || |\___ \| |
# | |  | || | ___) | |___
# |_|  |_|___|____/ \____|

@slash.slash(
    name="about",
    description="Shows information about this bot",
    guild_ids=slash_guild_ids
)
async def about_slash(ctx: SlashContext):
    """Send an embed describing the bot and its version."""
    embed = discord.Embed(
        color=discord.Colour.from_rgb(200, 0, 0),
        title="About Och bot",
        description="The Och bot is a special bot created by Siphalor filled with features and inside jokes of his "
                    "university group.",
    )
    embed.add_field(name="version", value=VERSION)
    await ctx.send(embed=embed, hidden=True)


def _message_source_options() -> list:
    """Build a fresh option list (channel, max_messages, reaction) shared by the message commands."""
    return [
        create_option(
            name="channel",
            description="The channel to pull the messages from",
            option_type=SlashCommandOptionType.CHANNEL,
            required=False
        ),
        create_option(
            name="max_messages",
            description="The maximum amount of messages to index (max 1024, default 100)",
            option_type=SlashCommandOptionType.INTEGER,
            required=False
        ),
        create_option(
            name="reaction",
            description="Only pull from messages that have this reaction",
            option_type=str,
            required=False
        )
    ]


async def _resolve_message_channel(ctx: SlashContext, channel):
    """Return the channel to read from (default: the invoking channel), or None if it is not a text channel."""
    if channel is None:
        return ctx.channel
    if await check_text_channel(ctx, channel):
        return channel
    return None


async def _collect_valid_messages(channel, max_messages: int, reaction: Optional[str]) -> List[discord.Message]:
    """Read up to `max_messages` (clamped to 1..1024) messages and keep the selectable ones."""
    limit = max(1, min(max_messages, _MAX_INDEXED_MESSAGES))
    candidates = async_filter(lambda m: _is_message_valid_for_selection(m, reaction),
                              channel.history(limit=limit))
    return [item async for item in candidates]


@slash.slash(
    name="random-message",
    description="Select a random message from a channel",
    options=_message_source_options(),
    guild_ids=slash_guild_ids
)
async def random_message_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None,
                               max_messages: int = 100, reaction: Optional[str] = None):
    """Post a random selectable message from the channel as an embed with a jump link."""
    channel = await _resolve_message_channel(ctx, channel)
    if channel is None:
        return
    await ctx.defer()
    messages = await _collect_valid_messages(channel, max_messages, reaction)
    if not messages:
        await ctx.send("No valid messages found!")
        # Nothing to choose from; random.choice would raise on an empty list.
        return

    msg: discord.Message = random.choice(messages)
    author: discord.abc.User = msg.author
    embed = discord.Embed(
        description=msg.content + "\n\n[Jump to message](" + msg.jump_url + ")"
    )
    embed.set_author(name=author.display_name, icon_url=author.avatar_url)
    embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + " messages")
    await ctx.send(embed=embed)


@slash.slash(
    name="list-messages",
    description="Lists all messages with the given filtes",
    options=_message_source_options(),
    guild_ids=slash_guild_ids
)
async def list_messages_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None,
                              max_messages: int = 100, reaction: Optional[str] = None):
    """Post the contents of all selectable messages from the channel in a single embed."""
    channel = await _resolve_message_channel(ctx, channel)
    if channel is None:
        return
    await ctx.defer()
    messages = await _collect_valid_messages(channel, max_messages, reaction)
    embed = discord.Embed(
        title="List of matching messages",
        description="\n".join(m.content for m in messages)
    )
    await ctx.send(embed=embed)


@slash.slash(
    name="flip-pipelin",
    description="Toss a pipelin to your favourite teacher and see whether it's schnel or slo"
)
async def flip_pipelin_slash(ctx: SlashContext):
    """Reply with a random pipelin speed."""
    await ctx.send("Pipelin " + random.choice(['schnel', 'slo']))


#   ____ ____   ___  _   _ ____  ____
#  / ___|  _ \ / _ \| | | |  _ \/ ___|
# | |  _| |_) | | | | | | | |_) \___ \
# | |_| |  _ <| |_| | |_| |  __/ ___) |
#  \____|_| \_\\___/ \___/|_|   |____/

def get_groups_role_prefix(guild_id: int) -> str:
    """Return the configured prefix of group role names for the guild."""
    return config_get("groups-role-prefix", guild_id)


async def get_groups_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]:
    """Find the configured groups category; report to the user and return None when it is missing."""
    category = find_category(ctx.guild, config_get("groups-category", ctx.guild_id))
    if category is None:
        await ctx.send("Failed to locate group category!")
    return category


async def get_groups_archive_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]:
    """Find the configured groups archive category; report to the user and return None when it is missing."""
    category = find_category(ctx.guild, config_get("groups-archive-category", ctx.guild_id))
    if category is None:
        await ctx.send("Failed to locate group archive category!")
    return category


async def get_groups_categories(
        ctx: SlashContext) -> Optional[Tuple[discord.CategoryChannel, discord.CategoryChannel]]:
    """Return `(groups category, archive category)`, or None as soon as one of them cannot be found."""
    groups_cat = await get_groups_category(ctx)
    if groups_cat is None:
        return None
    archive_cat = await get_groups_archive_category(ctx)
    if archive_cat is None:
        return None
    return groups_cat, archive_cat


def collect_group_channels(cat: discord.CategoryChannel) -> Dict[str, discord.abc.GuildChannel]:
    """Map channel name to channel for every text channel of the category."""
    return {channel.name: channel for channel in cat.text_channels}


async def collect_group_roles(guild: discord.Guild) -> List[discord.Role]:
    """Fetch all roles of the guild whose name starts with the group role prefix."""
    role_prefix = get_groups_role_prefix(guild.id)
    return [role for role in await guild.fetch_roles() if role.name.startswith(role_prefix)]


def _is_group_channel(channel: discord.abc.GuildChannel) -> bool:
    """Tell whether the channel lies in the configured groups or groups archive category.

    Category names are compared case-insensitively against the configured names.
    """
    if channel.category is None:
        return False
    category_name = channel.category.name.lower()
    for key in ("groups-category", "groups-archive-category"):
        configured = config_get(key, channel.guild.id)
        if configured and category_name == configured.lower():
            return True
    return False


def _group_channel_option() -> dict:
    """Build the required "group_channel" option shared by the archive/unarchive/delete commands."""
    return create_option(
        name="group_channel",
        description="The group identified by it's channel",
        option_type=SlashCommandOptionType.CHANNEL,
        required=True
    )


@slash.subcommand(
    base="groups",
    name="list",
    description="Lists all groups on this server",
    guild_ids=slash_guild_ids
)
async def groups_list_slash(ctx: SlashContext):
    """List every group role, linking the matching channel when one exists.

    Requires the manage-roles permission.
    """
    if not await check_slash_context(ctx, require_manage_roles=True):
        return
    categories = await get_groups_categories(ctx)
    if categories is None:
        return
    groups_cat, archive_cat = categories

    active_groups = collect_group_channels(groups_cat)
    archived_groups = collect_group_channels(archive_cat)
    role_prefix = get_groups_role_prefix(ctx.guild_id)
    lines = []
    for role in await collect_group_roles(ctx.guild):
        name = role.name[len(role_prefix):].lower()
        if name in active_groups:
            lines.append(link_channel(active_groups[name]))
        elif name in archived_groups:
            lines.append(link_channel(archived_groups[name], True))
        else:
            lines.append("*" + name + "*")

    embed = discord.Embed(title="Groups", description="".join(line + "\n" for line in lines))
    embed.set_footer(text="Italic groups are archived or unavailable.")
    await ctx.send(embed=embed)


@slash.subcommand(
    base="groups",
    name="archive",
    description="Move a group to the archive",
    options=[_group_channel_option()],
    guild_ids=slash_guild_ids
)
async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
    """Move a group's text channel into the archive category and delete its temporary voice channel.

    Requires the manage-roles permission.
    """
    if not await check_slash_context(ctx, require_manage_roles=True):
        return
    if not await check_text_channel(ctx, group_channel):
        return
    categories = await get_groups_categories(ctx)
    if categories is None:
        return
    groups_cat, archive_cat = categories

    group_name = group_channel.name
    if group_channel.category == groups_cat:
        reason = "Archive group " + group_name
        await group_channel.edit(reason=reason, category=archive_cat)
        # The temporary voice channel is a voice channel, so it must be looked up among those.
        vc = discord.utils.find(lambda _vc: _vc.name == group_name + "_vc", groups_cat.voice_channels)
        if vc is not None:
            await vc.delete(reason=reason)
        await ctx.send("Group " + group_name + " archived.")
    elif group_channel.category == archive_cat:
        await ctx.send("Group " + group_name + " is already archived.", hidden=True)
    else:
        await ctx.send(group_name + " is not a group")


@slash.subcommand(
    base="groups",
    name="unarchive",
    description="Move a group from the archive to the main category",
    options=[_group_channel_option()],
    guild_ids=slash_guild_ids
)
async def groups_unarchive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
    """Move a group's text channel from the archive category back into the groups category.

    Requires the manage-roles permission.
    """
    if not await check_slash_context(ctx, require_manage_roles=True):
        return
    if not await check_text_channel(ctx, group_channel):
        return
    categories = await get_groups_categories(ctx)
    if categories is None:
        return
    groups_cat, archive_cat = categories

    group_name = group_channel.name
    if group_channel.category == archive_cat:
        await ctx.defer()
        await group_channel.edit(reason="Unarchive group " + group_name, category=groups_cat)
        await ctx.send("Group " + group_name + " unarchived.")
    elif group_channel.category == groups_cat:
        await ctx.send("Group " + group_name + " is not archived.", hidden=True)
    else:
        await ctx.send(group_name + " is not a group")


@slash.subcommand(
    base="groups",
    name="delete",
    description="Deletes a group with its role and channels",
    options=[_group_channel_option()],
    guild_ids=slash_guild_ids
)
async def groups_delete_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
    """Delete a group's role, temporary voice channel and text channel.

    Requires the manage-roles permission. Channels outside the groups and archive categories
    are refused so the command cannot delete arbitrary text channels.
    """
    if not await check_slash_context(ctx, require_manage_roles=True):
        return
    if not await check_text_channel(ctx, group_channel):
        return
    group_name = group_channel.name
    if not _is_group_channel(group_channel):
        await ctx.send(group_name + " is not a group", hidden=True)
        return

    await ctx.defer()
    reason = "Delete group " + group_name
    role = await find_role_case_insensitive(ctx.guild, group_name, get_groups_role_prefix(ctx.guild_id))
    if role is not None:
        await role.delete(reason=reason)

    vc = discord.utils.find(lambda _vc: _vc.name == group_name + "_vc", group_channel.category.voice_channels)
    if vc is not None:
        await vc.delete(reason=reason)

    await group_channel.delete(reason=reason)
    await ctx.send("Group " + group_name + " deleted.")


@slash.subcommand(
    base="groups",
    name="create",
    description="Creates a new group channel and role",
    options=[
        create_option(
            name="name",
            description="The name of the group",
            option_type=str,
            required=True
        ),
        create_option(
            name="members",
            description="A comma-separated list of members",
            option_type=str,
            required=False
        )
    ],
    guild_ids=slash_guild_ids
)
async def groups_create_slash(ctx: SlashContext, name: str, members: Optional[str] = None):
    """Create a group: a text channel in the groups category plus a mentionable role that may read it.

    `members` is an optional comma-separated list; entries that cannot be resolved are reported back.
    Requires the manage-roles permission.
    """
    if not await check_slash_context(ctx, require_manage_roles=True):
        return
    await ctx.defer()
    name = name.strip()
    if not name:
        await ctx.send("The group name must not be empty!")
        return
    groups_cat = await get_groups_category(ctx)
    if groups_cat is None:
        return

    reason = "Create group " + name
    channel: discord.TextChannel = await groups_cat.create_text_channel(name.lower(), reason=reason)
    role: discord.Role = await ctx.guild.create_role(
        name=config_get("groups-role-prefix", ctx.guild_id) + name.lower(), mentionable=True, reason=reason)
    # Start from the category's permissions, then grant the new role read access.
    await channel.edit(sync_permissions=True)
    await channel.set_permissions(role, reason=reason, read_messages=True)

    unresolved = []
    if members is not None:
        converter = MemberConverter()
        for member_str in members.split(','):
            member_str = member_str.strip()
            if not member_str:
                # Ignore empty entries such as the one produced by a trailing comma.
                continue
            try:
                member = await converter.convert(ctx, member_str)
                await member.add_roles(role, reason=reason)
            except commands.MemberNotFound:
                unresolved.append(member_str)
    extra = ""
    if unresolved:
        extra = "\nCouldn't resolve users:" + "".join('\n*' + entry + '*' for entry in unresolved)

    await asyncio.sleep(3)
    await channel.send("Hi, " + role.mention)
    await ctx.send("Group " + name + " created." + extra)


@slash.subcommand(
    base="groups",
    name="remove-vc",
    description="Removes a temporary voice channel associated with the current group",
    guild_ids=slash_guild_ids
)
async def groups_remove_vc_slash(ctx: SlashContext):
    """Delete the "<channel>_vc" voice channel belonging to the group channel the command is used in."""
    if not await check_slash_context(ctx):
        return
    groups_cat = await get_groups_category(ctx)
    if groups_cat is None:
        return
    if ctx.channel.category != groups_cat:
        await ctx.send("This channel is not an active group!", hidden=True)
        return

    vc: Optional[discord.VoiceChannel] = discord.utils.find(
        lambda _vc: _vc.name == ctx.channel.name + '_vc',
        ctx.channel.category.voice_channels
    )
    if vc is None:
        await ctx.send("Voice chat is not open!", hidden=True)
        return
    await ctx.defer(hidden=True)
    await vc.delete(reason="Delete temporary voice chat")
    await ctx.send("Removed temporary voice chat.", hidden=True)


@slash.subcommand(
    base="groups",
    name="vc",
    description="Create a temporary voice chat. Available to anyone.",
    guild_ids=slash_guild_ids
)
async def groups_vc_slash(ctx: SlashContext):
    """Create a temporary "<channel>_vc" voice channel for the group channel the command is used in."""
    # Private channels have no category; bail out before touching it.
    if not await check_slash_context(ctx):
        return
    guild: discord.Guild = ctx.guild
    channel: discord.TextChannel = ctx.channel
    category: Optional[discord.CategoryChannel] = channel.category
    if category is None or category.name.lower() != config_get("groups-category", guild.id).lower():
        await ctx.send("Not an active group channel!", hidden=True)
        return

    role = await find_role_case_insensitive(guild, channel.name, config_get("groups-role-prefix", guild.id))
    if role is None:
        await ctx.send("Couldn't resolve group role!", hidden=True)
        return

    vc_name = channel.name + "_vc"
    if any(vc.name.lower() == vc_name.lower() for vc in category.voice_channels):
        await ctx.send("Temporary group channel already exists.", hidden=True)
        return

    await ctx.defer()
    reason = "Create temporary vc for group " + channel.name
    vc = await category.create_voice_channel(vc_name, reason=reason)
    await vc.edit(sync_permissions=True, reason=reason)
    await vc.set_permissions(role, view_channel=True, connect=True, reason=reason)
    await ctx.send("Created temporary vc for this group")


#   ___  _   _  ___ _____ _____ ____
#  / _ \| | | |/ _ \_   _| ____/ ___|
# | | | | | | | | | || | |  _| \___ \
# | |_| | |_| | |_| || | | |___ ___) |
#  \__\_\\___/ \___/ |_| |_____|____/

def get_quotes(guild_id: int) -> Dict[str, List[str]]:
    """Return the guild's author -> quotes mapping, creating and storing an empty one when missing."""
    quotes: Optional[Dict[str, List[str]]] = config_get("guild_quotes", guild=guild_id)
    if quotes is None:
        quotes = {}
        config_set_raw("guild_quotes", quotes, guild_id)
    return quotes


def get_quotes_rolling_index(guild_id: int, author: str) -> Optional[int]:
    """Return the author's next rolling quote index and advance the stored index, wrapping around.

    Returns None when the author has no quotes.
    """
    author_quotes = get_quotes(guild_id).get(author)
    if not author_quotes:
        return None
    rolling_indeces = config_get("guild_quotes_rolling_indeces", guild=guild_id)
    if rolling_indeces is None:
        rolling_indeces = {}
    # The stored index may be stale after quotes were removed, so wrap it before use.
    index = rolling_indeces.get(author, 0) % len(author_quotes)
    rolling_indeces[author] = (index + 1) % len(author_quotes)
    config_set_raw("guild_quotes_rolling_indeces", rolling_indeces, guild=guild_id)
    return index


def _find_quote_author(quotes: Dict[str, List[str]], author: str) -> Optional[str]:
    """Resolve user input to a stored author key: exact match first, then stripped and title-cased."""
    if author in quotes:
        return author
    normalized = author.strip().title()
    if normalized in quotes:
        return normalized
    return None


def _normalize_quote_for_compare(quote: str) -> str:
    """Reduce a quote to its lower-cased word characters for duplicate detection."""
    return _QUOTE_COMPARE_IGNORED_REGEX.sub('', quote).lower()


async def _do_quote(ctx: SlashContext, author: str, quote: str, tts: bool):
    """Send the quote as a block quote and, when `tts` is set, read it aloud in the caller's voice channel."""
    quoted = '\n'.join('> ' + line for line in quote.splitlines())
    await ctx.send(quoted + '\n *~' + author + '*')
    if not tts:
        return
    voice: Optional[discord.VoiceState] = ctx.author.voice
    # The author has no voice state at all when not connected to voice.
    if voice is None or voice.channel is None:
        return
    source, tts_destroyer = text_to_speech(author + " sagte: " + quote)
    try:
        vp = await connect_and_play(voice.channel, source)
        if isinstance(vp, discord.VoiceClient):
            # Stay connected until playback ends or the client is moved elsewhere.
            while vp.is_playing() and vp.channel == voice.channel:
                await asyncio.sleep(0.5)
            await vp.disconnect()
    finally:
        tts_destroyer()


@slash.subcommand(
    base="quotes",
    name="random",
    description="Gets a random quote, optionally for the given author",
    options=[
        create_option(
            name="author",
            description="The author to filter by",
            option_type=str,
            required=False
        ),
        create_option(
            name="tts",
            description="Text to speech to voice chat",
            option_type=SlashCommandOptionType.BOOLEAN,
            required=False
        )
    ],
    guild_ids=slash_guild_ids
)
async def quote_random_slash(ctx: SlashContext, author: Optional[str] = None, tts: bool = False):
    """Send a random quote, from a random author unless one is given."""
    if not await check_slash_context(ctx):
        return
    quotes = get_quotes(ctx.guild_id)
    if author is None:
        if not quotes:
            await ctx.send('No quotes available')
            return
        author = random.choice(list(quotes))
    else:
        author = _find_quote_author(quotes, author)
        if author is None:
            await ctx.send('No such author!', hidden=True)
            return
    await _do_quote(ctx, author, random.choice(quotes[author]), tts)


@slash.subcommand(
    base="quotes",
    name="tell",
    description="Recite a quote",
    options=[
        create_option(
            name="author",
            description="The author to recite from",
            option_type=str,
            required=True
        ),
        create_option(
            name="prefix",
            description="Get a quote by prefix",
            option_type=str,
            required=False
        ),
        create_option(
            name="index",
            description="Get a quote with a certain index",
            option_type=SlashCommandOptionType.INTEGER,
            required=False
        ),
        create_option(
            name="tts",
            description="Text to speech to voice chat",
            option_type=SlashCommandOptionType.BOOLEAN,
            required=False
        )
    ],
    guild_ids=slash_guild_ids
)
async def quote_tell_slash(ctx: SlashContext, author: str, prefix: Optional[str] = None, index: Optional[int] = None,
                           tts: Optional[bool] = None):
    """Recite one quote of the author.

    Selection order: explicit `index`, then the first quote starting with `prefix`
    (case-insensitive), then the author's rolling index.
    """
    if not await check_slash_context(ctx):
        return
    all_quotes = get_quotes(ctx.guild_id)
    resolved_author = _find_quote_author(all_quotes, author)
    if resolved_author is None:
        await ctx.send('No such author!', hidden=True)
        return
    author = resolved_author
    quotes = all_quotes[author]

    if index is not None:
        # Negative positions keep counting from the end, as plain list indexing does.
        if not -len(quotes) <= index < len(quotes):
            await ctx.send('No quote with index ' + str(index) + '!', hidden=True)
            return
        await _do_quote(ctx, author, quotes[index], tts)
        return

    if prefix is not None:
        lowered_prefix = prefix.lower()
        for quote in quotes:
            if quote.lower().startswith(lowered_prefix):
                await _do_quote(ctx, author, quote, tts)
                return

    index = get_quotes_rolling_index(ctx.guild_id, author)
    if index is None:
        await ctx.send('No quotes available', hidden=True)
        return
    await _do_quote(ctx, author, quotes[index], tts)


@slash.subcommand(
    base="quotes",
    name="list",
    description="Lists all authors or all quotes by a given author",
    options=[
        create_option(
            name="author",
            description="The author to filter by, use * for all authors",
            option_type=str,
            required=False
        )
    ],
    guild_ids=slash_guild_ids
)
async def quote_list_slash(ctx: SlashContext, author: Optional[str] = None):
    """List all authors, all quotes of one author, or (with "*") one embed per author."""
    if not await check_slash_context(ctx):
        return

    def create_list_embed(title: str, items: Iterable[str]) -> discord.Embed:
        return discord.Embed(title=title, description='\n'.join(items))

    quotes = get_quotes(ctx.guild_id)
    if author is None:
        await ctx.send(embed=create_list_embed("Available authors", quotes.keys()))
    elif author == '*':
        await ctx.defer()
        await asyncio.gather(*(
            ctx.channel.send(embed=create_list_embed(name, author_quotes))
            for name, author_quotes in quotes.items()
        ))
        await ctx.send(content="That was all.")
    else:
        author = author.strip().title()
        if author in quotes:
            await ctx.send(embed=create_list_embed(author, quotes[author]))
        else:
            await ctx.send("No such author: \"" + author + "\"", hidden=True)


@slash.subcommand(
    base="quotes",
    name="add",
    description="Add a new quote",
    options=[
        create_option(
            name="author",
            description="The author of the new quote",
            option_type=str,
            required=True
        ),
        create_option(
            name="quote",
            description="The quote to add",
            option_type=str,
            required=True
        )
    ],
    guild_ids=slash_guild_ids
)
async def quote_add_slash(ctx: SlashContext, author: str, quote: str):
    """Store a new quote under the title-cased author unless an equivalent quote already exists.

    Two quotes are equivalent when they agree after dropping non-word characters and case.
    """
    if not await check_slash_context(ctx):
        return
    author = author.strip().title()
    quote = quote.strip()
    if not author or not quote:
        await ctx.send("Author and quote must not be empty!", hidden=True)
        return
    quote = quote[0].upper() + quote[1:]

    quotes = get_quotes(ctx.guild_id)
    author_quotes: Optional[List[str]] = quotes.get(author)
    if author_quotes is None:
        quotes[author] = [quote]
        config_save()
        await ctx.send("Author and quote added")
        return

    comparable = _normalize_quote_for_compare(quote)
    if any(_normalize_quote_for_compare(existing) == comparable for existing in author_quotes):
        await ctx.send("Quote already exists!", hidden=True)
        return
    author_quotes.append(quote)
    config_save()
    await ctx.send("Quote added.")


@slash.subcommand(
    base="quotes",
    name="remove",
    description="Removes a quote. Requires administrator permissions.",
    options=[
        create_option(
            name="author",
            description="The author to delete from",
            option_type=str,
            required=True
        ),
        create_option(
            name="quote",
            description="The exact quote to remove",
            option_type=str,
            required=False
        ),
        create_option(
            name="quote_position",
            description="The quote position to delete",
            option_type=SlashCommandOptionType.INTEGER,
            required=False
        )
    ],
    guild_ids=slash_guild_ids
)
async def quote_remove_slash(ctx: SlashContext, author: str, quote: Optional[str] = None,
                             quote_position: Optional[int] = None):
    """Remove one quote of an author, by exact text (case-insensitive) or by zero-based position.

    The author entry is dropped together with its last quote. Requires administrator permissions.
    """
    if not await check_slash_context(ctx, True):
        return
    quotes = get_quotes(ctx.guild_id)
    author = author.strip().title()
    if author not in quotes:
        await ctx.send("No quotes from author \"" + author + "\"!", hidden=True)
        return
    author_quotes = quotes[author]

    if quote is not None:
        wanted = quote.strip().lower()
        position = next((i for i, aq in enumerate(author_quotes) if aq.lower() == wanted), None)
        if position is None:
            await ctx.send("No such quote found!", hidden=True)
            return
    elif quote_position is not None:
        if not 0 <= quote_position < len(author_quotes):
            await ctx.send(
                "Invalid quote position - must be within the range of 0 and " + str(len(author_quotes) - 1) + "!",
                hidden=True)
            return
        position = quote_position
    else:
        await ctx.send("Either quote or quote_position must be used!", hidden=True)
        return

    author_quotes.pop(position)
    if not author_quotes:
        quotes.pop(author)
    # Persist first so a failing reply cannot lose the change.
    config_save()
    await ctx.send("Removed quote from author \"" + author + "\"")


#   ____ ___  _   _ _____ ___ ____
#  / ___/ _ \| \ | |  ___|_ _/ ___|
# | |  | | | |  \| | |_   | | |  _
# | |__| |_| | |\  |  _|  | | |_| |
#  \____\___/|_| \_|_|   |___\____|

@slash.slash(
    name="config",
    description="Change or query the configuration of this bot",
    guild_ids=slash_guild_ids
)
async def config_slash(ctx: SlashContext):
    """Base command of the config subcommands; replies with a placeholder when invoked on its own."""
    await ctx.send("missingno", hidden=True)
@slash.subcommand(
    base="config",
    name="list",
    description="List available config options",
    guild_ids=slash_guild_ids
)
async def config_list_slash(ctx: SlashContext):
    """Send the list of config options with their descriptions."""
    lines = ['Available config options:']
    lines.extend(f' - `{option}`: {meta[1]}' for option, meta in config_get_descriptions())
    await ctx.send('\n'.join(lines), hidden=True)


@slash.subcommand(
    base="config",
    name="get",
    description="Queries the configuration for a value",
    options=[
        config_option
    ],
    guild_ids=slash_guild_ids
)
async def config_get_slash(ctx: SlashContext, config_key: str):
    """Send the guild's value of a config entry.

    Only keys whose config_meta entry has a truthy first element are accepted.
    """
    if not await check_slash_context(ctx):
        return
    if config_key in config_meta and config_meta[config_key][0]:
        await ctx.send(f'`{config_key}` is set to `{config_get(config_key, ctx.guild.id)}`', hidden=True)
    else:
        await ctx.send(f'Unknown config option `{config_key}`', hidden=True)


@slash.subcommand(
    base="config",
    name="set",
    description="Changes a configuration entry",
    options=[
        config_option,
        create_option(
            name="value",
            description="The value to set",
            option_type=str,
            required=True
        )
    ],
    guild_ids=slash_guild_ids
)
async def config_set_slash(ctx: SlashContext, config_key: str, value: str):
    """Set a config entry for the guild and send the message returned by config_set.

    Requires administrator permissions, enforced by check_slash_context.
    """
    # The context check already rejects non-administrators, so no second permission test is needed.
    if not await check_slash_context(ctx, require_op=True):
        return
    await ctx.send(config_set(config_key, value, ctx.guild.id), hidden=True)


bot.run(TOKEN)