minecraft_manager/api/api.py

130 lines
4.8 KiB
Python

import socket, requests, logging, os, datetime, pytz, mcstatus, random, string
from minecraft_manager.models import Alert
from django.contrib.auth.models import User
from django.conf import settings
logger = logging.getLogger(__name__)
def create_alert(message):
    """Create and save one Alert carrying ``message`` for every active user."""
    for active_user in User.objects.filter(is_active=True):
        Alert(user=active_user, message=message).save()
# Keyword strings for plugin commands. plugin() below sends its `key` argument
# as the first word of the payload; presumably callers pass one of these
# constants as that key -- confirm against the call sites.
PLUGIN_APPLICATION = 'application'
PLUGIN_ACCEPT = 'accept'
PLUGIN_DENY = 'deny'
PLUGIN_GLOBAL_CHAT = 'global'
PLUGIN_STAFF_CHAT = 'staff'
PLUGIN_DEMOTE = 'demote'
def plugin(key, command):
    """Send ``"<key> <command>"`` to the plugin's local TCP listener.

    The payload is UTF-8 encoded and sent to 127.0.0.1 on the port configured
    as ``settings.PLUGIN_PORT``. Nothing is sent when no port is configured or
    when plugin_exists() reports that the plugin directory is missing.

    Args:
        key: Keyword placed first in the payload.
        command: Text placed after the keyword, separated by one space.

    Returns:
        True if the payload was sent, False otherwise. This is a best-effort
        call: any failure is logged and reported as False, never raised.
    """
    try:
        port = getattr(settings, 'PLUGIN_PORT', None)
        if not port or not plugin_exists():
            return False
        payload = "{0} {1}".format(key, command).encode('utf-8')
        # The context manager closes the socket even when connect/sendall fail,
        # so a refused connection no longer leaks a file descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect(('127.0.0.1', port))
            sock.sendall(payload)
        return True
    except Exception:
        # The payload itself is deliberately not logged.
        logger.warning("Failed to send command to the Minecraft plugin", exc_info=True)
    return False
def discord_mcm(message='', embed=None, ping=False):
    """Post to the webhook configured as ``settings.DISCORD_MCM_WEBHOOK``.

    Returns the result of post_webhook(), or None when the setting is
    missing or empty.
    """
    webhook_url = getattr(settings, 'DISCORD_MCM_WEBHOOK', None)
    if not webhook_url:
        return None
    return post_webhook(webhook_url, message, embed, ping)
def discord_notification(message='', embed=None, ping=False):
    """Post to the webhook configured as ``settings.DISCORD_NOTIFICATION_WEBHOOK``.

    Returns the result of post_webhook(), or None when the setting is
    missing or empty.
    """
    webhook_url = getattr(settings, 'DISCORD_NOTIFICATION_WEBHOOK', None)
    if not webhook_url:
        return None
    return post_webhook(webhook_url, message, embed, ping)
def post_webhook(webhook_url, message, embed, ping):
    """POST a JSON payload built from ``message`` and ``embed`` to ``webhook_url``.

    When ``ping`` is truthy and ``settings.DISCORD_PING_LIST`` is non-empty,
    the message is prefixed with a line of ``<@&id>`` mentions, one per entry
    in that list. ``content`` and ``embeds`` are only included in the payload
    when they are truthy.

    Returns:
        Whatever ``requests.post`` returns.
    """
    ping_ids = getattr(settings, 'DISCORD_PING_LIST', [])
    if ping and ping_ids:
        mentions = " ".join("<@&{0}>".format(ping_id) for ping_id in ping_ids)
        message = "{0}\n{1}".format(mentions, message)

    payload = {}
    if message:
        payload['content'] = message
    if embed:
        payload['embeds'] = [embed.to_dict()]
    return requests.post(webhook_url, json=payload)
def strip_format(message):
    """Return ``message`` with every lowercase ``§<code>`` pair removed.

    Codes are removed one after another in the order 0-9, a-f, r, k-o, so a
    pair that only forms after an earlier removal is stripped as well.
    Uppercase codes are left in place.
    """
    for code in "0123456789abcdefrklmno":
        message = message.replace("§" + code, "")
    return message
def plugin_exists():
    """Return whether ``plugins/MinecraftManager`` exists under ``settings.MINECRAFT_BASE_DIR``."""
    base_dir = getattr(settings, 'MINECRAFT_BASE_DIR', '')
    plugin_path = os.path.join(base_dir, 'plugins/MinecraftManager')
    return os.path.exists(plugin_path)
def _parse_chat_line(line, log_time_format, as_timezone):
    """Turn one ``[<date>] <text>`` log line into a chat entry dict.

    The date is parsed with ``log_time_format``, interpreted as US/Central and
    converted to ``as_timezone``. Any failure (missing separator, unparsable
    date, unknown timezone) yields a fixed placeholder entry instead of raising.
    """
    try:
        date_part, text = line.split("] ", 1)
        stamp = datetime.datetime.strptime(date_part.replace("[", ""), log_time_format)
        stamp = pytz.timezone('US/Central').localize(stamp)
        stamp = stamp.astimezone(pytz.timezone(as_timezone))
        return {"date": stamp.strftime(log_time_format), "text": text}
    except Exception:
        return {"date": "01/01/17 00:00 AM", "name": "Error", "text": "Parsing Line"}


def _read_chat_log(path, log_time_format, as_timezone):
    """Return parsed entries for the last 100 lines of ``path`` ([] if it cannot be opened)."""
    try:
        with open(path, encoding='utf-8') as log_file:
            lines = log_file.readlines()[-100:]
    except OSError:
        return []
    return [_parse_chat_line(line, log_time_format, as_timezone) for line in lines]


def get_chats(as_timezone):
    """Return the most recent global and staff chat entries.

    Log paths come from ``settings.GLOBAL_LOG`` and ``settings.STAFF_LOG``; the
    timestamp format comes from ``settings.LOG_TIME_FORMAT`` (default
    ``'%m/%d/%y %I:%M %p'``).

    Args:
        as_timezone: pytz timezone name the timestamps are converted to.

    Returns:
        ``{'global': [...], 'staff': [...]}`` where each list holds up to 100
        dicts with ``date`` and ``text`` keys (text keeps its trailing newline),
        or None when plugin_exists() is false.
    """
    if not plugin_exists():
        return None

    log_time_format = getattr(settings, 'LOG_TIME_FORMAT', '%m/%d/%y %I:%M %p')
    global_log = getattr(settings, 'GLOBAL_LOG', "")
    staff_log = getattr(settings, 'STAFF_LOG', "")
    return {
        'global': _read_chat_log(global_log, log_time_format, as_timezone),
        'staff': _read_chat_log(staff_log, log_time_format, as_timezone),
    }
def get_query():
    """Query the server at ``settings.SERVER_QUERY_IP`` for player counts.

    Returns:
        On success, a dict with ``max`` and ``online`` as strings and
        ``players`` as a sorted list of names. On any failure, the fallback
        ``{'max': 0, 'online': 0, 'players': []}``; note the counts are ints
        there, which is kept as-is for existing callers.
    """
    try:
        server = mcstatus.MinecraftServer.lookup(getattr(settings, 'SERVER_QUERY_IP', None))
        query = server.query()
        return {
            'max': str(query.players.max),
            'online': str(query.players.online),
            'players': sorted(query.players.names),
        }
    except Exception:
        # Best-effort: an unreachable or misconfigured server reads as empty.
        # Unlike a bare except, this lets KeyboardInterrupt/SystemExit through.
        return {'max': 0, 'online': 0, 'players': []}
def generate_password(size=20):
    """Return a random password of ``size`` ASCII letters and digits.

    Characters are drawn with ``random.SystemRandom``, which uses the
    operating system's entropy source; the module-level ``random`` functions
    are a deterministic PRNG and unsuitable for passwords.

    Args:
        size: Number of characters to generate; 0 yields an empty string.
    """
    alphabet = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(size))