500 lines
20 KiB
Python
500 lines
20 KiB
Python
import asyncio
|
|
import os
|
|
import random
|
|
import re
|
|
import time
|
|
from types import coroutine
|
|
from typing import Optional, List
|
|
|
|
import discord
|
|
import gtts
|
|
from discord.ext import commands
|
|
from dotenv import load_dotenv
|
|
|
|
from lib.config import config, config_meta, config_load, config_save, config_get, config_set, config_get_descriptions, \
|
|
config_set_raw
|
|
from lib.utils import async_filter, find_category
|
|
|
|
# Populate the process environment via python-dotenv before reading the token.
load_dotenv()
# Token later passed to bot.run(); os.getenv yields None when DISCORD_TOKEN is unset.
TOKEN = os.getenv('DISCORD_TOKEN')

# "ping" or "geping" starting at a word boundary, case-insensitive.
PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE)
# Whole-word trigger terms; group 1 holds the word that matched.
TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE)
# "Och Loeh" / "Och Löh" with an optional trailing "!".
LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE)
# A whole message of the form "Och <name>" with optional trailing punctuation;
# group 1 captures the name.
OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE)

# Filler words that on_message picks from at random when TOPFIT_REGEX matches.
TOPFIT_WORDS = (
    "äh",
    "ähm",
    "ääh",
    "äääh",
    "ä",
    "..",
    "...",
    "Mi",
    "Mic",
    "Micro",
    "Microsoft",
    "Microservices",
    "top",
    "vortual reality"
)

# Alternative value kept from the original source (identical to OWNER_ID):
# LOEH_ID = 327126546970312739
# User id of the member targeted by the "Och Löh" trigger.
LOEH_ID = 254265844928872448
# User id allowed to run `config set-global`.
OWNER_ID = 327126546970312739

# Audio file played in the voice channel by the "Och Löh" trigger.
OCH_LOEH_SOUND = "assets/och_loeh.mp3"
|
|
|
|
# The configuration is loaded first because the command prefix is read from it.
config_load()

# Default intents plus the members intent.
intents = discord.Intents.default()
intents.members = True
bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents)

# Load an explicit opus library only when LIBOPUS is set to a non-empty path.
if os.environ.get('LIBOPUS'):
    discord.opus.load_opus(os.environ['LIBOPUS'])
|
|
|
|
|
|
async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]:
    """Fetch the member with id ``LOEH_ID`` from *guild*.

    Returns ``None`` when the lookup raises ``discord.NotFound``; any other
    error raised by ``guild.fetch_member`` propagates to the caller.
    """
    member: Optional[discord.Member] = None
    try:
        member = await guild.fetch_member(LOEH_ID)
    except discord.NotFound:
        # Not found in this guild: report that as None.
        pass
    return member
|
|
|
|
|
|
def _get_last_och_time(guild: discord.Guild) -> float:
    """Return the 'last-och-time' config value stored for *guild*.

    ``_set_last_och_time`` writes ``time.time()`` under this key, so the value
    is a float timestamp (hence the ``float`` return annotation).
    NOTE(review): what is returned before the first write depends on
    ``config_get``'s default handling, which is not visible here.
    """
    return config_get('last-och-time', guild.id)
|
|
|
|
|
|
def _set_last_och_time(guild: discord.Guild) -> None:
    """Store the current time as the 'last-och-time' config value of *guild*."""
    now = time.time()
    config_set_raw('last-och-time', now, guild.id)
|
|
|
|
|
|
@bot.event
async def on_ready():
    """Event handler registered via ``bot.event``: print the bot user on stdout."""
    print(f'{bot.user} has connected to Discord!')
|
|
|
|
|
|
async def _och_on_cooldown(message: discord.Message) -> bool:
    """Apply the per-guild "Och" cooldown to *message*.

    While the 'och-cooldown' period since the last recorded "Och" is still
    running, the channel is told how long to wait and ``True`` is returned.
    Otherwise the current time is recorded as the last "Och" time and
    ``False`` is returned.
    """
    guild = message.guild
    cooldown = config_get('och-cooldown', guild.id)
    elapsed = time.time() - _get_last_och_time(guild)
    if elapsed <= cooldown:
        await message.channel.send(
            '429: Too many requests - Wait ' + str(int(cooldown - elapsed)) + ' more seconds!')
        return True
    _set_last_och_time(guild)
    return False


async def _cleanup_voice(voice_client: Optional[discord.VoiceProtocol], still_muted: list) -> None:
    """Undo the side effects of an "Och" action.

    Tries to unmute every member in *still_muted*, then disconnects
    *voice_client* if it is a ``discord.VoiceClient``. The disconnect is
    attempted even when an unmute fails; the first unmute error is re-raised
    afterwards so it is not silently lost.
    """
    results = await asyncio.gather(
        *(member.edit(mute=False) for member in still_muted), return_exceptions=True)
    if isinstance(voice_client, discord.VoiceClient):
        await voice_client.disconnect()
    for result in results:
        if isinstance(result, BaseException):
            raise result


async def _handle_och_loeh(message: discord.Message) -> None:
    """React to an "Och Löh" message.

    If the author is Löh, an image link is posted. Otherwise, subject to the
    cooldown, the bot joins Löh's voice channel, plays ``OCH_LOEH_SOUND``,
    server-mutes Löh for 'och-timeout' seconds and then unmutes again.
    """
    channel = message.channel
    guild = message.guild

    if message.author.id == LOEH_ID:
        await channel.send("https://siphalor.de/img/spidy-is-that-you.jpg")
        return
    if await _och_on_cooldown(message):
        return

    loeh = await _get_loeh(guild)
    if loeh is None:
        await channel.send('404: Löh not found!')
        return
    if loeh.voice is None:
        await channel.send('400: Löh not connected!')
        return

    voice_client: Optional[discord.VoiceProtocol] = None
    muted = False
    try:
        voice_client = await loeh.voice.channel.connect()
        if isinstance(voice_client, discord.VoiceClient):
            voice_client.play(
                discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND)))
        await loeh.edit(mute=True)
        muted = True

        # The reply is kept in its own variable: rebinding `message` would make
        # later code operate on the bot's reply instead of the user's message.
        reply: Optional[discord.Message] = await channel.send('Zu Befehl!')
        await asyncio.sleep(config_get('och-timeout', guild.id))
        await loeh.edit(mute=False)
        muted = False
        if reply is not None:
            await reply.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.")
    except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
        await channel.send('Failed to complete your command, Sir')
    finally:
        # Never leave Löh muted or the bot sitting in the voice channel.
        await _cleanup_voice(voice_client, [loeh] if muted else [])


async def _handle_och_anyone(message: discord.Message, match: re.Match) -> None:
    """React to an "Och <name>" message.

    Members (up to 100 are fetched) who are in a voice channel the bot may
    join and speak in, and whose display name contains the captured name as a
    space-separated word, are server-muted for 'och-timeout' seconds while
    the message is read out via text-to-speech. The cooldown is consumed even
    if nobody matches.
    """
    if await _och_on_cooldown(message):
        return

    guild: discord.Guild = message.guild
    channel = message.channel
    bot_member: discord.Member = guild.get_member(bot.user.id)
    search: str = match.group(1).lower()
    targets: list = []
    voice_channel: Optional[discord.VoiceChannel] = None

    async for member in guild.fetch_members(limit=100):
        if member.bot or member.voice is None:
            continue
        if search not in member.display_name.lower().split(' '):
            continue
        perms: discord.Permissions = member.voice.channel.permissions_for(bot_member)
        if perms.connect and perms.speak:
            # The channel of the last matching member is the one that gets joined.
            voice_channel = member.voice.channel
            targets.append(member)

    if not targets or voice_channel is None:
        await channel.send('404: No users found!')
        return

    tts_path = 'temp/och.mp3'
    voice_client: Optional[discord.VoiceProtocol] = None
    mute_pending = False
    try:
        voice_client = await voice_channel.connect()
        if isinstance(voice_client, discord.VoiceClient):
            tts = gtts.gTTS(message.content, lang='de')
            os.makedirs('temp', exist_ok=True)
            # gTTS.save is a synchronous call; run it in a worker thread so the
            # event loop is not blocked while the audio file is produced.
            await asyncio.to_thread(tts.save, tts_path)
            voice_client.play(discord.PCMVolumeTransformer(
                discord.FFmpegPCMAudio(source=tts_path, before_options='-v quiet')
            ))

        # Set before muting: if only some mutes succeed, all targets still get unmuted.
        mute_pending = True
        await asyncio.gather(*(member.edit(mute=True) for member in targets))

        reply: Optional[discord.Message] = await channel.send('Auf gehts!')
        await asyncio.sleep(config_get('och-timeout', guild.id))

        await asyncio.gather(*(member.edit(mute=False) for member in targets))
        mute_pending = False
        if reply is not None:
            await reply.edit(content='~~Auf gehts!~~\nGeschafft!')
    except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
        await channel.send('Failed to complete your command, Sir')
    finally:
        try:
            await _cleanup_voice(voice_client, targets if mute_pending else [])
        finally:
            # The file only exists when the TTS step was reached.
            try:
                os.remove(tts_path)
            except FileNotFoundError:
                pass


async def _handle_insiders(message: discord.Message) -> None:
    """Answer the "inf19x insiders" triggers: the ping embed or the filler-word reply."""
    content = message.content

    if PING_REGEX.search(content):
        embed = discord.Embed(
            title="*pinken, schwaches Verb*",
            description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en",
            color=16636435
        )
        conjugation = (
            ('ich', 'pinke'),
            ('du', 'pinkst'),
            ('er|sie|es', 'pinkt'),
            ('wir', 'pinken'),
            ('ihr', 'pinkt'),
            ('sie', 'pinken'),
            ('Partizip 2', 'gepinkt'),
        )
        for name, value in conjugation:
            embed.add_field(name=name, value=value)
        await message.channel.send(embed=embed)
        return

    match = TOPFIT_REGEX.search(content)
    if match is not None:
        # 7 to 13 random filler words followed by the word that triggered the reply.
        words = [random.choice(TOPFIT_WORDS) for _ in range(random.randint(7, 13))]
        words.append(match.group(1))
        await message.channel.send(' '.join(words))


@bot.event
async def on_message(message: discord.Message):
    """Dispatch guild messages from non-bot authors to the enabled triggers.

    At most one trigger runs per message, checked in this order: "Och Löh",
    "Och <name>", insiders. Afterwards the original message is always handed
    to ``bot.process_commands`` (unless a trigger raised an unhandled error).
    """
    if message.guild is not None and not message.author.bot:
        guild_id = message.guild.id
        if config_get('loeh-enable', guild_id) and LOEH_REGEX.match(message.content):
            await _handle_och_loeh(message)
        elif config_get('och-anyone-enable', guild_id) and (
                match := OCH_ANYONE_REGEX.search(message.content)) is not None:
            await _handle_och_anyone(message, match)
        elif config_get('inf19x-insiders-enable', guild_id):
            await _handle_insiders(message)
    await bot.process_commands(message)
|
|
|
|
|
|
# Command `random_message`: post one randomly chosen message from `channel`
# (default: the invoking channel) as an embed. Only the last `max_cnt`
# messages that pass _is_message_valid_for_selection (optionally requiring the
# `reaction_filter` reaction) are considered.
# (Kept as a comment: a docstring would become the command's help text.)
@bot.command(name='random_message', brief='Select a random message from a channel')
async def random_message_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None,
                                 max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None):
    # Awaited immediately: a coroutine does nothing until it is awaited, so the
    # typing indicator has to be triggered before the history scan, not after it.
    await ctx.channel.trigger_typing()

    if channel is None:
        channel = ctx.channel
    candidates = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter),
                              channel.history(limit=max_cnt))
    messages = [item async for item in candidates]
    if not messages:
        await ctx.channel.send("No valid messages found!")
        return

    msg: discord.Message = random.choice(messages)
    author: discord.abc.User = msg.author
    embed = discord.Embed(
        description=msg.content + "\n\n[Go to message](" + msg.jump_url + ")"
    )
    embed.set_author(name=author.display_name, icon_url=author.avatar_url)
    embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + " messages")
    await ctx.channel.send(embed=embed)
|
|
|
|
|
|
# Command `collect_messages`: post an embed listing the content of every
# message among the last `max_cnt` messages of `channel` (default: the
# invoking channel) that passes _is_message_valid_for_selection.
# (Kept as a comment: a docstring would become the command's help text.)
@bot.command(name='collect_messages', brief='Lists all messages with the given filters')
async def collect_messages_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None,
                                   max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None):
    # Awaited immediately: a coroutine does nothing until it is awaited, so the
    # typing indicator has to be triggered before the history scan, not after it.
    await ctx.channel.trigger_typing()

    if channel is None:
        channel = ctx.channel
    matching = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter),
                            channel.history(limit=max_cnt))
    contents = [m.content async for m in matching]
    embed = discord.Embed(
        title="List of matching messages",
        description='\n'.join(contents)
    )
    await ctx.channel.send(embed=embed)
|
|
|
|
|
|
# Command `flip_pipelin`: reply with "Pipelin " followed by one of two
# randomly chosen words.
@bot.command(name='flip_pipelin', brief='Flip a pipelin')
async def flip_pipelin_command(ctx: commands.Context):
    outcome = random.choice(['schnel', 'langsam'])
    await ctx.channel.send("Pipelin " + outcome)
|
|
|
|
|
|
# Command `quote`:
#   quote                    -> random quote of a random author
#   quote list               -> one embed per author with all their quotes
#   quote list <author>      -> embed with the quotes of that author
#   quote <author>           -> random quote of that author
#   quote <author> <text>    -> store <text> as a new quote of <author>
# Quotes live in the per-guild "guild_quotes" config value, a mapping of
# title-cased author name to a list of quote strings.
@bot.command(name='quote', brief='Get a random quote or add new ones')
async def quote_command(ctx: commands.Context, author: Optional[str], *, quote: Optional[str]):
    quotes: Optional[dict[str, List[str]]] = config_get("guild_quotes", guild=ctx.guild.id)
    if quotes is None:
        # First use in this guild: register an empty mapping.
        quotes = {}
        config_set_raw("guild_quotes", quotes, ctx.guild.id)

    def as_blockquote(text: str) -> str:
        # Prefix every line so the text renders as a quote block.
        return '\n'.join('> ' + line for line in text.splitlines())

    def list_embed(quote_author: str, author_quotes: List[str]) -> discord.Embed:
        return discord.Embed(title=quote_author, description='\n'.join(author_quotes))

    # --- no arguments: random quote of a random author -----------------------
    if author is None:
        if not quotes:
            await ctx.channel.send("No quotes present!")
            return
        picked_author = random.choice(list(quotes))
        picked_quote = random.choice(quotes[picked_author])
        await ctx.channel.send(as_blockquote(picked_quote) + '\n *~' + picked_author + '*')
        return

    # --- "list" subcommand ----------------------------------------------------
    if author == 'list':
        if quote is None:
            pending = [ctx.send(embed=list_embed(name, entries)) for name, entries in quotes.items()]
            await asyncio.gather(*pending)
            return
        wanted_author = quote.strip().title()
        if wanted_author in quotes:
            await ctx.send(embed=list_embed(wanted_author, quotes[wanted_author]))
        else:
            await ctx.send("No such author!")
        return

    author = author.strip().title()

    # --- author only: random quote of that author -----------------------------
    if not quote:
        if author not in quotes:
            await ctx.channel.send("Unknown author!")
            return
        await ctx.channel.send(as_blockquote(random.choice(quotes[author])) + '\n *~ ' + author + '*')
        return

    # --- author and text: add a new quote --------------------------------------
    quote = quote.strip()
    quote = quote[0].upper() + quote[1:]
    if author in quotes:
        known: List[str] = quotes[author]
        non_word = re.compile(r'\W')
        # Duplicates are detected ignoring case and all non-word characters.
        fingerprint = non_word.sub('', quote).lower()
        if any(non_word.sub('', entry).lower() == fingerprint for entry in known):
            await ctx.channel.send("Quote already exists!")
            return
        known.append(quote)
    else:
        quotes[author] = [quote]
    config_save()
    await ctx.channel.send("Quote added")
|
|
|
|
|
|
# Command `quote_remove <author> <quote>`: remove a quote of <author>.
# <quote> is matched case-insensitively against the stored quotes first; if no
# quote matches it is interpreted as an integer list index. An author whose
# last quote is removed is dropped from the mapping. Admins only.
# (Kept as a comment: a docstring would become the command's help text.)
@bot.command(name='quote_remove', brief='Remove a quote')
async def quote_remove_command(ctx: commands.Context, author: str, quote: str):
    if not ctx.author.guild_permissions.administrator:
        await ctx.channel.send("Only admins are allowed to remove quotes!")
        # Stop here: without this return non-admins could still remove quotes.
        return

    quotes: Optional[dict[str, List[str]]] = config_get("guild_quotes", guild=ctx.guild.id)
    if quotes is None:
        await ctx.channel.send("No quotes available!")
        return

    quote = quote.strip().lower()
    author = author.strip().title()

    if author in quotes:
        author_quotes = quotes[author]
        # First choice: the first stored quote equal to the given text (ignoring case).
        index = next((i for i, q in enumerate(author_quotes) if q.lower() == quote), None)
        if index is None:
            # Fallback: treat the argument as a list index (negative values
            # count from the end, as with list.pop).
            try:
                index = int(quote)
            except ValueError:
                index = None
            else:
                if not -len(author_quotes) <= index < len(author_quotes):
                    # An out-of-range index is reported as "not found" instead of raising.
                    index = None

        if index is not None:
            author_quotes.pop(index)
            if not author_quotes:
                quotes.pop(author)
            await ctx.channel.send("Successfully removed quote!")
            config_save()
            return

    await ctx.channel.send("No such quote found!")
|
|
|
|
|
|
# Command `groups <subcommand> [name] [members...]`:
#   list             -> embed of all roles starting with the "groups-role-prefix"
#                       config value, linked to their channel when one exists
#   archive <name>   -> move the group's text channel to the archive category
#   create <name>    -> create a text channel and a mentionable role for the
#                       group, and give the role to the listed members
# Channel and archive categories come from the "groups-category" and
# "groups-archive-category" config values.
# (Kept as a comment: a docstring would become the command's help text.)
@bot.command(name='groups', brief="Manage groups")
async def group_command(ctx: commands.Context, subcommand: Optional[str], arg: Optional[str],
                        members: commands.Greedy[discord.Member]):
    usage = "Available commands: `list`, `create`, `archive`"
    if subcommand is None:
        await ctx.send(usage)
        return

    guild: discord.Guild = ctx.guild
    role_prefix = config_get("groups-role-prefix", guild.id)

    def collect_group_channels(cat: discord.CategoryChannel) -> dict[str, discord.TextChannel]:
        # Map channel name -> channel for every text channel in the category.
        return {channel.name: channel for channel in cat.text_channels}

    def find_group_channel(channels: dict[str, discord.TextChannel],
                           name: str) -> Optional[discord.TextChannel]:
        # `create` names the channel arg.lower() but the role keeps its case,
        # so fall back to the lower-cased name when there is no exact match.
        return channels.get(name, channels.get(name.lower()))

    async def collect_group_roles() -> list[discord.Role]:
        roles = await guild.fetch_roles()
        return [role for role in roles if role.name.startswith(role_prefix)]

    async def fail_category(kind: str, expected: str):
        await ctx.send("Unable to find channel category \"" + expected + "\" for " + kind + ". Change in configs.")

    def link_channel(channel: discord.TextChannel, italic: bool = False) -> str:
        # Markdown link to the channel, optionally with an italic label.
        label = '*' + channel.name + '*' if italic else channel.name
        return '[' + label + '](https://discord.com/channels/' + str(guild.id) + '/' + str(channel.id) + ')'

    groups_cat = find_category(guild, config_get("groups-category", guild.id))
    if groups_cat is None:
        await fail_category("groups", config_get("groups-category", guild.id))
        return

    if subcommand == 'list':
        archive_cat = find_category(guild, config_get("groups-archive-category", guild.id))
        if archive_cat is None:
            await fail_category("archive", config_get("groups-archive-category", guild.id))
            return

        active_groups = collect_group_channels(groups_cat)
        archived_groups = collect_group_channels(archive_cat)

        lines = []
        for role in await collect_group_roles():
            name = role.name[len(role_prefix):]
            active = find_group_channel(active_groups, name)
            archived = find_group_channel(archived_groups, name)
            if active is not None:
                lines.append(link_channel(active))
            elif archived is not None:
                lines.append(link_channel(archived, True))
            else:
                lines.append("*" + name + "*")

        embed = discord.Embed(title="Groups", description=''.join(line + "\n" for line in lines))
        embed.set_footer(text="Italic groups are archived or unavailable.")
        await ctx.send(embed=embed)

    elif subcommand == 'archive':
        if arg is None:
            await ctx.send("Group name required!")
            return

        channel = find_group_channel(collect_group_channels(groups_cat), arg)
        if channel is None:
            await ctx.send("Can't find that group!")
            return

        archive_cat = find_category(guild, config_get("groups-archive-category", guild.id))
        if archive_cat is None:
            await fail_category("archive", config_get("groups-archive-category", guild.id))
            return
        await channel.edit(reason="Archive group " + arg, category=archive_cat)

    elif subcommand == 'create':
        # str.strip returns a new string, so the result must be assigned.
        arg = arg.strip() if arg is not None else ''
        if not arg:
            await ctx.send("Group name required!")
            return

        reason = "Create group " + arg
        # Each request is awaited where it is issued, so a failure cannot leave
        # a never-awaited coroutine behind.
        channel: discord.TextChannel = await groups_cat.create_text_channel(arg.lower(), reason=reason)
        role: discord.Role = await guild.create_role(name=role_prefix + arg, mentionable=True, reason=reason)
        await channel.edit(sync_permissions=True)
        await channel.set_permissions(role, reason=reason, read_messages=True)
        for member in members:
            await member.add_roles(role, reason=reason)
        # NOTE(review): this sends the literal text "@<role name>"; if an actual
        # ping of the new role is intended, role.mention would be needed - confirm.
        await channel.send("Hi, @" + role.name)

    else:
        # Unknown subcommand: answer with the usage hint instead of staying silent.
        await ctx.send(usage)
|
|
|
|
|
|
def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool:
    """Decide whether *message* may be picked by the message commands.

    A message is rejected when its clean content is blank or when it carries
    a '❌' reaction. If *reaction_filter* is given, the message must also have
    a matching reaction: a unicode emoji equal to the filter, or a custom
    emoji whose ``:name:`` form equals the filter.
    """
    if not message.clean_content.strip():
        return False

    reactions = message.reactions
    if any(reaction.emoji == '❌' for reaction in reactions):
        return False

    if reaction_filter is None:
        return True

    def matches_filter(emoji) -> bool:
        # Unicode emojis are plain strings; custom ones are compared by ':name:'.
        if isinstance(emoji, str):
            return emoji == reaction_filter
        if isinstance(emoji, (discord.Emoji, discord.PartialEmoji)):
            return ':' + emoji.name + ':' == reaction_filter
        return False

    return any(matches_filter(reaction.emoji) for reaction in reactions)
|
|
|
|
|
|
# Command `config [cmd] [key] [val]`: inspect or change bot configuration.
#   (no cmd)     -> usage hint
#   list         -> all config options with their descriptions
#   get / set    -> per-guild value; `set` requires guild administrator rights
#   get-global / set-global -> global value; `set-global` requires OWNER_ID
# Not available in private messages.
@bot.command(name='config', brief='Change the configuration of this bot')
async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''):
    if ctx.guild is None:
        await ctx.send('You can\'t run config commands in private messages!')
        return

    guild_id = ctx.guild.id

    if cmd == '':
        prefix = bot.command_prefix
        reply = ('Use `' + prefix + 'config get` or `' + prefix +
                 'config set` to query for or update config values. Use `' + prefix +
                 'config list` to see all config options')
    elif cmd == 'list':
        entries = ('\n - `' + name + '`: ' + meta[1] for (name, meta) in config_get_descriptions())
        reply = 'Available config options:' + ''.join(entries)
    elif cmd == 'get':
        if key in config:
            reply = '`' + key + '` is set to `' + str(config_get(key, guild_id)) + '`'
        else:
            reply = 'Unknown config option `' + key + '`'
    elif cmd == 'set':
        if ctx.author.guild_permissions.administrator:
            # config_set returns the message to show to the user.
            reply = config_set(key, val, guild_id)
        else:
            reply = 'You\'re not allowed to change the configuration on this server!'
    elif cmd == 'get-global':
        if key in config:
            reply = '`' + key + '` is globally set to `' + str(config_get(key)) + '`'
        else:
            reply = 'Unknown config option `' + key + '`'
    elif cmd == 'set-global':
        if ctx.author.id == OWNER_ID:
            reply = config_set(key, val)
            if key == 'prefix':
                # Keep the running bot in sync with the new global prefix.
                bot.command_prefix = config_get('prefix')
        else:
            reply = 'You\'re not allowed to change the global configuration!'
    else:
        reply = 'Unknown command!'

    await ctx.send(reply)
|
|
|
|
|
|
# Start the bot with the token read from the DISCORD_TOKEN environment variable.
bot.run(TOKEN)
|