# Forked from Minecraft/minecraft_manager (291 lines, 13 KiB, Python)
import discord
|
|
from io import StringIO
|
|
from discord.ext import commands
|
|
from django.contrib.auth.models import User
|
|
from django.db import close_old_connections
|
|
|
|
from minecraft_manager.api import api
|
|
from minecraft_manager.bot.utils import get_application
|
|
from minecraft_manager.models import Application, Player
|
|
from minecraft_manager.utils import build_application, full_static
|
|
|
|
|
|
class Commands(commands.Cog):
    """Discord commands for reviewing applications and managing players.

    Every command in this cog is gated by ``cog_check``: it must be invoked
    inside a guild by a member holding a role whose ID is listed in
    ``bot.auth_roles``.
    """

    def __init__(self, bot):
        """Keep a reference to the bot that owns this cog."""
        self.bot = bot

    async def cog_before_invoke(self, ctx):
        """Drop stale database connections before each command runs."""
        # FIX STALE DB CONNECTIONS
        close_old_connections()

    async def cog_check(self, ctx):
        """Return True only for guild members holding an authorised role."""
        # No DMs
        if ctx.guild is None:
            return False

        # Check roles
        if not hasattr(ctx.author, "roles"):
            return False

        return self._has_any_role(ctx.author, self.bot.auth_roles)

    @staticmethod
    def _has_any_role(member, role_ids):
        """Return True if any of the member's role IDs equals one of *role_ids*."""
        return any(
            role.id == role_id
            for role in member.roles
            for role_id in role_ids
        )

    def is_superuser(self, member: discord.Member):
        """Return True if *member* holds a role listed in ``bot.superuser_roles``."""
        return self._has_any_role(member, self.bot.superuser_roles)

    @commands.command()
    async def help(self, ctx):
        """Send an embed describing the available commands."""
        prefix = self.bot.prefix
        entries = (
            ("{}app search <username>",
             "Search for applications by partial or exact username."),
            ("{}app info <app ID>",
             "Get detailed information about a specific application."),
            ("{}app accept|deny <app ID>",
             "Take action on an application."),
            ("{}demote <username>",
             "Demote a player to the role given to accepted applications."),
            ("{}compare",
             "Compare Discord users to the Whitelist."),
            ("{}sync",
             "Sync Discord users with Player records."),
        )

        embed = discord.Embed(colour=discord.Colour(0x417505))
        embed.set_thumbnail(url=full_static('favicon.png'))
        embed.title = "Minecraft Manager Help"
        for usage, description in entries:
            embed.add_field(name=usage.format(prefix), value=description)
        await self.bot.discord_message(ctx.message.channel, embed)

    @commands.group("app", aliases=["application"])
    async def app(self, ctx):
        """Command group for application actions; prompts when no sub-command is given."""
        if ctx.invoked_subcommand is None:
            await self.bot.discord_message(ctx.message.channel, "No sub-command supplied. Info, Search, Accept, or Deny.")

    @commands.command()
    async def info(self, ctx, *args):
        """Top-level alias of ``app info``."""
        await self._info(ctx, *args)

    @app.command("info")
    async def app_info(self, ctx, *args):
        """Show a single application by ID or username."""
        await self._info(ctx, *args)

    async def _info(self, ctx, *args):
        """Show one application, looked up by ID or by partial username.

        Only the first argument is used. If it parses as an integer it is
        treated as an application ID; otherwise it is matched against
        application usernames and, when none match, against the usernames of
        players linked to an application. A username lookup succeeds only
        when exactly one application matches.
        """
        channel = ctx.message.channel
        if not args:
            await self.bot.discord_message(channel, "Info requires an application ID or username.")
            # Nothing to look up; without this return args[0] raises IndexError.
            return

        key = args[0]
        try:
            int(key)
        except ValueError:
            is_id = False
        else:
            is_id = True

        if is_id:
            application = get_application(key)
            if not application:
                await self.bot.discord_message(channel, "An Application with that ID doesn't exist.")
                return
        else:
            # Only "none", "exactly one" or "several" matters here, so at
            # most two rows are fetched per lookup.
            matches = list(Application.objects.filter(username__icontains=key)[:2])
            if not matches:
                matches = list(Application.objects.filter(player__username__icontains=key)[:2])
                if len(matches) == 1:
                    await self.bot.discord_message(channel, "**No applications matched, however there is a player match**")
            if len(matches) != 1:
                await self.bot.discord_message(channel, "An exact Application could not be found. Try search instead.")
                return
            application = matches[0]

        await self.bot.discord_message(channel, build_application(application))

    def _link_player(self, application):
        """Point the Player whose username matches the application at it.

        Does nothing when no such Player exists.
        """
        try:
            player = Player.objects.get(username__iexact=application.username)
        except Player.DoesNotExist:
            return
        player.application_id = application.id
        player.save()

    async def _set_status(self, ctx, app_id, accepted, outcome):
        """Set ``accepted`` on an application and announce the result.

        :param app_id: ID handed to ``get_application``.
        :param accepted: New value for ``application.accepted``.
        :param outcome: Past-tense verb used in the confirmation message.
        :return: The application when it was updated, otherwise ``None``.
        """
        channel = ctx.message.channel
        application = get_application(app_id)
        if not application:
            await self.bot.discord_message(channel, "An Application with that ID doesn't exist.")
            return None

        # NOTE(review): an application whose ``accepted`` is already truthy is
        # left untouched and nothing is reported, for accept, deny and clear
        # alike. Confirm this is intended for deny/clear.
        if application.accepted:
            return None

        application.accepted = accepted
        application.save()
        self._link_player(application)
        await self.bot.discord_message(channel, "App ID **{0}** was successfully {1}.".format(app_id, outcome))
        return application

    @commands.command()
    async def accept(self, ctx, app_id: int):
        """Top-level alias of ``app accept``."""
        await self._accept(ctx, app_id)

    @app.command("accept")
    async def app_accept(self, ctx, app_id: int):
        """Accept an application by ID."""
        await self._accept(ctx, app_id)

    async def _accept(self, ctx, app_id: int):
        """Mark an application as accepted and forward the action via ``api.plugin``."""
        application = await self._set_status(ctx, app_id, True, "accepted")
        if application and not api.plugin(api.PLUGIN_ACCEPT, application.username):
            await self.bot.discord_message(ctx.message.channel, "Could not accept in-game, is the server running?")

    @commands.command()
    async def deny(self, ctx, app_id: int):
        """Top-level alias of ``app deny``."""
        await self._deny(ctx, app_id)

    @app.command("deny")
    async def app_deny(self, ctx, app_id: int):
        """Deny an application by ID."""
        await self._deny(ctx, app_id)

    async def _deny(self, ctx, app_id: int):
        """Mark an application as denied and forward the action via ``api.plugin``."""
        application = await self._set_status(ctx, app_id, False, "denied")
        if application and not api.plugin(api.PLUGIN_DENY, application.username):
            await self.bot.discord_message(ctx.message.channel, "Could not deny in-game, is the server running?")

    @commands.command()
    async def clear(self, ctx, app_id: int):
        """Top-level alias of ``app clear``."""
        await self._clear(ctx, app_id)

    @app.command("clear")
    async def app_clear(self, ctx, app_id: int):
        """Reset an application's accepted state by ID."""
        await self._clear(ctx, app_id)

    async def _clear(self, ctx, app_id: int):
        """Reset an application's ``accepted`` value to ``None``."""
        await self._set_status(ctx, app_id, None, "cleared")

    @commands.command()
    async def search(self, ctx, search: str):
        """Top-level alias of ``app search``."""
        await self._search(ctx, search)

    @app.command("search")
    async def app_search(self, ctx, search: str):
        """Search applications by partial username."""
        await self._search(ctx, search)

    async def _search(self, ctx, search: str):
        """List applications whose username contains *search*.

        A single match is rendered in full with ``build_application``;
        several matches are listed (at most 10) together with the real
        total. When no application matches, players linked to an application
        are searched the same way.
        """
        channel = ctx.message.channel
        found = Application.objects.filter(username__icontains=search)
        count = found.count()

        if count == 1:
            info = build_application(found[0])
        elif count > 1:
            lines = ["**Found the following applications**"]
            for application in found[:10]:
                lines.append("{0} - {1} ({2})".format(
                    application.id,
                    application.username.replace("_", "\\_"),
                    application.status,
                ))
            if count > 10:
                # Report the real total, not the length of the 10-row slice.
                lines.append("**This is only 10 applications out of {0} found. Please narrow your search if possible.**".format(count))
            info = "\n".join(lines)
        else:
            players = Player.objects.filter(username__icontains=search, application__isnull=False)
            count = players.count()
            if count == 1:
                await self.bot.discord_message(channel, "**No applications matched, however there is a player match**")
                info = build_application(players[0].application)
            elif count > 1:
                lines = ["**No applications matched, however there are player matches**"]
                for player in players[:10]:
                    application = player.application
                    lines.append("{0} - {1} AKA {2} ({3})".format(
                        application.id,
                        application.username.replace("_", "\\_"),
                        player.username.replace("_", "\\_"),
                        application.status,
                    ))
                if count > 10:
                    # Report the real total, not the length of the 10-row slice.
                    lines.append("**This is only 10 players out of {0} found. Please narrow your search if possible.**".format(count))
                info = "\n".join(lines)
            else:
                info = "No applications matched that search."

        await self.bot.discord_message(channel, info)

    @commands.command()
    async def demote(self, ctx, username: str):
        """Demote a player via ``api.plugin`` and de-activate the matching User.

        Silently ignored unless the author passes ``is_superuser``. The
        invoking message is deleted before any action is taken.
        """
        if not self.is_superuser(ctx.author):
            return

        channel = ctx.message.channel
        await ctx.message.delete()

        if api.plugin(api.PLUGIN_DEMOTE, username):
            await self.bot.discord_message(channel, "{} has been demoted in-game.".format(username))
        else:
            await self.bot.discord_message(channel, "{} could not be demoted in-game, is the server running?".format(username))

        try:
            user = User.objects.get(username__iexact=username)
        except User.DoesNotExist:
            await self.bot.discord_message(channel, "{} could not be found in MCM, is their account up-to-date?".format(username))
        else:
            user.is_active = False
            user.save()
            await self.bot.discord_message(channel, "{} has been de-activated in MCM.".format(username))

    @commands.command()
    async def compare(self, ctx):
        """Report guild members that lack a Player or Application record.

        Non-bot members are matched case-insensitively by nickname, falling
        back to their account name. The results are sent to the command
        author rather than to the channel.
        """
        await ctx.message.delete()
        await ctx.message.channel.trigger_typing()

        no_player = []
        no_application = []
        for member in ctx.message.guild.members:
            if member.bot:
                continue
            name = member.nick or member.name
            if Player.objects.filter(username__iexact=name).exists():
                continue
            # exists() keeps a member with several applications from being
            # reported as having none, which get() plus a bare except did.
            if Application.objects.filter(username__iexact=name).exists():
                no_player.append(name)
            else:
                no_application.append(name)

        if no_player:
            header = "**The following users have an application match, but no player match on the whitelist:**\n"
            await self.bot.discord_message(ctx.author, "{}```{}```".format(header, "\n".join(no_player)))
        if no_application:
            header = "**The following users do not have an application or player match on the whitelist:**\n"
            await self.bot.discord_message(ctx.author, "{}```{}```".format(header, "\n".join(no_application)))

    @commands.command()
    async def sync(self, ctx):
        """Fill in ``discord_id`` for players whose username matches a guild member.

        Players with a null ``discord_id`` are matched case-insensitively
        against each member's nickname (or account name). A summary is sent
        to the channel with a ``sync.txt`` attachment listing both groups.
        """
        # Index members by lower-cased name once instead of rescanning the
        # guild for every player. setdefault keeps the first member seen for
        # a name, which is what a first-match scan would pick.
        member_ids = {}
        for member in ctx.guild.members:
            name = member.nick or member.name
            member_ids.setdefault(name.lower(), member.id)

        # Attempt to sync users
        need_sync = Player.objects.filter(discord_id__isnull=True)
        need_sync_count = need_sync.count()
        synced = []
        not_synced = []
        for player in need_sync:
            key = player.username.lower()
            if key not in member_ids:
                not_synced.append(player.username)
                continue
            player.discord_id = member_ids[key]
            player.save()
            synced.append(player.username)

        txt = "Synced\n\t{}\n\nNot Synced\n\t{}".format('\n\t'.join(synced), '\n\t'.join(not_synced))
        attach = discord.File(fp=StringIO(txt), filename="sync.txt")
        await ctx.channel.send(content="Successfully synced {}/{} players.".format(len(synced), need_sync_count), file=attach)
|
|
|
|
|
|
def setup(bot):
    """Create a ``Commands`` cog bound to *bot* and add it to the bot."""
    cog = Commands(bot)
    bot.add_cog(cog)
|