Files
och-bot/bot.py

356 lines
15 KiB
Python

import asyncio
import os
import random
import re
import time
from typing import Optional, List
import discord
import gtts
from discord.ext import commands
from dotenv import load_dotenv
from lib.config import config, config_meta, config_load, config_save, config_get, config_set, config_get_descriptions, \
config_set_raw
from lib.utils import async_filter
load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE)
TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE)
LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE)
OCH_ANYONE_REGEX = re.compile(r'^Och\s*(\w.*?)\s*[,.!?:]*$', re.IGNORECASE)
TOPFIT_WORDS = (
"äh",
"ähm",
"ääh",
"äääh",
"ä",
"..",
"...",
"Mi",
"Mic",
"Micro",
"Microsoft",
"Microservices",
"top",
"vortual reality"
)
# LOEH_ID = 327126546970312739
LOEH_ID = 254265844928872448
OWNER_ID = 327126546970312739
OCH_LOEH_SOUND = "assets/och_loeh.mp3"
config_load()
intents = discord.Intents.default()
intents.members = True
bot = commands.Bot(command_prefix=config.get('prefix'), intents=intents)
if 'LIBOPUS' in os.environ and not len(os.environ['LIBOPUS']) == 0:
discord.opus.load_opus(os.environ['LIBOPUS'])
async def _get_loeh(guild: discord.Guild) -> Optional[discord.Member]:
try:
return await guild.fetch_member(LOEH_ID)
except discord.NotFound:
return None
def _get_last_och_time(guild: discord.Guild) -> int:
return config_get('last-och-time', guild.id)
def _set_last_och_time(guild: discord.Guild):
config_set_raw('last-och-time', time.time(), guild.id)
@bot.event
async def on_ready():
print(f'{bot.user} has connected to Discord!')
@bot.event
async def on_message(message: discord.Message):
if message.guild is not None and not message.author.bot:
if config_get('loeh-enable', message.guild.id) and LOEH_REGEX.match(message.content):
if message.author.id == LOEH_ID:
await message.channel.send("https://siphalor.de/img/spidy-is-that-you.jpg")
else:
cooldown = config_get('och-cooldown', message.guild.id)
t = time.time() - _get_last_och_time(message.guild)
if t <= cooldown:
await message.channel.send('429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!')
return
_set_last_och_time(message.guild)
loeh = await _get_loeh(message.guild)
if loeh is None:
await message.channel.send('404: Löh not found!')
elif loeh.voice is None:
await message.channel.send('400: Löh not connected!')
else:
voice: discord.VoiceState = loeh.voice
try:
voice_channel: discord.VoiceChannel = voice.channel
voice_protocol: discord.VoiceProtocol = await voice_channel.connect()
if type(voice_protocol) is discord.VoiceClient:
source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND))
voice_protocol.play(source)
await loeh.edit(mute=True)
sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id))
message: Optional[discord.Message] = await message.channel.send('Zu Befehl!')
await sleeper
await loeh.edit(mute=False)
if message is not None:
await message.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.")
if type(voice_protocol) is discord.VoiceClient:
await voice_protocol.disconnect()
except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
await message.channel.send('Failed to complete your command, Sir')
return
elif config_get('och-anyone-enable', message.guild.id) and (
match := OCH_ANYONE_REGEX.search(message.content)) is not None:
cooldown = config_get('och-cooldown', message.guild.id)
t = time.time() - _get_last_och_time(message.guild)
if t <= cooldown:
await message.channel.send('429: Too many requests - Wait ' + str(int(cooldown - t)) + ' more seconds!')
return
_set_last_och_time(message.guild)
guild: discord.Guild = message.guild
bot_member: discord.Member = guild.get_member(bot.user.id)
search: str = match.group(1).lower()
matches: list = []
voice: Optional[discord.VoiceChannel] = None
async for member in guild.fetch_members(limit=100):
if not member.bot and member.voice is not None and search in member.display_name.lower().split(' '):
perms: discord.Permissions = member.voice.channel.permissions_for(bot_member)
if perms.connect and perms.speak:
voice = member.voice.channel
matches.append(member)
if matches and voice is not None:
voice_protocol: discord.VoiceProtocol = await voice.connect()
if type(voice_protocol) is discord.VoiceClient:
tts = gtts.gTTS(message.content, lang='de')
os.makedirs('temp', exist_ok=True)
tts.save('temp/och.mp3')
source = discord.PCMVolumeTransformer(
discord.FFmpegPCMAudio(source='temp/och.mp3', before_options='-v quiet')
)
voice_protocol.play(source)
async def _mute(m: discord.Member):
await m.edit(mute=True)
async def _unmute(m: discord.Member):
await m.edit(mute=False)
await asyncio.gather(*map(_mute, matches))
sleeper = asyncio.sleep(config_get('och-timeout', message.guild.id))
message: Optional[discord.Message] = await message.channel.send('Auf gehts!')
await sleeper
await asyncio.gather(*map(_unmute, matches))
waiter = None
if message is not None:
waiter = message.edit(content='~~Auf gehts!~~\nGeschafft!')
if type(voice_protocol) is discord.VoiceClient:
await voice_protocol.disconnect()
if waiter is not None:
await waiter
os.remove('temp/och.mp3')
else:
await message.channel.send('404: No users found!')
elif config_get('inf19x-insiders-enable', message.guild.id):
if PING_REGEX.search(message.content):
embed = discord.Embed(
title="*pinken, schwaches Verb*",
description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en",
color=16636435
)
embed.add_field(name='ich', value='pinke')
embed.add_field(name='du', value='pinkst')
embed.add_field(name='er|sie|es', value='pinkt')
embed.add_field(name='wir', value='pinken')
embed.add_field(name='ihr', value='pinkt')
embed.add_field(name='sie', value='pinken')
embed.add_field(name='Partizip 2', value='gepinkt')
await message.channel.send(embed=embed)
else:
match = TOPFIT_REGEX.search(message.content)
if match is not None:
text = ''
for i in range(0, random.randint(7, 13)):
text += random.choice(TOPFIT_WORDS) + " "
text += match.group(1)
await message.channel.send(text)
await bot.process_commands(message)
@bot.command(name='random_message', brief='Select a random message from a channel')
async def random_message_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None,
max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None):
typing = ctx.channel.trigger_typing()
if channel is None:
channel = ctx.channel
messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter),
channel.history(limit=max_cnt))
messages = [item async for item in messages]
if not messages:
await typing
await ctx.channel.send("No valid messages found!")
return
msg: discord.Message = random.choice(messages)
author: discord.abc.User = msg.author
embed = discord.Embed(
description=msg.content + "\n\n[Go to message](" + msg.jump_url + ")"
)
embed.set_author(name=author.display_name, icon_url=author.avatar_url)
embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(messages)) + " messages")
await typing
await ctx.channel.send(embed=embed)
@bot.command(name='collect_messages', brief='Lists all messages with the given filters')
async def collect_messages_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None,
max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None):
typing = ctx.channel.trigger_typing()
if channel is None:
channel = ctx.channel
messages = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter),
channel.history(limit=max_cnt))
messages = [item async for item in messages]
embed = discord.Embed(
title="List of matching messages",
description='\n'.join([m.content for m in messages])
)
await typing
await ctx.channel.send(embed=embed)
@bot.command(name='flip_pipelin', brief='Flip a pipelin')
async def flip_pipelin_command(ctx: commands.Context):
await ctx.channel.send("Pipelin " + random.choice(['schnel', 'langsam']))
@bot.command(name='quote', brief='Get a random quote or add new ones')
async def quote_command(ctx: commands.Context, author: Optional[str], *, quote: Optional[str]):
quotes: Optional[dict[str,List[str]]] = config_get("guild_quotes", guild=ctx.guild.id)
if quotes is None:
quotes = {}
config_set_raw("guild_quotes", quotes, ctx.guild.id)
if author is None:
authors = quotes.keys()
if authors:
author = random.choice([*authors])
author_quotes = quotes[author]
quote = '\n'.join(map(lambda line: '> ' + line, random.choice(author_quotes).splitlines()))
await ctx.channel.send(quote + '\n *~' + author + '*')
else:
await ctx.channel.send("No quotes present!")
else:
author = author.strip().title()
if quote:
quote = quote.strip().capitalize()
if author in quotes:
author_quotes: List[str] = quotes[author]
regex = re.compile(r'[\W]')
stripped_quote = regex.sub(quote, '').lower()
for aq in author_quotes:
if regex.sub(aq, '').lower() == stripped_quote:
await ctx.channel.send("Quote already exists!")
return
author_quotes.append(quote)
config_save()
await ctx.channel.send("Quote added")
else:
quotes[author] = [quote]
config_save()
await ctx.channel.send("Quote added")
else:
if author in quotes:
quote = random.choice(quotes[author])
quote = '\n'.join(map(lambda line: '> ' + line, quote.splitlines()))
await ctx.channel.send(quote + '\n *~ ' + author + '*')
else:
await ctx.channel.send("Unknown author!")
def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool:
if message.clean_content.strip() == '':
return False
for reaction in message.reactions:
if reaction.emoji == '':
return False
if reaction_filter is not None:
for reaction in message.reactions:
if type(reaction.emoji) is str:
if reaction.emoji == reaction_filter:
return True
elif type(reaction.emoji) is discord.Emoji or type(reaction.emoji) is discord.PartialEmoji:
if ':' + reaction.emoji.name + ':' == reaction_filter:
return True
return False
return True
@bot.command(name='config', brief='Change the configuration of this bot')
async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''):
if ctx.guild is None:
await ctx.send('You can\'t run config commands in private messages!')
return
if cmd == '':
await ctx.send('Use `' + bot.command_prefix + 'config get` or `' + bot.command_prefix +
'config set` to query for or update config values. Use `' + bot.command_prefix +
'config list` to see all config options')
elif cmd == 'list':
msg = 'Available config options:'
for (key, value) in config_get_descriptions():
msg += '\n - `' + key + '`: ' + value[1]
await ctx.send(msg)
elif cmd == 'get':
if key in config:
await ctx.send('`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`')
else:
await ctx.send('Unknown config option `' + key + '`')
elif cmd == 'set':
if ctx.author.guild_permissions.administrator:
await ctx.send(config_set(key, val, ctx.guild.id))
else:
await ctx.send('You\'re not allowed to change the configuration on this server!')
elif cmd == 'get-global':
if key in config:
await ctx.send('`' + key + '` is globally set to `' + str(config_get(key)) + '`')
else:
await ctx.send('Unknown config option `' + key + '`')
elif cmd == 'set-global':
if ctx.author.id == OWNER_ID:
msg = config_set(key, val)
if key == 'prefix':
bot.command_prefix = config_get('prefix')
await ctx.send(msg)
else:
await ctx.send('You\'re not allowed to change the global configuration!')
else:
await ctx.send('Unknown command!')
bot.run(TOKEN)