Files
och-bot/bot.py
2021-10-11 21:40:18 +02:00

1125 lines
39 KiB
Python

import asyncio
import os
import random
import re
import time
from typing import Optional, List, Dict, Iterable
import discord
from discord.ext import commands
from discord.ext.commands import MemberConverter
from discord_slash import SlashCommand, SlashContext, SlashCommandOptionType
from discord_slash.utils.manage_commands import create_option, create_choice
from dotenv import load_dotenv
from lib.config import config, config_load, config_save, config_get, config_set, config_get_descriptions, \
config_set_raw, config_meta
from lib.utils import async_filter, find_category, find_role_case_insensitive, link_channel, connect_and_play, \
text_to_speech
VERSION = "1.2.3"
load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE)
TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE)
LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE)
OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE)
TOPFIT_WORDS = (
"äh",
"ähm",
"ääh",
"äääh",
"ä",
"..",
"...",
"Mi",
"Mic",
"Micro",
"Microsoft",
"Microservices",
"top",
"vortual reality"
)
# LOEH_ID = 327126546970312739
LOEH_ID = 254265844928872448
OWNER_ID = 327126546970312739
OCH_LOEH_SOUND = "assets/och_loeh.mp3"
config_load()
intents = discord.Intents.default()
intents.members = True
intents.voice_states = True
bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents)
slash = SlashCommand(bot, sync_commands=True)
if 'LIBOPUS' in os.environ and not len(os.environ['LIBOPUS']) == 0:
discord.opus.load_opus(os.environ['LIBOPUS'])
async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]:
try:
return await guild.fetch_member(LOEH_ID)
except discord.NotFound:
return None
def _get_last_och_time(guild: discord.Guild) -> int:
return config_get('last-och-time', guild.id)
def _set_last_och_time(guild: discord.Guild):
config_set_raw('last-och-time', time.time(), guild.id)
@bot.event
async def on_ready():
print(f'{bot.user} has connected to Discord!')
@bot.event
async def on_message(message: discord.Message):
if message.guild is not None and not message.author.bot:
if config_get('loeh-enable', message.guild.id) and LOEH_REGEX.match(message.content):
if message.author.id == LOEH_ID:
await message.channel.send("https://stuff.siphalor.de/img/spidy-is-that-you.jpg")
else:
cooldown = config_get('och-cooldown', message.guild.id)
t = time.time() - _get_last_och_time(message.guild)
if t <= cooldown:
await message.channel.send(
'429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!')
return
_set_last_och_time(message.guild)
loeh = await _get_loeh(message.guild)
if loeh is None:
await message.channel.send('404: Löh not found!')
elif loeh.voice is None:
await message.channel.send('400: Löh not connected!')
else:
voice: discord.VoiceState = loeh.voice
try:
voice_channel: discord.VoiceChannel = voice.channel
source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND))
voice_protocol = await connect_and_play(voice_channel, source=source)
await loeh.edit(mute=True)
sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id))
message: Optional[discord.Message] = await message.channel.send('Zu Befehl!')
await sleeper
await loeh.edit(mute=False)
if message is not None:
await message.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.")
if type(voice_protocol) is discord.VoiceClient:
await voice_protocol.disconnect(force=True)
except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
await message.channel.send('Failed to complete your command, Sir')
return
elif config_get('och-anyone-enable', message.guild.id) and (
match := OCH_ANYONE_REGEX.search(message.content)) is not None:
cooldown = config_get('och-cooldown', message.guild.id)
t = time.time() - _get_last_och_time(message.guild)
if t <= cooldown:
await message.channel.send('429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!')
return
_set_last_och_time(message.guild)
guild: discord.Guild = message.guild
bot_member: discord.Member = guild.get_member(bot.user.id)
search: str = match.group(1).lower()
matches: list = []
voice: Optional[discord.VoiceChannel] = None
async for member in guild.fetch_members(limit=100):
if not member.bot and member.voice is not None and search in member.display_name.lower().split(' '):
perms: discord.Permissions = member.voice.channel.permissions_for(bot_member)
if perms.connect and perms.speak:
voice = member.voice.channel
matches.append(member)
if matches and voice is not None:
source, destroy_tts = text_to_speech(message.content)
voice_protocol: Optional[discord.VoiceProtocol] = await connect_and_play(voice, source)
async def _mute(m: discord.Member):
await m.edit(mute=True)
async def _unmute(m: discord.Member):
await m.edit(mute=False)
await asyncio.gather(*map(_mute, matches))
sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id))
message: Optional[discord.Message] = await message.channel.send('Auf gehts!')
await sleeper
await asyncio.gather(*map(_unmute, matches))
waiter = None
if message is not None:
waiter = message.edit(content='~~Auf gehts!~~\nGeschafft!')
if type(voice_protocol) is discord.VoiceClient:
await voice_protocol.disconnect(force=True)
if waiter is not None:
await waiter
destroy_tts()
else:
await message.channel.send('404: No users found!')
elif config_get('inf19x-insiders-enable', message.guild.id):
if PING_REGEX.search(message.content):
embed = discord.Embed(
title="*pinken, schwaches Verb*",
description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en",
color=16636435
)
embed.add_field(name='ich', value='pinke')
embed.add_field(name='du', value='pinkst')
embed.add_field(name='er|sie|es', value='pinkt')
embed.add_field(name='wir', value='pinken')
embed.add_field(name='ihr', value='pinkt')
embed.add_field(name='sie', value='pinken')
embed.add_field(name='Partizip 2', value='gepinkt')
await message.channel.send(embed=embed)
else:
match = TOPFIT_REGEX.search(message.content)
if match is not None:
text = ''
for i in range(0, random.randint(7, 13)):
text += random.choice(TOPFIT_WORDS) + " "
text += match.group(1)
await message.channel.send(text)
await bot.process_commands(message)
@bot.event
async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
if after.channel is None and before.channel is not None:
channel: discord.VoiceChannel = before.channel
if not channel.voice_states:
if channel.category is not None and channel.name.endswith("_vc"):
if channel.category.name.lower() == config_get("groups-category", channel.guild.id).lower():
name: str = channel.name[:-3]
role = await find_role_case_insensitive(channel.guild, name,
config_get("groups-role-prefix", channel.guild.id))
if role is not None:
await channel.delete(reason="Delete temporary group channel when last person left")
def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool:
if message.clean_content.strip() == '':
return False
for reaction in message.reactions:
if reaction.emoji == '':
return False
if reaction_filter is not None:
for reaction in message.reactions:
if type(reaction.emoji) is str:
if reaction.emoji == reaction_filter:
return True
elif type(reaction.emoji) is discord.Emoji or type(reaction.emoji) is discord.PartialEmoji:
if ':' + reaction.emoji.name + ':' == reaction_filter:
return True
return False
return True
@bot.command(name='config', brief='Change the configuration of this bot')
async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''):
if ctx.guild is None:
await ctx.send('You can\'t run config commands in private messages!')
return
if cmd == '':
await ctx.send('Use `' + bot.command_prefix + 'config get` or `' + bot.command_prefix +
'config set` to query for or update config values. Use `' + bot.command_prefix +
'config list` to see all config options')
elif cmd == 'list':
msg = 'Available config options:'
for (key, value) in config_get_descriptions():
msg += '\n - `' + key + '`: ' + value[1]
await ctx.send(msg)
elif cmd == 'get':
if key in config:
await ctx.send('`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`')
else:
await ctx.send('Unknown config option `' + key + '`')
elif cmd == 'set':
if ctx.author.guild_permissions.administrator:
await ctx.send(config_set(key, val, ctx.guild.id))
else:
await ctx.send('You\'re not allowed to change the configuration on this server!')
elif cmd == 'get-global':
if key in config:
await ctx.send('`' + key + '` is globally set to `' + str(config_get(key)) + '`')
else:
await ctx.send('Unknown config option `' + key + '`')
elif cmd == 'set-global':
if ctx.author.id == OWNER_ID:
msg = config_set(key, val)
if key == 'prefix':
bot.command_prefix = config_get('prefix')
await ctx.send(msg)
else:
await ctx.send('You\'re not allowed to change the global configuration!')
else:
await ctx.send('Unknown command!')
# ------------------- The slashy way of life ------------------- #
slash_guild_ids = None
config_choices = [create_choice(name=entry[1][1], value=entry[0]) for entry in config_meta.items() if entry[1][0]]
config_option = create_option("config_key", "A key identifying the config entry to target", str, True, config_choices)
async def check_slash_context(ctx: SlashContext, require_op: bool = False, require_manage_roles: bool = False) -> bool:
if ctx.guild is None:
await ctx.send('You can\'t run config commands in private messages!')
return False
if require_op and not ctx.author.guild_permissions.administrator:
await ctx.send('You\'re not allowed to run this command on this server!', hidden=True)
return False
if require_manage_roles and not ctx.author.guild_permissions.manage_roles:
await ctx.send('You\'re not allowed to run this command on this server!', hidden=True)
return False
return True
async def check_text_channel(ctx: SlashContext, channel) -> bool:
if not isinstance(channel, discord.TextChannel):
await ctx.send('The given channel is not a text channel!', hidden=True)
return False
return True
# __ __ ___ ____ ____
# | \/ |_ _/ ___| / ___|
# | |\/| || |\___ \| |
# | | | || | ___) | |___
# |_| |_|___|____/ \____|
@slash.slash(
name="about",
description="Shows information about this bot",
guild_ids=slash_guild_ids
)
async def about_slash(ctx: SlashContext):
embed = discord.Embed(
color=discord.Colour.from_rgb(200, 0, 0),
title="About Och bot",
description="The Och bot is a special bot created by Siphalor filled with features and inside jokes of his "
"university group.",
)
embed.add_field(name="version", value=VERSION)
await ctx.send(embed=embed, hidden=True)
@slash.slash(
name="random-message",
description="Select a random message from a channel",
options=[
create_option(
name="channel",
description="The channel to pull the messages from",
option_type=SlashCommandOptionType.CHANNEL,
required=False
),
create_option(
name="max_messages",
description="The maximum amount of messages to index (max 1024, default 100)",
option_type=SlashCommandOptionType.INTEGER,
required=False
),
create_option(
name="reaction",
description="Only pull from messages that have this reaction",
option_type=str,
required=False
)
],
guild_ids=slash_guild_ids
)
async def random_message_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None,
max_messages: int = 100, reaction: Optional[str] = None):
await ctx.defer()
if channel is None:
channel = ctx.channel
messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction), channel.history(limit=max_messages))
messages = [item async for item in messages]
if not messages:
await ctx.send("No valid messages found!")
msg: discord.Message = random.choice(messages)
author: discord.abc.User = msg.author
embed = discord.Embed(
description=msg.content + "\n\n[Jump to message](" + msg.jump_url + ")"
)
embed.set_author(name=author.display_name, icon_url=author.avatar_url)
embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + "messages")
await ctx.send(embed=embed)
@slash.slash(
name="list-messages",
description="Lists all messages with the given filtes",
options=[
create_option(
name="channel",
description="The channel to pull the messages from",
option_type=SlashCommandOptionType.CHANNEL,
required=False
),
create_option(
name="max_messages",
description="The maximum amount of messages to index (max 1024, default 100)",
option_type=SlashCommandOptionType.INTEGER,
required=False
),
create_option(
name="reaction",
description="Only pull from messages that have this reaction",
option_type=str,
required=False
)
],
guild_ids=slash_guild_ids
)
async def list_messages_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None,
max_messages: int = 100, reaction: Optional[str] = None):
await ctx.defer()
if channel is None:
channel = ctx.channel
messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction), channel.history(limit=max_messages))
messages = [item async for item in messages]
embed = discord.Embed(
title="List of matching messages",
description="\n".join([m.content for m in messages])
)
await ctx.send(embed=embed)
@slash.slash(
name="flip-pipelin",
description="Toss a pipelin to your favourite teacher and see whether it's schnel or slo"
)
async def flip_pipelin_slash(ctx: SlashContext):
await ctx.send("Pipelin " + random.choice(['schnel', 'slo']))
# ____ ____ ___ _ _ ____ ____
# / ___| _ \ / _ \| | | | _ \/ ___|
# | | _| |_) | | | | | | | |_) \___ \
# | |_| | _ <| |_| | |_| | __/ ___) |
# \____|_| \_\\___/ \___/|_| |____/
def get_groups_role_prefix(guild_id: int) -> str:
return config_get("groups-role-prefix", guild_id)
async def get_groups_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]:
category = find_category(ctx.guild, config_get("groups-category", ctx.guild_id))
if category is None:
await ctx.send("Failed to locate group category!")
return None
return category
async def get_groups_archive_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]:
category = find_category(ctx.guild, config_get("groups-archive-category", ctx.guild_id))
if category is None:
await ctx.send("Failed to locate group category!")
return None
return category
async def get_groups_categories(ctx: SlashContext) -> (discord.CategoryChannel, discord.CategoryChannel):
groups_cat = await get_groups_category(ctx)
if groups_cat is None:
return None
archive_cat = await get_groups_archive_category(ctx)
if archive_cat is None:
return None
return groups_cat, archive_cat
def collect_group_channels(cat: discord.CategoryChannel) -> Dict[str, discord.abc.GuildChannel]:
return {channel.name: channel for channel in cat.text_channels}
async def collect_group_roles(guild: discord.Guild) -> List[discord.Role]:
role_prefix = get_groups_role_prefix(guild.id)
return list(filter(lambda role: role.name.startswith(role_prefix), await guild.fetch_roles()))
@slash.subcommand(
base="groups",
name="list",
description="Lists all groups on this server",
guild_ids=slash_guild_ids
)
async def groups_list_slash(ctx: SlashContext):
if not await check_slash_context(ctx, require_manage_roles=True):
return
categories = await get_groups_categories(ctx)
if categories is None:
return
groups_cat, archive_cat = categories
active_groups = collect_group_channels(groups_cat)
archived_groups = collect_group_channels(archive_cat)
role_prefix = get_groups_role_prefix(ctx.guild_id)
msg = ""
for role in await collect_group_roles(ctx.guild):
name = role.name[len(role_prefix):].lower()
if name in active_groups:
msg += link_channel(active_groups[name]) + "\n"
elif name in archived_groups:
msg += link_channel(archived_groups[name], True) + "\n"
else:
msg += "*" + name + "*\n"
embed = discord.Embed(title="Groups", description=msg)
embed.set_footer(text="Italic groups are archived or unavailable.")
await ctx.send(embed=embed)
@slash.subcommand(
base="groups",
name="archive",
description="Move a group to the archive",
options=[
create_option(
name="group_channel",
description="The group identified by it's channel",
option_type=SlashCommandOptionType.CHANNEL,
required=True
)
],
guild_ids=slash_guild_ids
)
async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
if not await check_slash_context(ctx, require_manage_roles=True):
return
if not await check_text_channel(ctx, group_channel):
return
categories = await get_groups_categories(ctx)
if categories is None:
return
groups_cat, archive_cat = categories
group_name = group_channel.name
groups = collect_group_channels(groups_cat)
if group_channel.category == groups_cat:
await group_channel.edit(reason="Archive group " + group_name, category=archive_cat)
if group_name + "_vc" in groups:
await groups[group_name + "_vc"].delete(reason="Archive group " + group_name)
await ctx.send("Group " + group_name + " archived.")
elif group_channel.category == archive_cat:
await ctx.send("Group " + group_name + " is already archived.", hidden=True)
else:
await ctx.send(group_name + " is not a group")
@slash.subcommand(
base="groups",
name="unarchive",
description="Move a group from the archive to the main category",
options=[
create_option(
name="group_channel",
description="The group identified by it's channel",
option_type=SlashCommandOptionType.CHANNEL,
required=True
)
],
guild_ids=slash_guild_ids
)
async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
if not await check_slash_context(ctx, require_manage_roles=True):
return
if not await check_text_channel(ctx, group_channel):
return
categories = await get_groups_categories(ctx)
if categories is None:
return
groups_cat, archive_cat = categories
group_name = group_channel.name
if group_channel.category == archive_cat:
await ctx.defer()
await group_channel.edit(reason="Archive group " + group_name, category=archive_cat)
await ctx.send("Group " + group_name + " archived.")
elif group_channel.category == groups_cat:
await ctx.send("Group " + group_name + " is not archived.", hidden=True)
else:
await ctx.send(group_name + " is not a group")
@slash.subcommand(
base="groups",
name="delete",
description="Move a group from the archive to the main category",
options=[
create_option(
name="group_channel",
description="The group identified by it's channel",
option_type=SlashCommandOptionType.CHANNEL,
required=True
)
],
guild_ids=slash_guild_ids
)
async def groups_delete_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
if not await check_slash_context(ctx, require_manage_roles=True):
return
if not await check_text_channel(ctx, group_channel):
return
group_channel: discord.TextChannel = group_channel
group_name = group_channel.name
await ctx.defer()
role = await find_role_case_insensitive(ctx.guild, group_channel.name, get_groups_role_prefix(ctx.guild_id))
if role is not None:
await role.delete(reason="Delete group " + group_name)
if group_channel.category:
vcs: List[discord.VoiceChannel] = group_channel.category.voice_channels
for vc in vcs:
if vc.name == group_name + "_vc":
await vc.delete(reason="Delete group " + group_name)
break
await group_channel.delete(reason="Delete group " + group_name)
await ctx.send("Group " + group_name + " deleted.")
@slash.subcommand(
base="groups",
name="create",
description="Creates a new group channel and role",
options=[
create_option(
name="name",
description="The name of the group",
option_type=str,
required=True
),
create_option(
name="members",
description="A comma-separated list of members",
option_type=str,
required=False
)
],
guild_ids=slash_guild_ids
)
async def groups_create_slash(ctx: SlashContext, name: str, members: Optional[str] = None):
if not await check_slash_context(ctx, require_manage_roles=True):
return
await ctx.defer()
name = name.strip()
groups_cat = await get_groups_category(ctx)
if not groups_cat:
return
cor = groups_cat.create_text_channel(name.lower(), reason="Create grooup " + name)
cor_role = ctx.guild.create_role(name=config_get("groups-role-prefix", ctx.guild_id) + name.lower(),
mentionable=True, reason="Create group " + name)
channel: discord.TextChannel = await cor
cor = channel.edit(sync_permissions=True)
role: discord.Role = await cor_role
await cor
await channel.set_permissions(role, reason="Create group " + name, read_messages=True)
extra = ""
if members is not None:
members = members.split(',')
converter = MemberConverter()
for member_str in members:
member_str = member_str.strip()
try:
member = await converter.convert(ctx, member_str)
await member.add_roles(role, reason="Create group " + name)
except discord.ext.commands.MemberNotFound:
extra += '\n*' + member_str + '*'
if extra:
extra = '\nCouldn\'t resolve users:' + extra
await asyncio.sleep(3)
await channel.send("Hi, " + role.mention)
await ctx.send("Group " + name + " created." + extra)
@slash.subcommand(
base="groups",
name="remove-vc",
description="Removes a temporary voice channel associated with the current group",
guild_ids=slash_guild_ids
)
async def groups_remove_vc_slash(ctx: SlashContext):
if not await check_slash_context(ctx):
return
groups_cat = await get_groups_category(ctx)
if not groups_cat:
return
if ctx.channel.category == groups_cat:
vc: Optional[discord.VoiceChannel] = discord.utils.find(
lambda _vc: _vc.name == ctx.channel.name + '_vc',
ctx.channel.category.voice_channels
)
if vc is None:
await ctx.send("Voice chat is not open!", hidden=True)
else:
await ctx.defer(hidden=True)
await vc.delete(reason="Delete temporary voice chat")
await ctx.send("Removed temporary voice chat.", hidden=True)
else:
await ctx.send("This channel is not an active group!", hidden=True)
@slash.subcommand(
base="groups",
name="vc",
description="Create a temporary voice chat. Available to anyone.",
guild_ids=slash_guild_ids
)
async def groups_vc_slash(ctx: SlashContext):
if ctx.channel.category is not None:
guild: discord.Guild = ctx.guild
channel: discord.TextChannel = ctx.channel
if channel.category.name.lower() == config_get("groups-category", guild.id).lower():
role = await find_role_case_insensitive(guild, channel.name, config_get("groups-role-prefix", guild.id))
if role is None:
await ctx.send("Couldn't resolve group role!", hidden=True)
return
category: discord.CategoryChannel = channel.category
for vc in category.voice_channels:
if vc.name.lower() == channel.name.lower() + "_vc":
await ctx.send("Temporary group channel already exists.", hidden=True)
return
await ctx.defer()
reason = "Create temporary vc for group " + channel.name
vc = await category.create_voice_channel(channel.name + "_vc", reason=reason)
await vc.edit(sync_permissions=True, reason=reason)
await vc.set_permissions(role, view_channel=True, connect=True, reason=reason)
await ctx.send("Created temporary vc for this group")
return
await ctx.send("Not an active group channel!", hidden=True)
# ___ _ _ ___ _____ _____ ____
# / _ \| | | |/ _ \_ _| ____/ ___|
# | | | | | | | | | || | | _| \___ \
# | |_| | |_| | |_| || | | |___ ___) |
# \__\_\\___/ \___/ |_| |_____|____/
def get_quotes(guild_id: int) -> Dict[str, List[str]]:
quotes: Optional[Dict[str, List[str]]] = config_get("guild_quotes", guild=guild_id)
if quotes is None:
quotes = {}
config_set_raw("guild_quotes", quotes, guild_id)
return quotes
def get_quotes_rolling_index(guild_id: int, author: str) -> Optional[int]:
quotes = get_quotes(guild_id)
if author not in quotes:
return None
quotes = quotes[author]
rolling_indeces = config_get("guild_quotes_rolling_indeces", guild=guild_id)
if rolling_indeces is None:
rolling_indeces = {}
if author not in rolling_indeces:
rolling_indeces[author] = 0
index = rolling_indeces[author]
rolling_indeces[author] = (index + 1) % len(quotes)
config_set_raw("guild_quotes_rolling_indeces", rolling_indeces, guild=guild_id)
return index
async def _do_quote(ctx: SlashContext, author: str, quote: str, tts: bool):
text = '\n'.join(map(lambda line: '> ' + line, quote.splitlines()))
await ctx.send(text + '\n *~' + author + '*')
if tts:
voice: discord.VoiceState = ctx.author.voice
if voice.channel is not None:
source, tts_destroyer = text_to_speech(author + " sagte: " + quote)
vp: discord.VoiceClient = await connect_and_play(voice.channel, source)
while vp.is_playing() and vp.channel == voice.channel:
await asyncio.sleep(0.5)
await vp.disconnect()
tts_destroyer()
@slash.subcommand(
base="quotes",
name="random",
description="Gets a random quote, optionally for the given author",
options=[
create_option(
name="author",
description="The author to filter by",
option_type=str,
required=False
),
create_option(
name="tts",
description="Text to speech to voice chat",
option_type=SlashCommandOptionType.BOOLEAN,
required=False
)
],
guild_ids=slash_guild_ids
)
async def quote_random_slash(ctx: SlashContext, author: Optional[str] = None, tts: bool = False):
if not await check_slash_context(ctx, False):
return
quotes = get_quotes(ctx.guild_id)
if author is None:
authors = quotes.keys()
if authors:
author = random.choice([*authors])
else:
await ctx.send('No quotes available')
return
if author not in quotes:
await ctx.send('No such author!', hidden=True)
return
author_quotes = quotes[author]
quote = random.choice(author_quotes)
await _do_quote(ctx, author, quote, tts)
@slash.subcommand(
base="quotes",
name="tell",
description="Recite a quote",
options=[
create_option(
name="author",
description="The author to recite from",
option_type=str,
required=True
),
create_option(
name="prefix",
description="Get a quote by prefix",
option_type=str,
required=False
),
create_option(
name="index",
description="Get a quote with a certain index",
option_type=SlashCommandOptionType.INTEGER,
required=False
),
create_option(
name="tts",
description="Text to speech to voice chat",
option_type=SlashCommandOptionType.BOOLEAN,
required=False
)
],
guild_ids=slash_guild_ids
)
async def quote_tell_slash(ctx: SlashContext, author: str, prefix: Optional[str] = None, index: Optional[int] = None,
tts: Optional[bool] = None):
if not await check_slash_context(ctx, False):
return
quotes = get_quotes(ctx.guild_id)[author]
if index is not None:
await _do_quote(ctx, author, quotes[index], tts)
return
if prefix is not None:
for quote in quotes:
if quote.lower().startswith(prefix.lower()):
await _do_quote(ctx, author, quote, tts)
return
index = get_quotes_rolling_index(ctx.guild_id, author)
if index is not None:
await _do_quote(ctx, author, quotes[index], tts)
@slash.subcommand(
base="quotes",
name="list",
description="Lists all authors or all quotes by a given author",
options=[
create_option(
name="author",
description="The author to filter by, use * for all authors",
option_type=str,
required=False
)
],
guild_ids=slash_guild_ids
)
async def quote_list_slash(ctx: SlashContext, author: Optional[str] = None):
if not await check_slash_context(ctx, False):
return
def create_list_embed(title: str, items: Iterable[str]) -> discord.Embed:
embed: discord.Embed = discord.Embed(
title=title,
description='\n'.join(items)
)
return embed
quotes = get_quotes(ctx.guild_id)
if author is None:
await ctx.send(embed=create_list_embed("Available authors", quotes.keys()))
elif author == '*':
coroutines = []
await ctx.defer()
for author, author_quotes in quotes.items():
coroutines.append(ctx.channel.send(embed=create_list_embed(author, author_quotes)))
await asyncio.gather(*coroutines)
await ctx.send(content="That was all.")
else:
author = author.strip().title()
if author in quotes:
await ctx.send(embed=create_list_embed(author, quotes[author]))
else:
await ctx.send("No such author: \"" + author + "\"", hidden=True)
@slash.subcommand(
base="quotes",
name="add",
description="Add a new quote",
options=[
create_option(
name="author",
description="The author of the new quote",
option_type=str,
required=True
),
create_option(
name="quote",
description="The quote to add",
option_type=str,
required=True
)
],
guild_ids=slash_guild_ids
)
async def quote_add_slash(ctx: SlashContext, author: str, quote: str):
if not await check_slash_context(ctx, False):
return
ignored_character_regex = re.compile(r'\W')
def prepare_quote_for_compare(_quote: str) -> str:
return ignored_character_regex.sub('', _quote).lower()
quotes = get_quotes(ctx.guild_id)
author = author.strip().title()
quote = quote.strip()
quote = quote[0].upper() + quote[1::]
if author in quotes:
author_quotes: List[str] = quotes[author]
stripped_quote = prepare_quote_for_compare(quote)
for aq in author_quotes:
if prepare_quote_for_compare(aq) == stripped_quote:
await ctx.send("Quote already exists!", hidden=True)
return
author_quotes.append(quote)
config_save()
await ctx.send("Quote added.")
else:
quotes[author] = [quote]
config_save()
await ctx.send("Author and quote added")
@slash.subcommand(
base="quotes",
name="remove",
description="Removes a quote. Requires administrator permissions.",
options=[
create_option(
name="author",
description="The author to delete from",
option_type=str,
required=True
),
create_option(
name="quote",
description="The exact quote to remove",
option_type=str,
required=False
),
create_option(
name="quote_position",
description="The quote position to delete",
option_type=SlashCommandOptionType.INTEGER,
required=False
)
],
guild_ids=slash_guild_ids
)
async def quote_remove_slash(ctx: SlashContext, author: str, quote: Optional[str] = None,
quote_position: Optional[int] = None):
if not await check_slash_context(ctx, True):
return
quotes = get_quotes(ctx.guild_id)
if quotes is None:
await ctx.send("No quotes available!", hidden=True)
else:
author = author.strip().title()
if author not in quotes:
await ctx.send("No quotes from author \"" + author + "\"!", hidden=True)
return
author_quotes = quotes[author]
if quote is not None:
quote = quote.strip().lower()
for aq in author_quotes:
if aq.lower() == quote:
author_quotes.remove(aq)
if not author_quotes:
quotes.pop(author)
await ctx.send("Removed quote from author \"" + author + "\"")
config_save()
return
await ctx.send("No such quote found!", hidden=True)
elif quote_position is not None:
if 0 <= quote_position < len(author_quotes):
author_quotes.pop(quote_position)
if not author_quotes:
quotes.pop(author)
await ctx.send("Removed quote from author \"" + author + "\"")
config_save()
return
await ctx.send(
"Invalid quote position - must be within the range of 0 and " + str(len(author_quotes)) + "!",
hidden=True)
else:
await ctx.send("Either quote or quote_position must be used!", hidden=True)
# ____ ___ _ _ _____ ___ ____
# / ___/ _ \| \ | | ___|_ _/ ___|
# | | | | | | \| | |_ | | | _
# | |__| |_| | |\ | _| | | |_| |
# \____\___/|_| \_|_| |___\____|
@slash.slash(
name="config",
description="Change or query the configuration of this bot",
guild_ids=slash_guild_ids
)
async def config_slash(ctx: SlashContext):
await ctx.send("missingno", hidden=True)
@slash.subcommand(
base="config",
name="list",
description="List available config options",
guild_ids=slash_guild_ids
)
async def config_list_slash(ctx: SlashContext):
msg = 'Available config options:'
for (key, value) in config_get_descriptions():
msg += '\n - `' + key + '`: ' + value[1]
await ctx.send(msg, hidden=True)
@slash.subcommand(
base="config",
name="get",
description="Queries the configuration for a value",
options=[
config_option
],
guild_ids=slash_guild_ids
)
async def config_get_slash(ctx: SlashContext, config_key: str):
if not await check_slash_context(ctx, False):
return
if config_key in config_meta and config_meta[config_key][0]:
await ctx.send('`' + config_key + '` is set to `' + str(config_get(config_key, ctx.guild.id)) + '`',
hidden=True)
else:
await ctx.send('Unknown config option `' + config_key + '`', hidden=True)
@slash.subcommand(
base="config",
name="set",
description="Changes a configuration entry",
options=[
config_option,
create_option(
name="value",
description="The value to set",
option_type=str,
required=True
)
],
guild_ids=slash_guild_ids
)
async def config_set_slash(ctx: SlashContext, config_key: str, value: str):
if not await check_slash_context(ctx, True):
return
if ctx.author.guild_permissions.administrator:
await ctx.send(config_set(config_key, value, ctx.guild.id), hidden=True)
else:
await ctx.send('You\'re not allowed to change the configuration on this server!', hidden=True)
bot.run(TOKEN)