import asyncio import os import random import re import time from typing import Optional import discord import gtts from discord.ext import commands from dotenv import load_dotenv from lib.config import config, config_meta, config_load, config_save, config_get, config_set, config_get_descriptions, \ config_set_raw from lib.utils import async_filter load_dotenv() TOKEN = os.getenv('DISCORD_TOKEN') PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE) TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE) LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE) OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE) TOPFIT_WORDS = ( "äh", "ähm", "ääh", "äääh", "ä", "..", "...", "Mi", "Mic", "Micro", "Microsoft", "Microservices", "top", "vortual reality" ) # LOEH_ID = 327126546970312739 LOEH_ID = 254265844928872448 OWNER_ID = 327126546970312739 OCH_LOEH_SOUND = "assets/och_loeh.mp3" config_load() intents = discord.Intents.default() intents.members = True bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents) if 'LIBOPUS' in os.environ and not len(os.environ['LIBOPUS']) == 0: discord.opus.load_opus(os.environ['LIBOPUS']) async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]: try: return await guild.fetch_member(LOEH_ID) except discord.NotFound: return None def _get_last_och_time(guild: discord.Guild) -> int: return config_get('last-och-time', guild.id) def _set_last_och_time(guild: discord.Guild): config_set_raw('last-och-time', time.time(), guild.id) @bot.event async def on_ready(): print(f'{bot.user} has connected to Discord!') @bot.event async def on_message(message: discord.Message): if message.guild is not None and not message.author.bot: if config_get('loeh-enable', message.guild.id) and LOEH_REGEX.match(message.content): if message.author.id == LOEH_ID: await message.channel.send("https://siphalor.de/img/spidy-is-that-you.jpg") else: t = time.time() - _get_last_och_time(message.guild) if t <= config_get('och-cooldown', message.guild.id): await message.channel.send('429: Too many requests - Wait ' + str(t) + ' more seconds!') return _set_last_och_time(message.guild) loeh = await _get_loeh(message.guild) if loeh is None: await message.channel.send('404: Löh not found!') elif loeh.voice is None: await message.channel.send('400: Löh not connected!') else: voice: discord.VoiceState = loeh.voice try: voice_channel: discord.VoiceChannel = voice.channel voice_protocol: discord.VoiceProtocol = await voice_channel.connect() if type(voice_protocol) is discord.VoiceClient: source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND)) voice_protocol.play(source) await loeh.edit(mute=True) sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id)) message: Optional[discord.Message] = await message.channel.send('Zu Befehl!') await sleeper await loeh.edit(mute=False) if message is not None: await message.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.") if type(voice_protocol) is discord.VoiceClient: await voice_protocol.disconnect() except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException): await message.channel.send('Failed to complete your command, Sir') return elif config_get('och-anyone-enable', message.guild.id) and ( match := OCH_ANYONE_REGEX.search(message.content)) is not None: t = time.time() - _get_last_och_time(message.guild) if t <= config_get('och-cooldown', message.guild.id): await message.channel.send('429: Too many requests - Wait ' + str(t) + ' more seconds!') return _set_last_och_time(message.guild) guild: discord.Guild = message.guild bot_member: discord.Member = guild.get_member(bot.user.id) search: str = match.group(1).lower() matches: list = [] voice: Optional[discord.VoiceChannel] = None async for member in guild.fetch_members(limit=100): if not member.bot and member.voice is not None and search in member.display_name.lower().split(' '): perms: discord.Permissions = member.voice.channel.permissions_for(bot_member) if perms.connect and perms.speak: voice = member.voice.channel matches.append(member) if matches and voice is not None: voice_protocol: discord.VoiceProtocol = await voice.connect() if type(voice_protocol) is discord.VoiceClient: tts = gtts.gTTS(message.content, lang='de') os.makedirs('temp', exist_ok=True) tts.save('temp/och.mp3') source = discord.PCMVolumeTransformer( discord.FFmpegPCMAudio(source='temp/och.mp3', before_options='-v quiet') ) voice_protocol.play(source) async def _mute(m: discord.Member): await m.edit(mute=True) async def _unmute(m: discord.Member): await m.edit(mute=False) await asyncio.gather(*map(_mute, matches)) sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id)) message: Optional[discord.Message] = await message.channel.send('Auf gehts!') await sleeper await asyncio.gather(*map(_unmute, matches)) waiter = None if message is not None: waiter = message.edit(content='~~Auf gehts!~~\nGeschafft!') if type(voice_protocol) is discord.VoiceClient: await voice_protocol.disconnect() if waiter is not None: await waiter os.remove('temp/och.mp3') else: await message.channel.send('404: No users found!') elif config_get('inf19x-insiders-enable', message.guild.id): if PING_REGEX.search(message.content): embed = discord.Embed( title="*pinken, schwaches Verb*", description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en", color=16636435 ) embed.add_field(name='ich', value='pinke') embed.add_field(name='du', value='pinkst') embed.add_field(name='er|sie|es', value='pinkt') embed.add_field(name='wir', value='pinken') embed.add_field(name='ihr', value='pinkt') embed.add_field(name='sie', value='pinken') embed.add_field(name='Partizip 2', value='gepinkt') await message.channel.send(embed=embed) else: match = TOPFIT_REGEX.search(message.content) if match is not None: text = '' for i in range(0, random.randint(7, 13)): text += random.choice(TOPFIT_WORDS) + " " text += match.group(1) await message.channel.send(text) await bot.process_commands(message) @bot.command(name='random_message', brief='Select a random message from a channel') async def random_message_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None, max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None): typing = ctx.channel.trigger_typing() if channel is None: channel = ctx.channel messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter), channel.history(limit=max_cnt)) messages = [item async for item in messages] if not messages: await typing await ctx.channel.send("No valid messages found!") return msg: discord.Message = random.choice(messages) author: discord.abc.User = msg.author embed = discord.Embed( description=msg.content + "\n\n[Go to message](" + msg.jump_url + ")" ) embed.set_author(name=author.display_name, icon_url=author.avatar_url) embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + " messages") await typing await ctx.channel.send(embed=embed) @bot.command(name='collect_messages', brief='Lists all messages with the given filters') async def collect_messages_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None, max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None): typing = ctx.channel.trigger_typing() if channel is None: channel = ctx.channel messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter), channel.history(limit=max_cnt)) messages = [item async for item in messages] embed = discord.Embed( title="List of matching messages", description='\n'.join([m.content for m in messages]) ) await typing await ctx.channel.send(embed=embed) @bot.command(name='flip_pipelin', brief='Flip a pipelin') async def flip_pipelin_command(ctx: commands.Context): await ctx.channel.send("Pipelin " + random.choice(['schnel', 'langsam'])) def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool: if message.clean_content.strip() == '': return False for reaction in message.reactions: if reaction.emoji == '❌': return False if reaction_filter is not None: for reaction in message.reactions: if type(reaction.emoji) is str: if reaction.emoji == reaction_filter: return True elif type(reaction.emoji) is discord.Emoji or type(reaction.emoji) is discord.PartialEmoji: if ':' + reaction.emoji.name + ':' == reaction_filter: return True return False return True @bot.command(name='config', brief='Change the configuration of this bot') async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''): if ctx.guild is None: await ctx.send('You can\'t run config commands in private messages!') return if cmd == '': await ctx.send('Use `' + bot.command_prefix + 'config get` or `' + bot.command_prefix + 'config set` to query for or update config values. Use `' + bot.command_prefix + 'config list` to see all config options') elif cmd == 'list': msg = 'Available config options:' for (key, value) in config_get_descriptions(): msg += '\n - `' + key + '`: ' + value[1] await ctx.send(msg) elif cmd == 'get': if key in config: await ctx.send('`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`') else: await ctx.send('Unknown config option `' + key + '`') elif cmd == 'set': if ctx.author.guild_permissions.administrator: await ctx.send(config_set(key, val, ctx.guild.id)) else: await ctx.send('You\'re not allowed to change the configuration on this server!') elif cmd == 'get-global': if key in config: await ctx.send('`' + key + '` is globally set to `' + str(config_get(key)) + '`') else: await ctx.send('Unknown config option `' + key + '`') elif cmd == 'set-global': if ctx.author.id == OWNER_ID: msg = config_set(key, val) if key == 'prefix': bot.command_prefix = config_get('prefix') await ctx.send(msg) else: await ctx.send('You\'re not allowed to change the global configuration!') else: await ctx.send('Unknown command!') bot.run(TOKEN)