Geoffrey-Bot/GeoffreyBot/geoffrey.py

182 lines
6.5 KiB
Python

from discord import Game
from discord.ext import commands
from requests import exceptions as request_exception
from discord.utils import oauth_url
import requests
import logging
import time
import traceback
import sys
from GeoffreyBot.geoffrey_api import HandledError
# Shared logger for this module, registered under the name 'GeoffreyBot'.
logger = logging.getLogger("GeoffreyBot")

# Free-form blurb about the bot; handed to the bot constructor as its
# description and reused as the opening lines of the generated help page.
description = """
Geoffrey (pronounced JOFF-ree) started his life as an inside joke none of you will understand.
At some point, she was to become an airhorn discord_bot. Now, they know where your stuff is.
If have a suggestion or if something is borked, you can PM my ding dong of a creator ZeroHD.
*You must use ?register before adding things to Geoffrey*
"""

# Reply template for errors the bot has no specific message for; the single
# `{}` placeholder is filled with the text the user invoked the command with.
bad_error_message = (
    "OOPSIE WOOPSIE!! Uwu We made a fucky wucky!! A wittle fucko boingo! The admins at our "
    "headquarters are working VEWY HAWD to fix this! (Error in command {})"
)
class GeoffreyBot(commands.Bot):
    """Discord bot front-end for the Geoffrey web API.

    On construction the bot downloads its settings and command list from the
    API, configures the underlying ``commands.Bot`` with them, loads the
    ``GeoffreyBot.geoffrey_api`` extension and pre-builds the help texts.
    """

    def __init__(self, base_url, api_token):
        """Fetch the bot settings from the API and initialise the bot.

        Blocks, retrying once per second, until both the settings and the
        command list have been fetched and decoded as JSON.

        :param base_url: root URL of the Geoffrey server; API paths are
            appended to it verbatim.
        :param api_token: token sent as the ``api`` query parameter.
        """
        self.base_url = base_url
        self.api_token = api_token

        settings_url = base_url + '/GeoffreyApp/api/settings/'
        command_list_url = base_url + '/GeoffreyApp/api/command/commands'
        auth = {"api": self.api_token}

        logger.info("Connecting to the Geoffrey API... ")

        while True:
            try:
                setting = requests.get(url=settings_url, params=auth).json()
                self.command_list = requests.get(url=command_list_url, params=auth).json()
                break
            except (request_exception.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON. Keep
                # retrying, but leave a trace so a dead API is visible.
                logger.warning("Could not reach the Geoffrey API (%s), retrying...", e)
                time.sleep(1)

        self.prefix = setting['BOT_PREFIX']
        self.error_users = setting['ERROR_USERS']
        self.admin_users = setting['MOD_RANKS']
        self.special_users = []
        self.default_status = setting['STATUS']

        super().__init__(command_prefix=self.prefix, description=description, case_insensitive=True,
                         help_command=None)

        self.load_extension('GeoffreyBot.geoffrey_api')

        # Built after the extension is loaded: build_help_dict looks up the
        # registered commands to read their help text.
        self.help_page = self.build_help_page()
        self.help_dict = self.build_help_dict()

    async def on_ready(self):
        """Log the bot identity and invite URL, then set the presence text."""
        logger.info("%s Online, ID: %s", self.user.name, self.user.id)
        info = await self.application_info()
        url = oauth_url(info.id)
        logger.info("Bot url: %s", url)
        await self.change_presence(activity=Game(self.default_status))

    async def on_command(self, ctx):
        """Log every command invocation together with its arguments."""
        invoked = ctx.invoked_subcommand
        subcommand = "" if invoked is None else ":" + str(invoked)

        # ctx.args[2:] drops the first two positional arguments.
        # NOTE(review): presumably these are the cog instance and the context - confirm.
        logger.info("User %s, used command '%s%s' with arguments: %s", ctx.message.author, ctx.command.name,
                    subcommand, ctx.args[2:])

    async def on_command_error(self, ctx, error):
        """Turn a command failure into a reply in the invoking channel.

        * Errors wrapping a ``HandledError`` and unknown commands are ignored.
        * Argument/syntax problems reply with the help text of the command.
        * API connection failures reply with a database notice and notify the
          error users.
        * Anything else notifies the error users, is logged with its traceback
          on stderr, and replies with the generic error message.

        :param ctx: invocation context of the failed command.
        :param error: exception raised while processing the command.
        """
        # Only errors that wrap another exception carry `.original`.
        original = getattr(error, "original", None)
        error_str = ""

        if isinstance(original, HandledError):
            return
        elif isinstance(error, commands.errors.CommandNotFound):
            return
        elif isinstance(error, (commands.errors.BadArgument, commands.errors.MissingRequiredArgument)) \
                or isinstance(original, ValueError):
            error_str = "Well bud, you got this far. Good job! But you still h*cked up the syntax:"
            # .get: a command that is absent from the API command list has no help entry.
            error_str += "".join(self.help_dict.get(ctx.command.name, []))
        elif isinstance(original, request_exception.ConnectionError):
            error_str = "Unable to connect to the database. Hopefully someone is working on this..."
            await self.send_error_message("Can't connect to the GeoffreyAPI, is it offline?")

        if not error_str:
            # Slicing never raises, so this also works for errors raised
            # before any argument was parsed (ctx.args may then be empty).
            await self.send_error_message(
                'Geoffrey encountered unhandled exception: {} Command: **{}** Context: {}'
                .format(error, ctx.invoked_with, ctx.args[2:]))

            error_str = bad_error_message.format(ctx.invoked_with)

            logger.error("Geoffrey encountered exception: %s", error)
            traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)

        await ctx.message.channel.send('{} **Error Running Command:** {}'.format(
            ctx.message.author.mention, error_str))

    async def send_error_message(self, msg):
        """Send ``msg`` privately to every user listed in ``self.error_users``.

        :param msg: a single string, or a list of strings which is delivered
            through :meth:`send_list`.
        """
        for user_id in self.error_users:
            user = self.get_user(user_id)
            if user is None:
                # get_user returns None when it cannot find the user.
                logger.warning("Unable to notify error user %s: user not found", user_id)
                continue

            if isinstance(msg, list):
                await self.send_list(user, msg)
            else:
                await user.send(msg)

    def run_command(self, command, **kwargs):
        """Call an API command with ``kwargs`` as query parameters.

        :param command: command name appended to the command URL.
        :return: the decoded JSON response.
        """
        # NOTE(review): unlike the URLs built in __init__, this path has no
        # '/GeoffreyApp' prefix and no API token is added - confirm intended.
        url = self.base_url + '/api/command/{}'
        return requests.get(url=url.format(command), params=kwargs).json()

    @staticmethod
    async def send_list(ctx, send_list):
        """Send a list of strings as newline-joined messages of limited size.

        Consecutive items are packed into one message until adding the next
        item (plus its newline separator) would exceed 2000 characters.
        A single item longer than the limit is still sent on its own, unsplit.

        :param ctx: any object with an awaitable ``send(text)`` method.
        :param send_list: strings to send, in order.
        """
        limit = 2000

        async def flush(lines):
            text = '\n'.join(lines)
            # Nothing is sent for a chunk that holds no visible text.
            if text.strip():
                await ctx.send(text)

        pending = []
        size = 0
        for s in send_list:
            # Joining costs one newline for every item except the first.
            cost = len(s) + (1 if pending else 0)
            if pending and size + cost > limit:
                await flush(pending)
                pending, size = [s], len(s)
            else:
                pending.append(s)
                size += cost

        await flush(pending)

    def build_help_page(self):
        """Build the general help page as a list of lines.

        The page is the bot description, followed by one line per API command
        (prefix, name padded to a common width, short help) and a closing hint.

        :return: list of strings.
        """
        largest_command_size = max((len(command["command"]) for command in self.command_list), default=0)

        help_list = self.description.split("\n")
        help_list.append("\nCommands:")

        for command in self.command_list:
            name = command["command"].ljust(largest_command_size + 2)
            help_list.append(" {}{}{}".format(self.prefix, name, command["help"]))

        help_list.append("\nDo ?help <command name> to get more info on any of the commands above")

        return help_list

    def build_help_dict(self):
        """Build the detailed help text for every API command.

        Each entry is a list of strings wrapped in a code fence: the short
        help from the API followed, when the command is registered on the bot
        and has help text, by that text formatted with the command prefix.

        :return: dict mapping command name to its list of help strings.
        """
        help_dict = {}

        for command in self.command_list:
            command_name = command["command"]
            help_list = ['```', command["help"] + "\n"]

            c = self.get_command(command_name)
            # A registered command may have no help text at all.
            if c is not None and c.help:
                help_list.append(c.help.format(self.prefix))

            help_list.append('```')
            help_dict[command_name] = help_list

        return help_dict