1197 lines
41 KiB
Python
1197 lines
41 KiB
Python
import asyncio
|
|
import os
|
|
import random
|
|
import re
|
|
import time
|
|
from typing import Optional, List, Dict, Iterable
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from discord.ext.commands import MemberConverter
|
|
from discord_slash import SlashCommand, SlashContext, SlashCommandOptionType
|
|
from discord_slash.model import SlashCommandPermissionType
|
|
from discord_slash.utils.manage_commands import create_option, create_choice, create_permission
|
|
from dotenv import load_dotenv
|
|
|
|
from lib.config import config, config_load, config_save, config_get, config_set, config_get_descriptions, \
|
|
config_set_raw, config_meta
|
|
from lib.utils import async_filter, find_category, find_role_case_insensitive, link_channel, connect_and_play, \
|
|
text_to_speech
|
|
|
|
VERSION = "1.3.5"
|
|
|
|
load_dotenv()
|
|
TOKEN = os.getenv('DISCORD_TOKEN')
|
|
|
|
PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE)
|
|
TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä|ö+h*|ps(ü|eu|y)do-?code|parkett|quatsch|quasi*|onlo[oc]ker)\b', re.IGNORECASE)
|
|
LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE)
|
|
OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE)
|
|
|
|
TOPFIT_WORDS = (
|
|
"äh",
|
|
"ähm",
|
|
"ääh",
|
|
"äääh",
|
|
"ä",
|
|
"öh",
|
|
"ööh",
|
|
"..",
|
|
"...",
|
|
"Micro",
|
|
"Microsoft",
|
|
"Microservices",
|
|
"fit"
|
|
"top",
|
|
"tiptop",
|
|
"topfit",
|
|
"vortual reality",
|
|
"quasi",
|
|
"kwazzi",
|
|
"Psüdocode",
|
|
"Psydocode",
|
|
"Quatsch",
|
|
"Parkett",
|
|
"Onlocker",
|
|
"Necktor"
|
|
)
|
|
|
|
# LOEH_ID = 327126546970312739
|
|
LOEH_ID = 254265844928872448
|
|
LOEH_CHANNEL_ID = 900327877327867944
|
|
OWNER_ID = 327126546970312739
|
|
|
|
INF19X_GUILD_ID = 651348951538335744
|
|
|
|
OCH_LOEH_SOUND = "assets/och_loeh.mp3"
|
|
|
|
config_load()
|
|
intents = discord.Intents.default()
|
|
intents.members = True
|
|
intents.voice_states = True
|
|
bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents)
|
|
slash = SlashCommand(bot, sync_commands=True)
|
|
|
|
if 'LIBOPUS' in os.environ and not len(os.environ['LIBOPUS']) == 0:
|
|
discord.opus.load_opus(os.environ['LIBOPUS'])
|
|
|
|
|
|
async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]:
|
|
try:
|
|
return await guild.fetch_member(LOEH_ID)
|
|
except discord.NotFound:
|
|
return None
|
|
|
|
|
|
def _get_last_och_time(guild: discord.Guild) -> int:
|
|
return config_get('last-och-time', guild.id)
|
|
|
|
|
|
def _set_last_och_time(guild: discord.Guild):
|
|
config_set_raw('last-och-time', time.time(), guild.id)
|
|
|
|
|
|
@bot.event
|
|
async def on_ready():
|
|
print(f'{bot.user} has connected to Discord!')
|
|
|
|
|
|
@bot.event
|
|
async def on_message(message: discord.Message):
|
|
if message.guild is not None and not message.author.bot:
|
|
if config_get('loeh-enable', message.guild.id) and LOEH_REGEX.match(message.content):
|
|
if message.author.id == LOEH_ID:
|
|
await message.channel.send("https://stuff.siphalor.de/img/spidy-is-that-you.jpg")
|
|
else:
|
|
cooldown = config_get('och-cooldown', message.guild.id)
|
|
t = time.time() - _get_last_och_time(message.guild)
|
|
if t <= cooldown:
|
|
await message.channel.send(
|
|
'429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!')
|
|
return
|
|
_set_last_och_time(message.guild)
|
|
|
|
loeh = await _get_loeh(message.guild)
|
|
if loeh is None:
|
|
await message.channel.send('404: Löh not found!')
|
|
elif loeh.voice is None:
|
|
await message.channel.send('400: Löh not connected!')
|
|
else:
|
|
voice: discord.VoiceState = loeh.voice
|
|
try:
|
|
voice_channel: discord.VoiceChannel = voice.channel
|
|
source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND))
|
|
voice_protocol = await connect_and_play(voice_channel, source=source)
|
|
|
|
await loeh.edit(mute=True)
|
|
sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id))
|
|
|
|
message: Optional[discord.Message] = await message.channel.send('Zu Befehl!')
|
|
await sleeper
|
|
await loeh.edit(mute=False)
|
|
if message is not None:
|
|
await message.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.")
|
|
if type(voice_protocol) is discord.VoiceClient:
|
|
await voice_protocol.disconnect(force=True)
|
|
except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
|
|
await message.channel.send('Failed to complete your command, Sir')
|
|
return
|
|
elif config_get('och-anyone-enable', message.guild.id) and (
|
|
match := OCH_ANYONE_REGEX.search(message.content)) is not None:
|
|
cooldown = config_get('och-cooldown', message.guild.id)
|
|
t = time.time() - _get_last_och_time(message.guild)
|
|
if t <= cooldown:
|
|
await message.channel.send('429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!')
|
|
return
|
|
_set_last_och_time(message.guild)
|
|
|
|
guild: discord.Guild = message.guild
|
|
bot_member: discord.Member = guild.get_member(bot.user.id)
|
|
search: str = match.group(1).lower()
|
|
matches: list = []
|
|
voice: Optional[discord.VoiceChannel] = None
|
|
|
|
async for member in guild.fetch_members(limit=100):
|
|
if not member.bot and member.voice is not None and search in member.display_name.lower().split(' '):
|
|
perms: discord.Permissions = member.voice.channel.permissions_for(bot_member)
|
|
if perms.connect and perms.speak:
|
|
voice = member.voice.channel
|
|
matches.append(member)
|
|
|
|
if matches and voice is not None:
|
|
source, destroy_tts = text_to_speech(message.content)
|
|
voice_protocol: Optional[discord.VoiceProtocol] = await connect_and_play(voice, source)
|
|
|
|
async def _mute(m: discord.Member):
|
|
await m.edit(mute=True)
|
|
|
|
async def _unmute(m: discord.Member):
|
|
await m.edit(mute=False)
|
|
|
|
await asyncio.gather(*map(_mute, matches))
|
|
|
|
sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id))
|
|
message: Optional[discord.Message] = await message.channel.send('Auf gehts!')
|
|
await sleeper
|
|
|
|
await asyncio.gather(*map(_unmute, matches))
|
|
|
|
waiter = None
|
|
if message is not None:
|
|
waiter = message.edit(content='~~Auf gehts!~~\nGeschafft!')
|
|
if type(voice_protocol) is discord.VoiceClient:
|
|
await voice_protocol.disconnect(force=True)
|
|
if waiter is not None:
|
|
await waiter
|
|
|
|
destroy_tts()
|
|
else:
|
|
await message.channel.send('404: No users found!')
|
|
elif config_get('inf19x-insiders-enable', message.guild.id):
|
|
if PING_REGEX.search(message.content):
|
|
embed = discord.Embed(
|
|
title="*pinken, schwaches Verb*",
|
|
description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en",
|
|
color=16636435
|
|
)
|
|
embed.add_field(name='ich', value='pinke')
|
|
embed.add_field(name='du', value='pinkst')
|
|
embed.add_field(name='er|sie|es', value='pinkt')
|
|
embed.add_field(name='wir', value='pinken')
|
|
embed.add_field(name='ihr', value='pinkt')
|
|
embed.add_field(name='sie', value='pinken')
|
|
embed.add_field(name='Partizip 2', value='gepinkt')
|
|
await message.channel.send(embed=embed)
|
|
else:
|
|
match = TOPFIT_REGEX.search(message.content)
|
|
if match is not None:
|
|
text = ''
|
|
for i in range(0, random.randint(7, 13)):
|
|
text += random.choice(TOPFIT_WORDS) + " "
|
|
text += match.group(1)
|
|
await message.channel.send(text)
|
|
await bot.process_commands(message)
|
|
|
|
|
|
@bot.event
|
|
async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
|
|
if after.channel is None and before.channel is not None:
|
|
channel: discord.VoiceChannel = before.channel
|
|
if not channel.voice_states:
|
|
if channel.category is not None and channel.name.endswith("_vc"):
|
|
if channel.category.name.lower() == config_get("groups-category", channel.guild.id).lower():
|
|
name: str = channel.name[:-3]
|
|
role = await find_role_case_insensitive(channel.guild, name,
|
|
config_get("groups-role-prefix", channel.guild.id))
|
|
|
|
if role is not None:
|
|
await channel.delete(reason="Delete temporary group channel when last person left")
|
|
|
|
|
|
def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool:
|
|
if message.clean_content.strip() == '':
|
|
return False
|
|
|
|
for reaction in message.reactions:
|
|
if reaction.emoji == '❌':
|
|
return False
|
|
|
|
if reaction_filter is not None:
|
|
for reaction in message.reactions:
|
|
if type(reaction.emoji) is str:
|
|
if reaction.emoji == reaction_filter:
|
|
return True
|
|
elif type(reaction.emoji) is discord.Emoji or type(reaction.emoji) is discord.PartialEmoji:
|
|
if ':' + reaction.emoji.name + ':' == reaction_filter:
|
|
return True
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
@bot.command(name='config', brief='Change the configuration of this bot')
|
|
async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''):
|
|
if ctx.guild is None:
|
|
await ctx.send('You can\'t run config commands in private messages!')
|
|
return
|
|
if cmd == '':
|
|
await ctx.send('Use `' + bot.command_prefix + 'config get` or `' + bot.command_prefix +
|
|
'config set` to query for or update config values. Use `' + bot.command_prefix +
|
|
'config list` to see all config options')
|
|
elif cmd == 'list':
|
|
msg = 'Available config options:'
|
|
for (key, value) in config_get_descriptions():
|
|
msg += '\n - `' + key + '`: ' + value[1]
|
|
await ctx.send(msg)
|
|
elif cmd == 'get':
|
|
if key in config:
|
|
await ctx.send('`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`')
|
|
else:
|
|
await ctx.send('Unknown config option `' + key + '`')
|
|
elif cmd == 'set':
|
|
if ctx.author.guild_permissions.administrator:
|
|
await ctx.send(config_set(key, val, ctx.guild.id))
|
|
else:
|
|
await ctx.send('You\'re not allowed to change the configuration on this server!')
|
|
elif cmd == 'get-global':
|
|
if key in config:
|
|
await ctx.send('`' + key + '` is globally set to `' + str(config_get(key)) + '`')
|
|
else:
|
|
await ctx.send('Unknown config option `' + key + '`')
|
|
elif cmd == 'set-global':
|
|
if ctx.author.id == OWNER_ID:
|
|
msg = config_set(key, val)
|
|
if key == 'prefix':
|
|
bot.command_prefix = config_get('prefix')
|
|
await ctx.send(msg)
|
|
else:
|
|
await ctx.send('You\'re not allowed to change the global configuration!')
|
|
else:
|
|
await ctx.send('Unknown command!')
|
|
|
|
|
|
# ------------------- The slashy way of life ------------------- #
|
|
|
|
slash_guild_ids = None
|
|
config_choices = [create_choice(name=entry[1][1], value=entry[0]) for entry in config_meta.items() if entry[1][0]]
|
|
config_option = create_option("config_key", "A key identifying the config entry to target", str, True, config_choices)
|
|
|
|
|
|
async def check_slash_context(ctx: SlashContext, require_op: bool = False, require_manage_roles: bool = False) -> bool:
|
|
if ctx.guild is None:
|
|
await ctx.send('You can\'t run config commands in private messages!')
|
|
return False
|
|
|
|
if require_op and not ctx.author.guild_permissions.administrator:
|
|
await ctx.send('You\'re not allowed to run this command on this server!', hidden=True)
|
|
return False
|
|
|
|
if require_manage_roles and not ctx.author.guild_permissions.manage_roles:
|
|
await ctx.send('You\'re not allowed to run this command on this server!', hidden=True)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
async def check_text_channel(ctx: SlashContext, channel) -> bool:
|
|
if not isinstance(channel, discord.TextChannel):
|
|
await ctx.send('The given channel is not a text channel!', hidden=True)
|
|
return False
|
|
return True
|
|
|
|
|
|
# __ __ ___ ____ ____
|
|
# | \/ |_ _/ ___| / ___|
|
|
# | |\/| || |\___ \| |
|
|
# | | | || | ___) | |___
|
|
# |_| |_|___|____/ \____|
|
|
|
|
@slash.slash(
|
|
name="about",
|
|
description="Shows information about this bot",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def about_slash(ctx: SlashContext):
|
|
embed = discord.Embed(
|
|
color=discord.Colour.from_rgb(200, 0, 0),
|
|
title="About Och bot",
|
|
description="The Och bot is a special bot created by Siphalor filled with features and inside jokes of his "
|
|
"university group.",
|
|
)
|
|
embed.add_field(name="version", value=VERSION)
|
|
await ctx.send(embed=embed, hidden=True)
|
|
|
|
|
|
@slash.slash(
|
|
name="random-message",
|
|
description="Select a random message from a channel",
|
|
options=[
|
|
create_option(
|
|
name="channel",
|
|
description="The channel to pull the messages from",
|
|
option_type=SlashCommandOptionType.CHANNEL,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="max_messages",
|
|
description="The maximum amount of messages to index (max 1024, default 100)",
|
|
option_type=SlashCommandOptionType.INTEGER,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="reaction",
|
|
description="Only pull from messages that have this reaction",
|
|
option_type=str,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def random_message_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None,
|
|
max_messages: int = 100, reaction: Optional[str] = None):
|
|
await ctx.defer()
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
|
|
messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction), channel.history(limit=max_messages))
|
|
messages = [item async for item in messages]
|
|
if not messages:
|
|
await ctx.send("No valid messages found!")
|
|
|
|
msg: discord.Message = random.choice(messages)
|
|
author: discord.abc.User = msg.author
|
|
embed = discord.Embed(
|
|
description=msg.content + "\n\n[Jump to message](" + msg.jump_url + ")"
|
|
)
|
|
embed.set_author(name=author.display_name, icon_url=author.avatar_url)
|
|
embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + "messages")
|
|
await ctx.send(embed=embed)
|
|
|
|
|
|
@slash.slash(
|
|
name="list-messages",
|
|
description="Lists all messages with the given filtes",
|
|
options=[
|
|
create_option(
|
|
name="channel",
|
|
description="The channel to pull the messages from",
|
|
option_type=SlashCommandOptionType.CHANNEL,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="max_messages",
|
|
description="The maximum amount of messages to index (max 1024, default 100)",
|
|
option_type=SlashCommandOptionType.INTEGER,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="reaction",
|
|
description="Only pull from messages that have this reaction",
|
|
option_type=str,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def list_messages_slash(ctx: SlashContext, channel: Optional[discord.abc.GuildChannel] = None,
|
|
max_messages: int = 100, reaction: Optional[str] = None):
|
|
await ctx.defer()
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction), channel.history(limit=max_messages))
|
|
messages = [item async for item in messages]
|
|
embed = discord.Embed(
|
|
title="List of matching messages",
|
|
description="\n".join([m.content for m in messages])
|
|
)
|
|
await ctx.send(embed=embed)
|
|
|
|
|
|
@slash.slash(
|
|
name="flip-pipelin",
|
|
description="Toss a pipelin to your favourite teacher and see whether it's schnel or slo",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def flip_pipelin_slash(ctx: SlashContext):
|
|
await ctx.send("Pipelin " + random.choice(['schnel', 'slo']))
|
|
|
|
|
|
@slash.slash(
|
|
name="hallöhl",
|
|
description="Pulls any member to the special Löh room",
|
|
options=[
|
|
create_option(
|
|
name="user",
|
|
description="The user to pull to the Löh room",
|
|
option_type=SlashCommandOptionType.USER,
|
|
required=True
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
@slash.permission(
|
|
guild_id=INF19X_GUILD_ID,
|
|
permissions=[
|
|
create_permission(LOEH_ID, SlashCommandPermissionType.USER, permission=True),
|
|
],
|
|
)
|
|
async def halloehl_slash(ctx: SlashContext, user: discord.Member):
|
|
if user.voice is None:
|
|
await ctx.send("User is not is not connected to a vc", hidden=True)
|
|
return
|
|
|
|
if user.voice.channel.guild.id != ctx.guild_id:
|
|
await ctx.send("User is not connected to this guild", hidden=True)
|
|
return
|
|
|
|
vc: Optional[discord.VoiceChannel] = ctx.guild.get_channel(LOEH_CHANNEL_ID)
|
|
if vc is None:
|
|
await ctx.send("This command is not available in this guild", hidden=True)
|
|
return
|
|
|
|
if user.id == LOEH_ID:
|
|
await user.move_to(vc, reason="Löh returns to his home channel")
|
|
await ctx.send("done", hidden=True)
|
|
return
|
|
|
|
loeh = await _get_loeh(ctx.guild)
|
|
if loeh is None:
|
|
await ctx.send("404 - Löh not found", hidden=True)
|
|
return
|
|
|
|
if loeh not in vc.members:
|
|
await ctx.send("You are not in your channel!", hidden=True)
|
|
return
|
|
|
|
perms = loeh.permissions_in(user.voice.channel)
|
|
if not perms.view_channel or not perms.connect:
|
|
await ctx.send("Can't move user out of private channel", hidden=True)
|
|
return
|
|
|
|
await user.move_to(vc, reason="Pull to Löh")
|
|
await ctx.send("done", hidden=True)
|
|
|
|
|
|
# ____ ____ ___ _ _ ____ ____
|
|
# / ___| _ \ / _ \| | | | _ \/ ___|
|
|
# | | _| |_) | | | | | | | |_) \___ \
|
|
# | |_| | _ <| |_| | |_| | __/ ___) |
|
|
# \____|_| \_\\___/ \___/|_| |____/
|
|
|
|
def get_groups_role_prefix(guild_id: int) -> str:
|
|
return config_get("groups-role-prefix", guild_id)
|
|
|
|
|
|
async def get_groups_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]:
|
|
category = find_category(ctx.guild, config_get("groups-category", ctx.guild_id))
|
|
if category is None:
|
|
await ctx.send("Failed to locate group category!")
|
|
return None
|
|
return category
|
|
|
|
|
|
async def get_groups_archive_category(ctx: SlashContext) -> Optional[discord.CategoryChannel]:
|
|
category = find_category(ctx.guild, config_get("groups-archive-category", ctx.guild_id))
|
|
if category is None:
|
|
await ctx.send("Failed to locate group category!")
|
|
return None
|
|
return category
|
|
|
|
|
|
async def get_groups_categories(ctx: SlashContext) -> (discord.CategoryChannel, discord.CategoryChannel):
|
|
groups_cat = await get_groups_category(ctx)
|
|
if groups_cat is None:
|
|
return None
|
|
archive_cat = await get_groups_archive_category(ctx)
|
|
if archive_cat is None:
|
|
return None
|
|
return groups_cat, archive_cat
|
|
|
|
|
|
def collect_group_channels(cat: discord.CategoryChannel) -> Dict[str, discord.abc.GuildChannel]:
|
|
return {channel.name: channel for channel in cat.text_channels}
|
|
|
|
|
|
async def collect_group_roles(guild: discord.Guild) -> List[discord.Role]:
|
|
role_prefix = get_groups_role_prefix(guild.id)
|
|
return list(filter(lambda role: role.name.startswith(role_prefix), await guild.fetch_roles()))
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="list",
|
|
description="Lists all groups on this server",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_list_slash(ctx: SlashContext):
|
|
if not await check_slash_context(ctx, require_manage_roles=True):
|
|
return
|
|
|
|
categories = await get_groups_categories(ctx)
|
|
if categories is None:
|
|
return
|
|
groups_cat, archive_cat = categories
|
|
|
|
active_groups = collect_group_channels(groups_cat)
|
|
archived_groups = collect_group_channels(archive_cat)
|
|
|
|
role_prefix = get_groups_role_prefix(ctx.guild_id)
|
|
msg = ""
|
|
for role in await collect_group_roles(ctx.guild):
|
|
name = role.name[len(role_prefix):].lower()
|
|
if name in active_groups:
|
|
msg += link_channel(active_groups[name]) + "\n"
|
|
elif name in archived_groups:
|
|
msg += link_channel(archived_groups[name], True) + "\n"
|
|
else:
|
|
msg += "*" + name + "*\n"
|
|
|
|
embed = discord.Embed(title="Groups", description=msg)
|
|
embed.set_footer(text="Italic groups are archived or unavailable.")
|
|
await ctx.send(embed=embed)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="archive",
|
|
description="Move a group to the archive",
|
|
options=[
|
|
create_option(
|
|
name="group_channel",
|
|
description="The group identified by it's channel",
|
|
option_type=SlashCommandOptionType.CHANNEL,
|
|
required=True
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
|
|
if not await check_slash_context(ctx, require_manage_roles=True):
|
|
return
|
|
if not await check_text_channel(ctx, group_channel):
|
|
return
|
|
|
|
categories = await get_groups_categories(ctx)
|
|
if categories is None:
|
|
return
|
|
groups_cat, archive_cat = categories
|
|
|
|
group_name = group_channel.name
|
|
groups = collect_group_channels(groups_cat)
|
|
if group_channel.category == groups_cat:
|
|
await group_channel.edit(reason="Archive group " + group_name, category=archive_cat)
|
|
if group_name + "_vc" in groups:
|
|
await groups[group_name + "_vc"].delete(reason="Archive group " + group_name)
|
|
|
|
await ctx.send("Group " + group_name + " archived.")
|
|
elif group_channel.category == archive_cat:
|
|
await ctx.send("Group " + group_name + " is already archived.", hidden=True)
|
|
else:
|
|
await ctx.send(group_name + " is not a group")
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="unarchive",
|
|
description="Move a group from the archive to the main category",
|
|
options=[
|
|
create_option(
|
|
name="group_channel",
|
|
description="The group identified by it's channel",
|
|
option_type=SlashCommandOptionType.CHANNEL,
|
|
required=True
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_archive_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
|
|
if not await check_slash_context(ctx, require_manage_roles=True):
|
|
return
|
|
if not await check_text_channel(ctx, group_channel):
|
|
return
|
|
|
|
categories = await get_groups_categories(ctx)
|
|
if categories is None:
|
|
return
|
|
groups_cat, archive_cat = categories
|
|
|
|
group_name = group_channel.name
|
|
if group_channel.category == archive_cat:
|
|
await ctx.defer()
|
|
await group_channel.edit(reason="Archive group " + group_name, category=archive_cat)
|
|
await ctx.send("Group " + group_name + " archived.")
|
|
elif group_channel.category == groups_cat:
|
|
await ctx.send("Group " + group_name + " is not archived.", hidden=True)
|
|
else:
|
|
await ctx.send(group_name + " is not a group")
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="delete",
|
|
description="Move a group from the archive to the main category",
|
|
options=[
|
|
create_option(
|
|
name="group_channel",
|
|
description="The group identified by it's channel",
|
|
option_type=SlashCommandOptionType.CHANNEL,
|
|
required=True
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_delete_slash(ctx: SlashContext, group_channel: discord.abc.GuildChannel):
|
|
if not await check_slash_context(ctx, require_manage_roles=True):
|
|
return
|
|
if not await check_text_channel(ctx, group_channel):
|
|
return
|
|
group_channel: discord.TextChannel = group_channel
|
|
group_name = group_channel.name
|
|
|
|
await ctx.defer()
|
|
|
|
role = await find_role_case_insensitive(ctx.guild, group_channel.name, get_groups_role_prefix(ctx.guild_id))
|
|
if role is not None:
|
|
await role.delete(reason="Delete group " + group_name)
|
|
|
|
if group_channel.category:
|
|
vcs: List[discord.VoiceChannel] = group_channel.category.voice_channels
|
|
for vc in vcs:
|
|
if vc.name == group_name + "_vc":
|
|
await vc.delete(reason="Delete group " + group_name)
|
|
break
|
|
|
|
await group_channel.delete(reason="Delete group " + group_name)
|
|
await ctx.send("Group " + group_name + " deleted.")
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="create",
|
|
description="Creates a new group channel and role",
|
|
options=[
|
|
create_option(
|
|
name="name",
|
|
description="The name of the group",
|
|
option_type=str,
|
|
required=True
|
|
),
|
|
create_option(
|
|
name="members",
|
|
description="A comma-separated list of members",
|
|
option_type=str,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_create_slash(ctx: SlashContext, name: str, members: Optional[str] = None):
|
|
if not await check_slash_context(ctx, require_manage_roles=True):
|
|
return
|
|
await ctx.defer()
|
|
name = name.strip()
|
|
|
|
groups_cat = await get_groups_category(ctx)
|
|
if not groups_cat:
|
|
return
|
|
|
|
cor = groups_cat.create_text_channel(name.lower(), reason="Create grooup " + name)
|
|
cor_role = ctx.guild.create_role(name=config_get("groups-role-prefix", ctx.guild_id) + name.lower(),
|
|
mentionable=True, reason="Create group " + name)
|
|
channel: discord.TextChannel = await cor
|
|
cor = channel.edit(sync_permissions=True)
|
|
role: discord.Role = await cor_role
|
|
await cor
|
|
await channel.set_permissions(role, reason="Create group " + name, read_messages=True)
|
|
|
|
extra = ""
|
|
if members is not None:
|
|
members = members.split(',')
|
|
converter = MemberConverter()
|
|
for member_str in members:
|
|
member_str = member_str.strip()
|
|
try:
|
|
member = await converter.convert(ctx, member_str)
|
|
await member.add_roles(role, reason="Create group " + name)
|
|
except discord.ext.commands.MemberNotFound:
|
|
extra += '\n*' + member_str + '*'
|
|
if extra:
|
|
extra = '\nCouldn\'t resolve users:' + extra
|
|
|
|
await asyncio.sleep(3)
|
|
await channel.send("Hi, " + role.mention)
|
|
await ctx.send("Group " + name + " created." + extra)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="remove-vc",
|
|
description="Removes a temporary voice channel associated with the current group",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_remove_vc_slash(ctx: SlashContext):
|
|
if not await check_slash_context(ctx):
|
|
return
|
|
|
|
groups_cat = await get_groups_category(ctx)
|
|
if not groups_cat:
|
|
return
|
|
|
|
if ctx.channel.category == groups_cat:
|
|
vc: Optional[discord.VoiceChannel] = discord.utils.find(
|
|
lambda _vc: _vc.name == ctx.channel.name + '_vc',
|
|
ctx.channel.category.voice_channels
|
|
)
|
|
if vc is None:
|
|
await ctx.send("Voice chat is not open!", hidden=True)
|
|
else:
|
|
await ctx.defer(hidden=True)
|
|
await vc.delete(reason="Delete temporary voice chat")
|
|
await ctx.send("Removed temporary voice chat.", hidden=True)
|
|
|
|
else:
|
|
await ctx.send("This channel is not an active group!", hidden=True)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="groups",
|
|
name="vc",
|
|
description="Create a temporary voice chat. Available to anyone.",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def groups_vc_slash(ctx: SlashContext):
|
|
if ctx.channel.category is not None:
|
|
guild: discord.Guild = ctx.guild
|
|
channel: discord.TextChannel = ctx.channel
|
|
if channel.category.name.lower() == config_get("groups-category", guild.id).lower():
|
|
role = await find_role_case_insensitive(guild, channel.name, config_get("groups-role-prefix", guild.id))
|
|
if role is None:
|
|
await ctx.send("Couldn't resolve group role!", hidden=True)
|
|
return
|
|
|
|
category: discord.CategoryChannel = channel.category
|
|
for vc in category.voice_channels:
|
|
if vc.name.lower() == channel.name.lower() + "_vc":
|
|
await ctx.send("Temporary group channel already exists.", hidden=True)
|
|
return
|
|
|
|
await ctx.defer()
|
|
reason = "Create temporary vc for group " + channel.name
|
|
vc = await category.create_voice_channel(channel.name + "_vc", reason=reason)
|
|
await vc.edit(sync_permissions=True, reason=reason)
|
|
await vc.set_permissions(role, view_channel=True, connect=True, reason=reason)
|
|
await ctx.send("Created temporary vc for this group: " + vc.mention)
|
|
return
|
|
|
|
await ctx.send("Not an active group channel!", hidden=True)
|
|
|
|
|
|
# ___ _ _ ___ _____ _____ ____
|
|
# / _ \| | | |/ _ \_ _| ____/ ___|
|
|
# | | | | | | | | | || | | _| \___ \
|
|
# | |_| | |_| | |_| || | | |___ ___) |
|
|
# \__\_\\___/ \___/ |_| |_____|____/
|
|
|
|
def get_quotes(guild_id: int) -> Dict[str, List[str]]:
|
|
quotes: Optional[Dict[str, List[str]]] = config_get("guild_quotes", guild=guild_id)
|
|
if quotes is None:
|
|
quotes = {}
|
|
config_set_raw("guild_quotes", quotes, guild_id)
|
|
return quotes
|
|
|
|
|
|
def get_quotes_rolling_index(guild_id: int, author: str) -> Optional[int]:
|
|
quotes = get_quotes(guild_id)
|
|
if author not in quotes:
|
|
return None
|
|
|
|
quotes = quotes[author]
|
|
rolling_indeces = config_get("guild_quotes_rolling_indeces", guild=guild_id)
|
|
if rolling_indeces is None:
|
|
rolling_indeces = {}
|
|
|
|
if author not in rolling_indeces:
|
|
rolling_indeces[author] = 0
|
|
|
|
index = rolling_indeces[author]
|
|
rolling_indeces[author] = (index + 1) % len(quotes)
|
|
config_set_raw("guild_quotes_rolling_indeces", rolling_indeces, guild=guild_id)
|
|
return index
|
|
|
|
|
|
async def _do_quote(ctx: SlashContext, author: str, quote: str, tts: bool):
|
|
text = '\n'.join(map(lambda line: '> ' + line, quote.splitlines()))
|
|
await ctx.send(text + '\n *~' + author + '*')
|
|
|
|
if tts:
|
|
voice: discord.VoiceState = ctx.author.voice
|
|
if voice.channel is not None:
|
|
source, tts_destroyer = text_to_speech(author + " sagte: " + quote)
|
|
vp: discord.VoiceClient = await connect_and_play(voice.channel, source)
|
|
while vp.is_playing() and vp.channel == voice.channel:
|
|
await asyncio.sleep(0.5)
|
|
|
|
await vp.disconnect()
|
|
tts_destroyer()
|
|
|
|
|
|
@slash.subcommand(
|
|
base="quotes",
|
|
name="random",
|
|
description="Gets a random quote, optionally for the given author",
|
|
options=[
|
|
create_option(
|
|
name="author",
|
|
description="The author to filter by",
|
|
option_type=str,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="tts",
|
|
description="Text to speech to voice chat",
|
|
option_type=SlashCommandOptionType.BOOLEAN,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def quote_random_slash(ctx: SlashContext, author: Optional[str] = None, tts: bool = False):
|
|
if not await check_slash_context(ctx, False):
|
|
return
|
|
|
|
quotes = get_quotes(ctx.guild_id)
|
|
|
|
if author is None:
|
|
authors = quotes.keys()
|
|
if authors:
|
|
author = random.choice([*authors])
|
|
else:
|
|
await ctx.send('No quotes available')
|
|
return
|
|
|
|
if author not in quotes:
|
|
await ctx.send('No such author!', hidden=True)
|
|
return
|
|
|
|
author_quotes = quotes[author]
|
|
quote = random.choice(author_quotes)
|
|
await _do_quote(ctx, author, quote, tts)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="quotes",
|
|
name="tell",
|
|
description="Recite a quote",
|
|
options=[
|
|
create_option(
|
|
name="author",
|
|
description="The author to recite from",
|
|
option_type=str,
|
|
required=True
|
|
),
|
|
create_option(
|
|
name="prefix",
|
|
description="Get a quote by prefix",
|
|
option_type=str,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="index",
|
|
description="Get a quote with a certain index",
|
|
option_type=SlashCommandOptionType.INTEGER,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="tts",
|
|
description="Text to speech to voice chat",
|
|
option_type=SlashCommandOptionType.BOOLEAN,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def quote_tell_slash(ctx: SlashContext, author: str, prefix: Optional[str] = None, index: Optional[int] = None,
|
|
tts: Optional[bool] = None):
|
|
if not await check_slash_context(ctx, False):
|
|
return
|
|
|
|
quotes = get_quotes(ctx.guild_id)[author]
|
|
|
|
if index is not None:
|
|
await _do_quote(ctx, author, quotes[index], tts)
|
|
return
|
|
if prefix is not None:
|
|
for quote in quotes:
|
|
if quote.lower().startswith(prefix.lower()):
|
|
await _do_quote(ctx, author, quote, tts)
|
|
return
|
|
|
|
index = get_quotes_rolling_index(ctx.guild_id, author)
|
|
if index is not None:
|
|
await _do_quote(ctx, author, quotes[index], tts)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="quotes",
|
|
name="list",
|
|
description="Lists all authors or all quotes by a given author",
|
|
options=[
|
|
create_option(
|
|
name="author",
|
|
description="The author to filter by, use * for all authors",
|
|
option_type=str,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def quote_list_slash(ctx: SlashContext, author: Optional[str] = None):
|
|
if not await check_slash_context(ctx, False):
|
|
return
|
|
|
|
def create_list_embed(title: str, items: Iterable[str]) -> discord.Embed:
|
|
embed: discord.Embed = discord.Embed(
|
|
title=title,
|
|
description='\n'.join(items)
|
|
)
|
|
return embed
|
|
|
|
quotes = get_quotes(ctx.guild_id)
|
|
|
|
if author is None:
|
|
await ctx.send(embed=create_list_embed("Available authors", quotes.keys()))
|
|
elif author == '*':
|
|
coroutines = []
|
|
await ctx.defer()
|
|
for author, author_quotes in quotes.items():
|
|
coroutines.append(ctx.channel.send(embed=create_list_embed(author, author_quotes)))
|
|
await asyncio.gather(*coroutines)
|
|
await ctx.send(content="That was all.")
|
|
else:
|
|
author = author.strip().title()
|
|
if author in quotes:
|
|
await ctx.send(embed=create_list_embed(author, quotes[author]))
|
|
else:
|
|
await ctx.send("No such author: \"" + author + "\"", hidden=True)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="quotes",
|
|
name="add",
|
|
description="Add a new quote",
|
|
options=[
|
|
create_option(
|
|
name="author",
|
|
description="The author of the new quote",
|
|
option_type=str,
|
|
required=True
|
|
),
|
|
create_option(
|
|
name="quote",
|
|
description="The quote to add",
|
|
option_type=str,
|
|
required=True
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def quote_add_slash(ctx: SlashContext, author: str, quote: str):
|
|
if not await check_slash_context(ctx, False):
|
|
return
|
|
|
|
ignored_character_regex = re.compile(r'\W')
|
|
|
|
def prepare_quote_for_compare(_quote: str) -> str:
|
|
return ignored_character_regex.sub('', _quote).lower()
|
|
|
|
quotes = get_quotes(ctx.guild_id)
|
|
|
|
author = author.strip().title()
|
|
quote = quote.strip()
|
|
quote = quote[0].upper() + quote[1::]
|
|
if author in quotes:
|
|
author_quotes: List[str] = quotes[author]
|
|
stripped_quote = prepare_quote_for_compare(quote)
|
|
for aq in author_quotes:
|
|
if prepare_quote_for_compare(aq) == stripped_quote:
|
|
await ctx.send("Quote already exists!", hidden=True)
|
|
return
|
|
|
|
author_quotes.append(quote)
|
|
config_save()
|
|
await ctx.send("Quote added.")
|
|
|
|
else:
|
|
quotes[author] = [quote]
|
|
config_save()
|
|
await ctx.send("Author and quote added")
|
|
|
|
|
|
@slash.subcommand(
|
|
base="quotes",
|
|
name="remove",
|
|
description="Removes a quote. Requires administrator permissions.",
|
|
options=[
|
|
create_option(
|
|
name="author",
|
|
description="The author to delete from",
|
|
option_type=str,
|
|
required=True
|
|
),
|
|
create_option(
|
|
name="quote",
|
|
description="The exact quote to remove",
|
|
option_type=str,
|
|
required=False
|
|
),
|
|
create_option(
|
|
name="quote_position",
|
|
description="The quote position to delete",
|
|
option_type=SlashCommandOptionType.INTEGER,
|
|
required=False
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def quote_remove_slash(ctx: SlashContext, author: str, quote: Optional[str] = None,
|
|
quote_position: Optional[int] = None):
|
|
if not await check_slash_context(ctx, True):
|
|
return
|
|
|
|
quotes = get_quotes(ctx.guild_id)
|
|
|
|
if quotes is None:
|
|
await ctx.send("No quotes available!", hidden=True)
|
|
else:
|
|
author = author.strip().title()
|
|
if author not in quotes:
|
|
await ctx.send("No quotes from author \"" + author + "\"!", hidden=True)
|
|
return
|
|
|
|
author_quotes = quotes[author]
|
|
|
|
if quote is not None:
|
|
quote = quote.strip().lower()
|
|
for aq in author_quotes:
|
|
if aq.lower() == quote:
|
|
author_quotes.remove(aq)
|
|
if not author_quotes:
|
|
quotes.pop(author)
|
|
|
|
await ctx.send("Removed quote from author \"" + author + "\"")
|
|
config_save()
|
|
return
|
|
|
|
await ctx.send("No such quote found!", hidden=True)
|
|
|
|
elif quote_position is not None:
|
|
if 0 <= quote_position < len(author_quotes):
|
|
author_quotes.pop(quote_position)
|
|
if not author_quotes:
|
|
quotes.pop(author)
|
|
|
|
await ctx.send("Removed quote from author \"" + author + "\"")
|
|
config_save()
|
|
return
|
|
|
|
await ctx.send(
|
|
"Invalid quote position - must be within the range of 0 and " + str(len(author_quotes)) + "!",
|
|
hidden=True)
|
|
|
|
else:
|
|
await ctx.send("Either quote or quote_position must be used!", hidden=True)
|
|
|
|
|
|
# ____ ___ _ _ _____ ___ ____
|
|
# / ___/ _ \| \ | | ___|_ _/ ___|
|
|
# | | | | | | \| | |_ | | | _
|
|
# | |__| |_| | |\ | _| | | |_| |
|
|
# \____\___/|_| \_|_| |___\____|
|
|
|
|
@slash.slash(
|
|
name="config",
|
|
description="Change or query the configuration of this bot",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def config_slash(ctx: SlashContext):
|
|
await ctx.send("missingno", hidden=True)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="config",
|
|
name="list",
|
|
description="List available config options",
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def config_list_slash(ctx: SlashContext):
|
|
msg = 'Available config options:'
|
|
for (key, value) in config_get_descriptions():
|
|
msg += '\n - `' + key + '`: ' + value[1]
|
|
await ctx.send(msg, hidden=True)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="config",
|
|
name="get",
|
|
description="Queries the configuration for a value",
|
|
options=[
|
|
config_option
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def config_get_slash(ctx: SlashContext, config_key: str):
|
|
if not await check_slash_context(ctx, False):
|
|
return
|
|
if config_key in config_meta and config_meta[config_key][0]:
|
|
await ctx.send('`' + config_key + '` is set to `' + str(config_get(config_key, ctx.guild.id)) + '`',
|
|
hidden=True)
|
|
else:
|
|
await ctx.send('Unknown config option `' + config_key + '`', hidden=True)
|
|
|
|
|
|
@slash.subcommand(
|
|
base="config",
|
|
name="set",
|
|
description="Changes a configuration entry",
|
|
options=[
|
|
config_option,
|
|
create_option(
|
|
name="value",
|
|
description="The value to set",
|
|
option_type=str,
|
|
required=True
|
|
)
|
|
],
|
|
guild_ids=slash_guild_ids
|
|
)
|
|
async def config_set_slash(ctx: SlashContext, config_key: str, value: str):
|
|
if not await check_slash_context(ctx, True):
|
|
return
|
|
if ctx.author.guild_permissions.administrator:
|
|
await ctx.send(config_set(config_key, value, ctx.guild.id), hidden=True)
|
|
else:
|
|
await ctx.send('You\'re not allowed to change the configuration on this server!', hidden=True)
|
|
|
|
|
|
bot.run(TOKEN)
|