Files
och-bot/bot.py
2021-01-17 16:06:53 +01:00

232 lines
9.3 KiB
Python

import asyncio
import os
import random
import re
import time
from typing import Optional
import discord
from discord.ext import commands
from dotenv import load_dotenv
from lib.config import config, config_meta, config_load, config_save, config_get, config_set, config_get_descriptions
from lib.utils import async_filter
# Load variables from a .env file (python-dotenv) so DISCORD_TOKEN can be read below.
load_dotenv()
# Bot token from the environment; os.getenv returns None when DISCORD_TOKEN is unset.
TOKEN = os.getenv('DISCORD_TOKEN')

# "ping" / "geping" starting at a word boundary, case-insensitive (used with .search()).
PING_REGEX = re.compile(r'\b(?:ge)?ping', re.IGNORECASE)
# Whole-word trigger terms for the filler-word reply; the matched term is echoed at the end of that reply.
TOPFIT_REGEX = re.compile(r'\b(topfit|fit|top|micro|microsoft|virtual reality|vr|ä+h*m*|hä*)\b', re.IGNORECASE)
# "Och Loeh" / "Och Löh" with an optional "!"; on_message applies it with .match(), i.e. at the start of the message.
LOEH_REGEX = re.compile(r'Och L(?:oe|ö)h!?', re.IGNORECASE)
# Pool of filler words that on_message picks from at random when TOPFIT_REGEX matches.
# NOTE(review): "vortual reality" differs from "virtual reality" in TOPFIT_REGEX - confirm whether that is intentional.
TOPFIT_WORDS = (
    "äh",
    "ähm",
    "ääh",
    "äääh",
    "ä",
    "..",
    "...",
    "Mi",
    "Mic",
    "Micro",
    "Microsoft",
    "Microservices",
    "top",
    "vortual reality"
)
# Disabled alternative target id (same value as OWNER_ID) - presumably used for testing.
# LOEH_ID = 327126546970312739
# Discord user id of the member targeted by the "Och Löh" trigger.
LOEH_ID = 254265844928872448
# Discord user id that is allowed to run `config set-global`.
OWNER_ID = 327126546970312739
# Audio file played in the target's voice channel.
OCH_LOEH_SOUND = "assets/och_loeh.mp3"
# time.time() timestamp of the most recent "Och Löh" attempt; 0 means none yet.
# A single module-level value, so the cooldown is shared by every guild the bot is in.
last_loeh = 0

# config_load comes from lib.config; presumably it populates `config` before the prefix is read below.
config_load()
bot = commands.Bot(command_prefix=config.get('prefix'))
async def get_loeh(guild: discord.Guild) -> Optional[discord.Member]:
    """Fetch the member with id ``LOEH_ID`` from *guild*.

    Returns the fetched member, or ``None`` when ``fetch_member`` raises
    ``discord.NotFound``. Any other exception propagates to the caller.
    """
    try:
        member = await guild.fetch_member(LOEH_ID)
    except discord.NotFound:
        return None
    else:
        return member
@bot.event
async def on_ready():
    """Event handler registered on ``bot``: print a line naming the bot user."""
    banner = f'{bot.user} has connected to Discord!'
    print(banner)
def _build_pinken_embed() -> discord.Embed:
    """Build the embed holding the conjugation table for the verb "pinken"."""
    embed = discord.Embed(
        title="*pinken, schwaches Verb*",
        description="ein Netzwerkgerät testweise ansprechen.\nOft falsch geschrieben als pin__g__en",
        color=16636435,
    )
    conjugation = (
        ('ich', 'pinke'),
        ('du', 'pinkst'),
        ('er|sie|es', 'pinkt'),
        ('wir', 'pinken'),
        ('ihr', 'pinkt'),
        ('sie', 'pinken'),
        ('Partizip 2', 'gepinkt'),
    )
    for pronoun, form in conjugation:
        embed.add_field(name=pronoun, value=form)
    return embed


async def _punish_loeh(message: discord.Message) -> None:
    """Play the sound in Löh's voice channel and server-mute him for 'loeh-timeout' seconds.

    Status and error replies go to the channel of *message*. Once the mute or
    the voice connection has been established, it is always undone again
    (unmute / disconnect), even when a later step fails.
    """
    channel = message.channel
    loeh = await get_loeh(message.guild)
    if loeh is None:
        await channel.send('404: Löh not found!')
        return
    if loeh.voice is None:
        await channel.send('400: Löh not connected!')
        return

    timeout = config_get('loeh-timeout', message.guild.id)
    try:
        voice_client = await loeh.voice.channel.connect()
        try:
            if isinstance(voice_client, discord.VoiceClient):
                voice_client.play(discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(source=OCH_LOEH_SOUND)))
            await loeh.edit(mute=True)
            try:
                reply = await channel.send('Zu Befehl!')
                await asyncio.sleep(timeout)
            finally:
                # Never leave the member muted, whatever happened while waiting.
                await loeh.edit(mute=False)
            await reply.edit(content="~~Zu Befehl!~~\nEs sei ihm verziehen.")
        finally:
            # Always leave the voice channel again, also on failure.
            if isinstance(voice_client, discord.VoiceClient):
                await voice_client.disconnect()
    except (asyncio.TimeoutError, discord.Forbidden, discord.HTTPException, discord.ClientException):
        await channel.send('Failed to complete your command, Sir')


@bot.event
async def on_message(message: discord.Message):
    """React to guild messages from non-bot authors, then hand the message to the command processor.

    * "Och Löh" trigger (config 'loeh-enable'): sends an image link when Löh
      triggers it himself, otherwise enforces the 'loeh-cooldown' and mutes
      Löh. Such messages are not processed as commands.
    * Insider replies (config 'inf19x-insiders-enable'): the "pinken" embed for
      ping-like words, or a run of filler words for the TOPFIT terms.

    NOTE: ``last_loeh`` is one module-level timestamp, so the cooldown is
    shared between all guilds.
    """
    global last_loeh
    if message.guild is not None and not message.author.bot:
        guild_id = message.guild.id
        if config_get('loeh-enable', guild_id) and LOEH_REGEX.match(message.content):
            if message.author.id == LOEH_ID:
                await message.channel.send("https://siphalor.de/img/spidy-is-that-you.jpg")
            else:
                now = time.time()
                on_cooldown = now - config_get('loeh-cooldown', guild_id) <= last_loeh
                # Every attempt restarts the cooldown, including rejected ones.
                last_loeh = now
                if on_cooldown:
                    # display_name falls back to the user name; `nick` is None
                    # for members without a server nickname.
                    await message.channel.send('Don\'t try this to often, **' + message.author.display_name +
                                               '**. That might backfire.')
                else:
                    await _punish_loeh(message)
            return
        if config_get('inf19x-insiders-enable', guild_id):
            if PING_REGEX.search(message.content):
                await message.channel.send(embed=_build_pinken_embed())
            else:
                match = TOPFIT_REGEX.search(message.content)
                if match is not None:
                    fillers = [random.choice(TOPFIT_WORDS) for _ in range(random.randint(7, 13))]
                    fillers.append(match.group())
                    await message.channel.send(' '.join(fillers))
    await bot.process_commands(message)
@bot.command(name='random_message', brief='Select a random message from a channel')
async def random_message_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None,
                                 max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None):
    # Posts one randomly chosen message from `channel` (default: the invoking
    # channel) as an embed. Only the latest `max_cnt` messages are scanned and
    # only those accepted by _is_message_valid_for_selection are candidates.
    # (Kept as comments on purpose: discord.py would show a docstring as the
    # command's help text.)
    if channel is None:
        channel = ctx.channel
    # Show the typing indicator while the history is being fetched and filtered.
    async with ctx.channel.typing():
        filtered = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter),
                                channel.history(limit=max_cnt))
        candidates = [item async for item in filtered]
    if not candidates:
        await ctx.channel.send("No valid messages found!")
        return
    msg: discord.Message = random.choice(candidates)
    author: discord.abc.User = msg.author
    embed = discord.Embed(
        description=msg.content + "\n\n[Go to message](" + msg.jump_url + ")"
    )
    embed.set_author(name=author.display_name, icon_url=author.avatar_url)
    embed.set_footer(text="random message from #" + channel.name + " out of " + str(len(candidates)) + " messages")
    await ctx.channel.send(embed=embed)
@bot.command(name='collect_messages', brief='Lists all messages with the given filters')
async def collect_messages_command(ctx: commands.Context, channel: Optional[discord.TextChannel] = None,
                                   max_cnt: Optional[int] = 100, reaction_filter: Optional[str] = None):
    # Lists the contents of all messages among the latest `max_cnt` messages of
    # `channel` (default: the invoking channel) that are accepted by
    # _is_message_valid_for_selection, one per line, in a single embed.
    # (Kept as comments on purpose: discord.py would show a docstring as the
    # command's help text.)
    description_limit = 2048  # conservative embed description limit; longer texts are rejected by Discord
    if channel is None:
        channel = ctx.channel
    # Show the typing indicator while the history is being fetched and filtered.
    async with ctx.channel.typing():
        filtered = async_filter(lambda m: _is_message_valid_for_selection(m, reaction_filter),
                                channel.history(limit=max_cnt))
        matches = [item async for item in filtered]
    listing = '\n'.join(m.content for m in matches)
    if len(listing) > description_limit:
        # Truncate instead of letting the whole send fail.
        listing = listing[:description_limit - 1] + '…'
    embed = discord.Embed(
        title="List of matching messages",
        description=listing
    )
    await ctx.channel.send(embed=embed)
def _is_message_valid_for_selection(message: discord.Message, reaction_filter: Optional[str] = None) -> bool:
    """Decide whether *message* may be picked by the message selection commands.

    A message is rejected when its clean content is blank or when it carries
    the exclusion reaction. Without *reaction_filter* every other message is
    accepted. With a filter, the message must have a matching reaction: a
    unicode emoji equal to the filter, or a custom emoji whose ``:name:``
    equals the filter.
    """
    if not message.clean_content.strip():
        return False
    reactions = message.reactions
    # NOTE(review): the literal compared against is empty in this copy of the
    # file, so this check can never match a unicode emoji - presumably an emoji
    # character was lost; confirm against the repository.
    if any(reaction.emoji == '' for reaction in reactions):
        return False
    if reaction_filter is None:
        return True
    for reaction in reactions:
        emoji = reaction.emoji
        if isinstance(emoji, str):
            if emoji == reaction_filter:
                return True
        elif isinstance(emoji, (discord.Emoji, discord.PartialEmoji)):
            # A partial emoji of a deleted custom emoji has no name; skip it
            # instead of failing on the string concatenation.
            if emoji.name is not None and ':' + emoji.name + ':' == reaction_filter:
                return True
    return False
@bot.command(name='config', brief='Change the configuration of this bot')
async def config_prefix_command(ctx: commands.Context, cmd: str = '', key: str = '', *, val: str = ''):
    # Sub-commands, selected by `cmd`:
    #   (empty)     usage hint
    #   list        all config options with their descriptions
    #   get         value of `key` for this guild
    #   set         set `key` to `val` for this guild (guild administrators only)
    #   get-global  global value of `key`
    #   set-global  set the global value (OWNER_ID only); updates the live prefix
    # Everything is refused outside of guilds.
    if ctx.guild is None:
        await ctx.send("You can't run config commands in private messages!")
        return

    if cmd == '':
        prefix = bot.command_prefix
        await ctx.send('Use `' + prefix + 'config get` or `' + prefix +
                       'config set` to query for or update config values. Use `' + prefix +
                       'config list` to see all config options')
        return

    if cmd == 'list':
        lines = ['Available config options:']
        for option, meta in config_get_descriptions():
            lines.append(' - `' + option + '`: ' + meta[1])
        await ctx.send('\n'.join(lines))
        return

    if cmd == 'get':
        if key not in config:
            reply = 'Unknown config option `' + key + '`'
        else:
            reply = '`' + key + '` is set to `' + str(config_get(key, ctx.guild.id)) + '`'
        await ctx.send(reply)
        return

    if cmd == 'set':
        if not ctx.author.guild_permissions.administrator:
            await ctx.send("You're not allowed to change the configuration on this server!")
        else:
            await ctx.send(config_set(key, val, ctx.guild.id))
        return

    if cmd == 'get-global':
        if key not in config:
            reply = 'Unknown config option `' + key + '`'
        else:
            reply = '`' + key + '` is globally set to `' + str(config_get(key)) + '`'
        await ctx.send(reply)
        return

    if cmd == 'set-global':
        if ctx.author.id != OWNER_ID:
            await ctx.send("You're not allowed to change the global configuration!")
            return
        result = config_set(key, val)
        if key == 'prefix':
            # Apply a changed prefix to the running bot right away.
            bot.command_prefix = config_get('prefix')
        await ctx.send(result)
        return

    await ctx.send('Unknown command!')
# Start the bot with the token read from the environment at the top of the file.
# NOTE(review): TOKEN is None when DISCORD_TOKEN is unset; no check is made before this call.
bot.run(TOKEN)