import asyncio import os import random import re import time from types import coroutine from typing import Optional, List import discord import gtts from discord.ext import commands from dotenv import load_dotenv from lib.config import config, config_meta, config_load, config_save, config_get, config_set, config_get_descriptions, \ config_set_raw from lib.utils import async_filter, find_category load_dotenv() TOKEN = os.getenv('DISCORD_TOKEN') PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE) TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE) LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE) OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE) TOPFIT_WORDS = ( "äh", "ähm", "ääh", "äääh", "ä", "..", "...", "Mi", "Mic", "Micro", "Microsoft", "Microservices", "top", "vortual reality" ) # LOEH_ID = 327126546970312739 LOEH_ID = 254265844928872448 OWNER_ID = 327126546970312739 OCH_LOEH_SOUND = "assets/och_loeh.mp3" config_load() intents = discord.Intents.default() intents.members = True bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents) if 'LIBOPUS' in os.environ and not len(os.environ['LIBOPUS']) == 0: discord.opus.load_opus(os.environ['LIBOPUS']) async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]: try: return await guild.fetch_member(LOEH_ID) except discord.NotFound: return None def _get_last_och_time(guild: discord.Guild) -> int: return config_get('last-och-time', guild.id) def _set_last_och_time(guild: discord.Guild): config_set_raw('last-och-time', time.time(), guild.id) @bot.event async def on_ready(): print(f'{bot.user} has connected to Discord!') @bot.event async def on_message(message: discord.Message): if message.guild is not None and not message.author.bot: if config_get('loeh-enable', message.guild.id) and LOEH_REGEX.match(message.content): if message.author.id == LOEH_ID: await message.channel.send("https://siphalor.de/img/spidy-is-that-you.jpg") else: cooldown = config_get('och-cooldown', message.guild.id) t = time.time() - _get_last_och_time(message.guild) if t <= cooldown: await message.channel.send( '429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!') return _set_last_och_time(message.guild) loeh = await _get_loeh(message.guild) if loeh is None: await message.channel.send('404: Löh not found!') elif loeh.voice is None: await message.channel.send('400: Löh not connected!') else: voice: discord.VoiceState = loeh.voice try: voice_channel: discord.VoiceChannel = voice.channel voice_protocol: discord.VoiceProtocol = await voice_channel.connect() if type(voice_protocol) is discord.VoiceClient: source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND)) voice_protocol.play(source) await loeh.edit(mute=True) sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id)) message: Optional[discord.Message] = await message.channel.send('Zu Befehl!') await sleeper await loeh.edit(mute=False) if message is not None: await message.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.") if type(voice_protocol) is discord.VoiceClient: await voice_protocol.disconnect() except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException): await message.channel.send('Failed to complete your command, Sir') return elif config_get('och-anyone-enable', message.guild.id) and ( match := OCH_ANYONE_REGEX.search(message.content)) is not None: cooldown = config_get('och-cooldown', message.guild.id) t = time.time() - _get_last_och_time(message.guild) if t <= cooldown: await message.channel.send('429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!') return _set_last_och_time(message.guild) guild: discord.Guild = message.guild bot_member: discord.Member = guild.get_member(bot.user.id) search: str = match.group(1).lower() matches: list = [] voice: Optional[discord.VoiceChannel] = None async for member in guild.fetch_members(limit=100): if not member.bot and member.voice is not None and search in member.display_name.lower().split(' '): perms: discord.Permissions = member.voice.channel.permissions_for(bot_member) if perms.connect and perms.speak: voice = member.voice.channel matches.append(member) if matches and voice is not None: voice_protocol: discord.VoiceProtocol = await voice.connect() if type(voice_protocol) is discord.VoiceClient: tts = gtts.gTTS(message.content, lang='de') os.makedirs('temp', exist_ok=True) tts.save('temp/och.mp3') source = discord.PCMVolumeTransformer( discord.FFmpegPCMAudio(source='temp/och.mp3', before_options='-v quiet') ) voice_protocol.play(source) async def _mute(m: discord.Member): await m.edit(mute=True) async def _unmute(m: discord.Member): await m.edit(mute=False) await asyncio.gather(*map(_mute, matches)) sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id)) message: Optional[discord.Message] = await message.channel.send('Auf gehts!') await sleeper await asyncio.gather(*map(_unmute, matches)) waiter = None if message is not None: waiter = message.edit(content='~~Auf gehts!~~\nGeschafft!') if type(voice_protocol) is discord.VoiceClient: await voice_protocol.disconnect() if waiter is not None: await waiter os.remove('temp/och.mp3') else: await message.channel.send('404: No users found!') elif config_get('inf19x-insiders-enable', message.guild.id): if PING_REGEX.search(message.content): embed = discord.Embed( title="*pinken, schwaches Verb*", description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en", color=16636435 ) embed.add_field(name='ich', value='pinke') embed.add_field(name='du', value='pinkst') embed.add_field(name='er|sie|es', value='pinkt') embed.add_field(name='wir', value='pinken') embed.add_field(name='ihr', value='pinkt') embed.add_field(name='sie', value='pinken') embed.add_field(name='Partizip 2', value='gepinkt') await message.channel.send(embed=embed) else: match = TOPFIT_REGEX.search(message.content) if match is not None: text = '' for i in range(0, random.randint(7, 13)): text += random.choice(TOPFIT_WORDS) + " " text += match.group(1) await message.channel.send(text) await bot.process_commands(message) @bot.command(name='random_message', brief='Select a random message from a channel') async def random_message_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None, max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None): typing = ctx.channel.trigger_typing() if channel is None: channel = ctx.channel messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter), channel.history(limit=max_cnt)) messages = [item async for item in messages] if not messages: await typing await ctx.channel.send("No valid messages found!") return msg: discord.Message = random.choice(messages) author: discord.abc.User = msg.author embed = discord.Embed( description=msg.content + "\n\n[Go to message](" + msg.jump_url + ")" ) embed.set_author(name=author.display_name, icon_url=author.avatar_url) embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + " messages") await typing await ctx.channel.send(embed=embed) @bot.command(name='collect_messages', brief='Lists all messages with the given filters') async def collect_messages_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None, max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None): typing = ctx.channel.trigger_typing() if channel is None: channel = ctx.channel messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter), channel.history(limit=max_cnt)) messages = [item async for item in messages] embed = discord.Embed( title="List of matching messages", description='\n'.join([m.content for m in messages]) ) await typing await ctx.channel.send(embed=embed) @bot.command(name='flip_pipelin', brief='Flip a pipelin') async def flip_pipelin_command(ctx: commands.Context): await ctx.channel.send("Pipelin " + random.choice(['schnel', 'langsam'])) @bot.command(name='quote', brief='Get a random quote or add new ones') async def quote_command(ctx: commands.Context, author: Optional[str], *, quote: Optional[str]): quotes: Optional[dict[str, List[str]]] = config_get("guild_quotes", guild=ctx.guild.id) if quotes is None: quotes = {} config_set_raw("guild_quotes", quotes, ctx.guild.id) if author is None: authors = quotes.keys() if authors: author = random.choice([*authors]) author_quotes = quotes[author] quote = '\n'.join(map(lambda line: '> ' + line, random.choice(author_quotes).splitlines())) await ctx.channel.send(quote + '\n *~' + author + '*') else: await ctx.channel.send("No quotes present!") elif author == 'list': def create_list_embed(quote_author: str, _quotes: List[str]) -> discord.Embed: embed: discord.Embed = discord.Embed( title=quote_author, description='\n'.join(_quotes) ) return embed if quote is None: coroutines = [] for author, author_quotes in quotes.items(): coroutines.append(ctx.send(embed=create_list_embed(author, author_quotes))) await asyncio.gather(*coroutines) else: author = quote.strip().title() if author in quotes: await ctx.send(embed=create_list_embed(author, quotes[author])) else: await ctx.send("No such author!") else: author = author.strip().title() if quote: quote = quote.strip() quote = quote[0].upper() + quote[1::] if author in quotes: author_quotes: List[str] = quotes[author] regex = re.compile(r'\W') stripped_quote = regex.sub('', quote).lower() for aq in author_quotes: if regex.sub('', aq).lower() == stripped_quote: await ctx.channel.send("Quote already exists!") return author_quotes.append(quote) config_save() await ctx.channel.send("Quote added") else: quotes[author] = [quote] config_save() await ctx.channel.send("Quote added") else: if author in quotes: quote = random.choice(quotes[author]) quote = '\n'.join(map(lambda line: '> ' + line, quote.splitlines())) await ctx.channel.send(quote + '\n *~ ' + author + '*') else: await ctx.channel.send("Unknown author!") @bot.command(name='quote_remove', brief='Remove a quote') async def quote_remove_command(ctx: commands.Context, author: str, quote: str): if not ctx.author.guild_permissions.administrator: await ctx.channel.send("Only admins are allowed to remove quotes!") quotes: Optional[dict[str, List[str]]] = config_get("guild_quotes", guild=ctx.guild.id) if quotes is None: await ctx.channel.send("No quotes available!") else: quote = quote.strip().lower() author = author.strip().title() if author in quotes: for q in quotes[author]: if q.lower() == quote: quotes[author].remove(q) if not quotes[author]: quotes.pop(author) await ctx.channel.send("Successfully removed quote!") config_save() return # No matching quote found try: quote = int(quote) quotes[author].pop(quote) if not quotes[author]: quotes.pop(author) await ctx.channel.send("Successfully removed quote!") config_save() return except ValueError: pass await ctx.channel.send("No such quote found!") @bot.command(name='groups', brief="Manage groups") async def group_command(ctx: commands.Context, subcommand: Optional[str], arg: Optional[str], members: commands.Greedy[discord.Member]): if not ctx.author.guild_permissions.manage_roles: await ctx.send("Access denied!") return if subcommand is None: await ctx.send("Available commands: `list`, `create `, `archive `, `delete `") return guild: discord.Guild = ctx.guild role_prefix = config_get("groups-role-prefix", guild.id) def collect_group_channels(cat: discord.CategoryChannel) -> dict[str, discord.TextChannel]: return {channel.name: channel for channel in cat.text_channels} async def collect_group_roles() -> list[discord.Role]: return list(filter(lambda role: role.name.startswith(role_prefix), await guild.fetch_roles())) async def fail_category(type: str, expected: str): await ctx.send("Unable to find channel category \"" + expected + "\" for " + type + ". Change in configs.") def link_channel(channel: discord.TextChannel, italic: bool = False) -> str: if italic: return '[*' + channel.name + '*](https://discord.com/channels/' + str(guild.id) + '/' + str(channel.id) + ')' return '[' + channel.name + '](https://discord.com/channels/' + str(guild.id) + '/' + str(channel.id) + ')' groups_cat = find_category(guild, config_get("groups-category", guild.id)) if groups_cat is None: await fail_category("groups", config_get("groups-category", guild.id)) return if subcommand == 'list': archive_cat = find_category(guild, config_get("groups-archive-category", guild.id)) if archive_cat is None: await fail_category("archive", config_get("groups-archive-category", guild.id)) return active_groups = collect_group_channels(groups_cat) archived_groups = collect_group_channels(archive_cat) msg = "" for role in await collect_group_roles(): name = role.name[len(role_prefix):] if name in active_groups: msg += link_channel(active_groups[name]) + "\n" elif name in archived_groups: msg += link_channel(archived_groups[name], True) + "\n" else: msg += "*" + name + "*\n" embed = discord.Embed(title="Groups", description=msg) embed.set_footer(text="Italic groups are archived or unavailable.") await ctx.send(embed=embed) elif subcommand == 'archive': if arg is None: await ctx.send("Group name required!") return groups = collect_group_channels(groups_cat) if arg in groups: archive_cat = find_category(guild, config_get("groups-archive-category", guild.id)) if archive_cat is None: await fail_category("archive", config_get("groups-archive-category", guild.id)) return await groups[arg].edit(reason="Archive group " + arg, category=archive_cat) else: await ctx.send("Can't find that group!") elif subcommand == 'delete': if arg is None: await ctx.send("Group name required!") return groups = collect_group_channels(groups_cat) archive_cat = find_category(guild, config_get("groups-archive-category", guild.id)) if archive_cat is None: await fail_category("archive", config_get("groups-archive-category", guild.id)) return active_groups = collect_group_channels(groups_cat) archive_groups = collect_group_channels(archive_cat) role = guild.get_role(config_get("groups-role-prefix", guild.id)) if role is not None: await role.delete(reason="Delete group " + arg) if arg in active_groups: await active_groups[arg].delete(reason="Delete group " + arg) if arg in archive_groups: await archive_groups[arg].delete(reason="Delete group " + arg) elif subcommand == 'create': if arg is None: await ctx.send("Group name required!") return arg.strip() cor = groups_cat.create_text_channel(arg.lower(), reason="Create group " + arg) cor_role = guild.create_role(name=config_get("groups-role-prefix", guild.id) + arg, mentionable=True, reason="Create group " + arg) channel: discord.TextChannel = await cor cor = channel.edit(sync_permissions=True) role: discord.Role = await cor_role await cor await channel.set_permissions(role, reason="Create group " + arg, read_messages=True) if members: for member in members: await member.add_roles(role, reason="Create group " + arg) await asyncio.sleep(5) await channel.send("Hi, " + role.mention) await ctx.send("Done :)") def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool: if message.clean_content.strip() == '': return False for reaction in message.reactions: if reaction.emoji == '❌': return False if reaction_filter is not None: for reaction in message.reactions: if type(reaction.emoji) is str: if reaction.emoji == reaction_filter: return True elif type(reaction.emoji) is discord.Emoji or type(reaction.emoji) is discord.PartialEmoji: if ':' + reaction.emoji.name + ':' == reaction_filter: return True return False return True @bot.command(name='config', brief='Change the configuration of this bot') async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''): if ctx.guild is None: await ctx.send('You can\'t run config commands in private messages!') return if cmd == '': await ctx.send('Use `' + bot.command_prefix + 'config get` or `' + bot.command_prefix + 'config set` to query for or update config values. Use `' + bot.command_prefix + 'config list` to see all config options') elif cmd == 'list': msg = 'Available config options:' for (key, value) in config_get_descriptions(): msg += '\n - `' + key + '`: ' + value[1] await ctx.send(msg) elif cmd == 'get': if key in config: await ctx.send('`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`') else: await ctx.send('Unknown config option `' + key + '`') elif cmd == 'set': if ctx.author.guild_permissions.administrator: await ctx.send(config_set(key, val, ctx.guild.id)) else: await ctx.send('You\'re not allowed to change the configuration on this server!') elif cmd == 'get-global': if key in config: await ctx.send('`' + key + '` is globally set to `' + str(config_get(key)) + '`') else: await ctx.send('Unknown config option `' + key + '`') elif cmd == 'set-global': if ctx.author.id == OWNER_ID: msg = config_set(key, val) if key == 'prefix': bot.command_prefix = config_get('prefix') await ctx.send(msg) else: await ctx.send('You\'re not allowed to change the global configuration!') else: await ctx.send('Unknown command!') bot.run(TOKEN)