import logging
import sys
import time
import traceback

import requests
from discord import Game
from discord.ext import commands
from discord.utils import oauth_url
from requests import exceptions as request_exception

from GeoffreyBot.geoffrey_formatter import format_message
from GeoffreyBot.util import HandledError

logger = logging.getLogger('GeoffreyBot')

# Bot description; passed to commands.Bot and reused as the header of the help page.
description = (
    " Geoffrey (pronounced JOFF-ree) started his life as an inside joke none of you will understand. "
    "At some point, she was to become an airhorn discord_bot. Now, they know where your stuff is. "
    "If have a suggestion or if something is borked, you can PM my ding dong of a creator ZeroHD. "
    "*You must use ?register before adding things to Geoffrey* "
)

# Reply sent to the invoking user when a command fails with an unexpected error.
bad_error_message = 'OOPSIE WOOPSIE!! Uwu We made a fucky wucky!! A wittle fucko boingo! The admins at our ' \
                    'headquarters are working VEWY HAWD to fix this! (Error in command {})'

# Upper bound on characters per message used when chunking lists in send_list.
MAX_MESSAGE_LENGTH = 2000


class GeoffreyBot(commands.Bot):
    """Discord bot that loads its settings and command list from the Geoffrey API."""

    def __init__(self, geoffrey_app_url, geoffrey_api_url, geoffrey_api_token):
        """Fetch settings and the command list from the API, then set up the bot.

        The two API requests are retried once per second; after more than 20
        failed attempts the process exits with status -1.

        :param geoffrey_app_url: URL of the Geoffrey app (stored, not used here).
        :param geoffrey_api_url: Base URL of the API; 'settings/' and
            'command/commands' are appended to it directly.
        :param geoffrey_api_token: Token sent as the ``api`` query parameter.
        """
        self.api_url = geoffrey_api_url
        self.geoffrey_app_url = geoffrey_app_url
        self.api_token = geoffrey_api_token
        self.settings_url = self.api_url + 'settings/'
        self.command_list_url = self.api_url + 'command/commands'

        logger.info("Connecting to the Geoffrey API... ")

        fail_count = 0
        while True:
            try:
                setting = requests.get(url=self.settings_url, params={"api": self.api_token}).json()
                self.command_list = requests.get(url=self.command_list_url,
                                                 params={"api": self.api_token}).json()
                break
            except Exception:
                # Deliberately broad: any connection or decoding failure is retried.
                if fail_count > 20:
                    logger.exception("Failed to connect to Geoffrey API!")
                    print("Failed to connect to Geoffrey API!")
                    # sys.exit instead of quit(): quit() only exists when `site` is loaded.
                    sys.exit(-1)
                time.sleep(1)
                fail_count += 1

        self.prefix = setting['BOT_PREFIX']
        self.error_users = setting['ERROR_USERS']
        self.mod_ranks = setting['MOD_RANKS']
        self.special_users = []
        self.default_status = setting['STATUS']

        super().__init__(command_prefix=self.prefix, description=description, case_insensitive=True,
                         help_command=None)

        self.load_extension('GeoffreyBot.geoffrey_api')
        self.load_extension('GeoffreyBot.geoffrey_mod_cmds')

        # Built after the extensions load because build_help_dict looks commands up by name.
        self.help_page = self.build_help_page()
        self.help_dict = self.build_help_dict()

    async def on_ready(self):
        """Log the bot identity and OAuth URL, then set the default presence."""
        logger.info("%s Online, ID: %s", self.user.name, self.user.id)
        info = await self.application_info()
        url = oauth_url(info.id)
        logger.info("Bot url: %s", url)
        await self.change_presence(activity=Game(self.default_status))

    async def on_command(self, ctx):
        """Log every invoked command together with its user and arguments."""
        if ctx.invoked_subcommand is None:
            subcommand = ""
        else:
            subcommand = ":" + str(ctx.invoked_subcommand)

        logger.info("User %s, used command '%s%s' with arguments: %s", ctx.message.author, ctx.command.name,
                    subcommand, ctx.args[2:])

    async def on_command_error(self, ctx, error):
        """Turn a command error into a user-facing reply.

        HandledError and CommandNotFound are ignored silently. Known error
        types get a specific message; anything else is reported to the
        configured error users, logged, and answered with bad_error_message.
        """
        error_str = ""
        # Not every command error wraps another exception, so look it up once, safely.
        original = getattr(error, "original", None)

        if isinstance(error, commands.CheckFailure):
            error_str = "You do not have permission to run that whacky command dog."
        elif isinstance(original, HandledError):
            return
        elif isinstance(error, commands.errors.CommandNotFound):
            return
        elif isinstance(error, commands.errors.ExpectedClosingQuoteError):
            error_str = "It appears you are missing a quotation mark in there, better luck next time goober."
        elif isinstance(error, (commands.errors.BadArgument, commands.MissingRequiredArgument)) \
                or isinstance(original, ValueError):
            error_str = "Well bud, you got this far. Good job! But you still h*cked up the syntax:"
            error_str = self.build_help_str(ctx.command.name, error_str)
        elif isinstance(original, request_exception.ConnectionError):
            error_str = "Unable to connect to the database. Hopefully someone is working on this..."
            await self.send_error_message("Can't connect to the GeoffreyAPI, is it offline?")
        elif original is not None:
            # Guard against exceptions raised without arguments.
            if original.args and original.args[0] == "TypeError":
                error_str = "Well bud, you got this far. Good job! But you still h*cked up the syntax:"
                error_str = self.build_help_str(ctx.command.name, error_str)

        if not error_str:
            command_args = ''
            # ctx.args[1] is indexed below, so at least two entries are required.
            if len(ctx.args) > 1:
                inner_args = getattr(ctx.args[1], "args", None)
                if inner_args is not None and len(inner_args) > 1:
                    command_args = inner_args[2:]

            await self.send_error_message(
                'Geoffrey encountered unhandled exception: {} Command: **{}** Context: {}'.format(
                    error, ctx.invoked_with, command_args))

            error_str = bad_error_message.format(ctx.invoked_with)

            logger.error("Geoffrey encountered exception: %s", error)
            traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)

        await ctx.message.channel.send('{} **Error Running Command:** {}'.format(
            ctx.message.author.mention, error_str))

    def build_help_str(self, command_name, help_str=""):
        """Return ``help_str`` followed by the stored help lines for a command.

        Commands whose name contains ``mod_``, and commands with no entry in
        ``help_dict``, return ``help_str`` unchanged.
        """
        if "mod_" in command_name:
            return help_str

        return help_str + "".join(self.help_dict.get(command_name, ()))

    async def send_error_message(self, msg):
        """Send ``msg`` (a string or a list of strings) to every known error user."""
        for user_id in self.error_users:
            user = self.get_user(user_id)

            if user is None:
                continue

            if isinstance(msg, list):
                await self.send_list(user, msg)
            else:
                await user.send(msg)

    def run_command(self, command, **kwargs):
        """Call an API command with ``kwargs`` as query parameters and return the JSON."""
        # NOTE(review): this class never sets `base_url`. It is honoured if something
        # else assigns it; otherwise the URL is built from `api_url` the same way
        # `command_list_url` is. Confirm the fallback against the API routes.
        base_url = getattr(self, "base_url", None)
        if base_url is not None:
            url = base_url + '/api/command/{}'
        else:
            url = self.api_url + 'command/{}'

        return requests.get(url=url.format(command), params=kwargs).json()

    @staticmethod
    async def send_formatted_message(ctx, msg_format, *args, ping_user=False):
        """Format a message with ``format_message`` and send it to ``ctx``.

        When ``ping_user`` is true the message author is prepended to the format.
        """
        if ping_user:
            msg_format = "%s %s" % (ctx.message.author, msg_format)

        msg = format_message(msg_format, *args)

        # send() is a coroutine; without the await nothing is delivered.
        await ctx.send(msg)

    @staticmethod
    async def send_list(ctx, send_list):
        """Send the strings in ``send_list`` joined by newlines, in chunks.

        Entries are packed into as few messages as possible without exceeding
        MAX_MESSAGE_LENGTH. A single entry longer than the limit is not split.
        """
        chunk = []
        chunk_len = 0

        for entry in send_list:
            # The joining newline counts towards the length once the chunk is non-empty.
            new_len = chunk_len + 1 + len(entry) if chunk else len(entry)

            if chunk and new_len > MAX_MESSAGE_LENGTH:
                text = '\n'.join(chunk)
                if text.strip():
                    await ctx.send(text)
                chunk = []
                new_len = len(entry)

            chunk.append(entry)
            chunk_len = new_len

        text = '\n'.join(chunk)
        # Skip a message that would be empty or whitespace only.
        if text.strip():
            await ctx.send(text)

    def build_help_page(self):
        """Return the help page lines: description, then one line per PLAYER command."""
        help_list = list(self.description.split("\n"))

        help_list.append("\n**__Commands:__**")
        for command in self.command_list:
            if command["permission_level"] == "PLAYER":
                name = command["command"]
                help_list.append("**{}{}** - {}".format(self.prefix, name, command["help"]))

        help_list.append("\nDo ?help to get more info on any of the commands above")

        return help_list

    def build_help_dict(self):
        """Return a dict mapping each command name to its code-fenced help lines."""
        help_dict = {}
        for command in self.command_list:
            command_name = command["command"]

            help_list = ['```', command["help"] + "\n"]

            c = self.get_command(command_name)

            # A registered command may have no help text at all.
            if c is not None and c.help:
                help_list.append(c.help.format(self.prefix))

            help_list.append('```')

            help_dict[command_name] = help_list

        return help_dict