223 lines
7.9 KiB
Python
223 lines
7.9 KiB
Python
from discord import Game
|
|
from discord.ext import commands
|
|
from requests import exceptions as request_exception
|
|
from discord.utils import oauth_url
|
|
import requests
|
|
import logging
|
|
import time
|
|
import traceback
|
|
import sys
|
|
from GeoffreyBot.util import HandledError
|
|
from GeoffreyBot.geoffrey_formatter import format_message
|
|
|
|
logger = logging.getLogger('GeoffreyBot')
|
|
|
|
description = '''
|
|
Geoffrey (pronounced JOFF-ree) started his life as an inside joke none of you will understand.
|
|
At some point, she was to become an airhorn discord_bot. Now, they know where your stuff is.
|
|
|
|
If have a suggestion or if something is borked, you can PM my ding dong of a creator ZeroHD.
|
|
|
|
*You must use ?register before adding things to Geoffrey*
|
|
'''
|
|
|
|
bad_error_message = 'OOPSIE WOOPSIE!! Uwu We made a fucky wucky!! A wittle fucko boingo! The admins at our ' \
|
|
'headquarters are working VEWY HAWD to fix this! (Error in command {})'
|
|
|
|
|
|
class GeoffreyBot(commands.Bot):
|
|
def __init__(self, base_url, api_token):
|
|
self.base_url = base_url
|
|
self.api_token = api_token
|
|
|
|
settings_url = base_url + 'api/settings/'
|
|
command_list_url = base_url + 'api/command/commands'
|
|
|
|
logger.info("Connecting to the Geoffrey API... ")
|
|
|
|
fail_count = 0
|
|
while True:
|
|
try:
|
|
setting = requests.get(url=settings_url, params={"api": self.api_token}).json()
|
|
self.command_list = requests.get(url=command_list_url, params={"api": self.api_token}).json()
|
|
break
|
|
except Exception:
|
|
if fail_count > 20:
|
|
print("Failed to connect to Geoffrey API!")
|
|
quit(-1)
|
|
time.sleep(1)
|
|
fail_count += 1
|
|
|
|
self.prefix = setting['BOT_PREFIX']
|
|
self.error_users = setting['ERROR_USERS']
|
|
self.mod_ranks = setting['MOD_RANKS']
|
|
self.special_users = []
|
|
self.default_status = setting['STATUS']
|
|
|
|
super().__init__(command_prefix=self.prefix, description=description, case_insensitive=True, help_command=None)
|
|
|
|
self.load_extension('GeoffreyBot.geoffrey_api')
|
|
self.load_extension('GeoffreyBot.geoffrey_mod_cmds')
|
|
|
|
self.help_page = self.build_help_page()
|
|
self.help_dict = self.build_help_dict()
|
|
|
|
async def on_ready(self):
|
|
logger.info("%s Online, ID: %s", self.user.name, self.user.id)
|
|
info = await self.application_info()
|
|
url = oauth_url(info.id)
|
|
logger.info("Bot url: %s", url)
|
|
await self.change_presence(activity=Game(self.default_status))
|
|
|
|
async def on_command(self, ctx):
|
|
if ctx.invoked_subcommand is None:
|
|
subcommand = ""
|
|
else:
|
|
subcommand = ":" + ctx.invoked_subcommand.__str__()
|
|
|
|
logger.info("User %s, used command '%s%s' with arguments: %s", ctx.message.author, ctx.command.name, subcommand,
|
|
ctx.args[2:])
|
|
|
|
async def on_command_error(self, ctx, error):
|
|
error_str = ""
|
|
|
|
if isinstance(error, commands.CheckFailure):
|
|
error_str = "You do not have permission to run that whacky command dog."
|
|
elif hasattr(error, "original") and isinstance(error.original, HandledError):
|
|
return
|
|
elif isinstance(error, commands.errors.CommandNotFound):
|
|
return
|
|
elif isinstance(error, commands.errors.BadArgument) or isinstance(error, commands.MissingRequiredArgument) or (
|
|
hasattr(error, "original") and isinstance(error.original, ValueError)):
|
|
error_str = "Well bud, you got this far. Good job! But you still h*cked up the syntax:"
|
|
|
|
error_str = self.build_help_str(ctx.command.name, error_str)
|
|
|
|
elif hasattr(error, "original") and isinstance(error.original, request_exception.ConnectionError):
|
|
error_str = "Unable to connect to the database. Hopefully someone is working on this..."
|
|
await self.send_error_message("Can't connect to the GeoffreyAPI, is it offline?")
|
|
elif hasattr(error, "original"):
|
|
e = error.original.args[0]
|
|
|
|
if e == "TypeError":
|
|
error_str = "Well bud, you got this far. Good job! But you still h*cked up the syntax:"
|
|
|
|
error_str = self.build_help_str(ctx.command.name, error_str)
|
|
|
|
if error_str is '':
|
|
command_args = ''
|
|
if len(ctx.args) > 0:
|
|
if len(ctx.args[1].args) > 1:
|
|
command_args = ctx.args[1].args[2:]
|
|
else:
|
|
command_args = ''
|
|
|
|
await self.send_error_message(
|
|
'Geoffrey encountered unhandled exception: {} Command: **{}** Context: {}'.format(error,
|
|
ctx.invoked_with,
|
|
command_args))
|
|
|
|
error_message = ["```python"]
|
|
for tb in traceback.format_tb(error.original.__traceback__):
|
|
error_message.append(tb)
|
|
|
|
error_message.append("```")
|
|
|
|
error_str = bad_error_message.format(ctx.invoked_with)
|
|
|
|
logger.error("Geoffrey encountered exception: %s", error)
|
|
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
|
|
|
|
await ctx.message.channel.send('{} **Error Running Command:** {}'.format(
|
|
ctx.message.author.mention, error_str))
|
|
|
|
def build_help_str(self, command_name, help_str=""):
|
|
|
|
if "mod_" in command_name:
|
|
return help_str
|
|
|
|
for line in self.help_dict[command_name]:
|
|
help_str += line
|
|
|
|
return help_str
|
|
|
|
async def send_error_message(self, msg):
|
|
for user_id in self.error_users:
|
|
user = self.get_user(user_id)
|
|
|
|
if user is None:
|
|
continue
|
|
|
|
if msg is list:
|
|
await self.send_list(user, msg)
|
|
else:
|
|
await user.send(msg)
|
|
|
|
def run_command(self, command, **kwargs):
|
|
url = self.base_url + '/api/command/{}'
|
|
|
|
return requests.get(url=url.format(command), params=kwargs).json()
|
|
|
|
@staticmethod
|
|
async def send_formatted_message(ctx, msg_format, *args, ping_user=False):
|
|
if ping_user:
|
|
msg_format = "%s %s" % (ctx.message.author, msg_format)
|
|
|
|
msg = format_message(msg_format, *args)
|
|
|
|
ctx.send(msg)
|
|
|
|
@staticmethod
|
|
async def send_list(ctx, send_list):
|
|
msg = ""
|
|
|
|
for s in send_list:
|
|
if (len(msg) + len(s)) > 2000:
|
|
await ctx.send(msg)
|
|
msg = s
|
|
else:
|
|
msg = msg + '\n' + s
|
|
|
|
if len(msg) > 0:
|
|
await ctx.send(msg)
|
|
|
|
def build_help_page(self):
|
|
help_list = []
|
|
largest_command_size = 0
|
|
for command in self.command_list:
|
|
if command["permission_level"] == "PLAYER":
|
|
if largest_command_size < len(command["command"]):
|
|
largest_command_size = len(command["command"])
|
|
|
|
for line in self.description.split("\n"):
|
|
help_list.append(line)
|
|
|
|
help_list.append("\n**__Commands:__**")
|
|
for command in self.command_list:
|
|
if command["permission_level"] == "PLAYER":
|
|
name = command["command"]
|
|
help_list.append("**{}{}** - {}".format(self.prefix, name, command["help"]))
|
|
|
|
help_list.append("\nDo ?help <command name> to get more info on any of the commands above")
|
|
|
|
return help_list
|
|
|
|
def build_help_dict(self):
|
|
help_dict = {}
|
|
for command in self.command_list:
|
|
command_name = command["command"]
|
|
|
|
help_list = ['```', command["help"] + "\n"]
|
|
|
|
c = self.get_command(command_name)
|
|
|
|
if c is not None:
|
|
help_msg = c.help
|
|
help_list.append(help_msg.format(self.prefix))
|
|
|
|
help_list.append('```')
|
|
|
|
help_dict[command_name] = help_list
|
|
|
|
return help_dict
|