# coding: utf-8
"""User and Puzzle models for fetching, caching and submitting Advent of Code data."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import errno
import io
import json
import logging
import os
import re
import sys
import time
import webbrowser
from datetime import datetime
from datetime import timedelta
from textwrap import dedent

import bs4
import pkg_resources
import requests
from termcolor import colored
from termcolor import cprint

from .exceptions import AocdError
from .exceptions import DeadTokenError
from .exceptions import UnknownUserError
from .exceptions import PuzzleUnsolvedError
from .exceptions import PuzzleLockedError
from .utils import AOC_TZ
from .utils import _ensure_intermediate_dirs
from .utils import atomic_write_file
from .utils import get_owner
from .version import __version__


log = logging.getLogger(__name__)


# Directory for cached puzzle data; overridable with the AOCD_DIR env var.
AOCD_DATA_DIR = os.path.expanduser(os.environ.get("AOCD_DIR", os.path.join("~", ".config", "aocd")))
# Directory for token files; defaults to the data dir unless AOCD_CONFIG_DIR is set.
AOCD_CONFIG_DIR = os.path.expanduser(os.environ.get("AOCD_CONFIG_DIR", AOCD_DATA_DIR))
URL = "https://adventofcode.com/{year}/day/{day}"
USER_AGENT = {"User-Agent": "github.com/wimglenn/advent-of-code-data v{} by hey@wimglenn.com".format(__version__)}


class User(object):
    """An Advent of Code account, identified by its session token."""

    # class-level memo of token -> owner id, lazily loaded from token2id.json
    _token2id = None

    def __init__(self, token):
        self.token = token
        # placeholder until the real owner id is resolved (see the ``id`` property)
        self._owner = "unknown.unknown.0"

    @classmethod
    def from_id(cls, id):
        """Build a User from an id present in the mapping returned by ``_load_users()``.

        Raises UnknownUserError if ``id`` is not a key of that mapping.
        """
        users = _load_users()
        if id not in users:
            raise UnknownUserError("User with id '{}' is not known".format(id))
        user = cls(users[id])
        user._owner = id
        return user

    @property
    def auth(self):
        """Cookie dict sent with every request made on behalf of this user."""
        return {"session": self.token}

    @property
    def id(self):
        """Owner id for this token.

        The token -> id mapping is memoized in ``token2id.json`` under
        AOCD_CONFIG_DIR. Unknown tokens are resolved with ``get_owner`` and the
        memo file is rewritten.
        """
        fname = os.path.join(AOCD_CONFIG_DIR, "token2id.json")
        if User._token2id is None:
            try:
                with io.open(fname, encoding="utf-8") as f:
                    log.debug("loading user id memo from %s", fname)
                    User._token2id = json.load(f)
            except (IOError, OSError) as err:
                # a missing memo file just means nothing has been memoized yet
                if err.errno != errno.ENOENT:
                    raise
                User._token2id = {}
        if self.token not in User._token2id:
            log.debug("token not found in memo, attempting to determine user id")
            owner = get_owner(self.token)
            log.debug("got owner=%s, adding to memo", owner)
            User._token2id[self.token] = owner
            _ensure_intermediate_dirs(fname)
            with open(fname, "w") as f:
                json.dump(User._token2id, f, sort_keys=True, indent=2)
        else:
            owner = User._token2id[self.token]
        if self._owner == "unknown.unknown.0":
            self._owner = owner
        return owner

    def __str__(self):
        # only the last 4 characters of the token are shown
        return "<{} {} (token=...{})>".format(type(self).__name__, self._owner, self.token[-4:])

    @property
    def memo_dir(self):
        """Per-user cache directory: AOCD_DATA_DIR joined with the user id."""
        return os.path.join(AOCD_DATA_DIR, self.id)

    def get_stats(self, years=None):
        """Scrape this user's personal leaderboard times.

        ``years`` may be None (all years from 2015, including the current year
        only when the current month is December), a single int within that
        range, or an iterable of years.

        Returns a dict keyed by ``(year, day)``. Each value has an ``"a"`` entry
        and, when part b was completed, a ``"b"`` entry; each entry is a dict
        with ``"time"`` (timedelta), ``"rank"`` and ``"score"`` (ints).

        Raises DeadTokenError when the page has no stats table and mentions
        the overall leaderboard instead.
        """
        aoc_now = datetime.now(tz=AOC_TZ)
        all_years = range(2015, aoc_now.year + int(aoc_now.month == 12))
        if isinstance(years, int) and years in all_years:
            years = (years,)
        if years is None:
            years = all_years
        days = {str(i) for i in range(1, 26)}
        results = {}
        for year in years:
            url = "https://adventofcode.com/{}/leaderboard/self".format(year)
            response = requests.get(url, cookies=self.auth, headers=USER_AGENT)
            response.raise_for_status()
            soup = bs4.BeautifulSoup(response.text, "html.parser")
            if soup.article is None and "You haven't collected any stars" in soup.main.text:
                continue
            if soup.article.pre is None and "overall leaderboard" in soup.article.text:
                msg = "the auth token ...{} is expired or not functioning"
                raise DeadTokenError(msg.format(self.token[-4:]))
            stats_txt = soup.article.pre.text
            # keep only rows whose first column is a day number; the emptiness
            # check avoids an IndexError on blank lines
            rows = [line.split() for line in stats_txt.splitlines()]
            rows = [vals for vals in rows if vals and vals[0] in days]
            for vals in reversed(rows):
                day = int(vals[0])
                results[year, day] = {}
                results[year, day]["a"] = {
                    "time": _parse_duration(vals[1]),
                    "rank": int(vals[2]),
                    "score": int(vals[3]),
                }
                if vals[4] != "-":
                    results[year, day]["b"] = {
                        "time": _parse_duration(vals[4]),
                        "rank": int(vals[5]),
                        "score": int(vals[6]),
                    }
        return results


def default_user():
    """Return a User built from the AOC_SESSION env var or the ``token`` file.

    The env var takes precedence. Otherwise the first whitespace-separated word
    of ``<AOCD_CONFIG_DIR>/token`` is used. If neither yields a token, a help
    message is printed to stderr and AocdError is raised.
    """
    # export your session id as AOC_SESSION env var
    cookie = os.getenv("AOC_SESSION")
    if cookie:
        return User(token=cookie)

    # or chuck it in a plaintext file at ~/.config/aocd/token
    try:
        with io.open(os.path.join(AOCD_CONFIG_DIR, "token"), encoding="utf-8") as f:
            tokens = f.read().split()
    except (IOError, OSError) as err:
        if err.errno != errno.ENOENT:
            raise
    else:
        # an empty / whitespace-only token file is treated like a missing one
        if tokens:
            return User(token=tokens[0])

    msg = dedent(
        """\
        ERROR: AoC session ID is needed to get your puzzle data!
        You can find it in your browser cookies after login.
            1) Save the cookie into a text file {}, or
            2) Export the cookie in environment variable AOC_SESSION

        See https://github.com/wimglenn/advent-of-code-wim/issues/1 for more info.
        """
    )
    cprint(msg.format(os.path.join(AOCD_CONFIG_DIR, "token")), color="red", file=sys.stderr)
    raise AocdError("Missing session ID")


class Puzzle(object):
    """One puzzle (year + day) for a given user, with file-backed caching."""

    def __init__(self, year, day, user=None):
        self.year = year
        self.day = day
        if user is None:
            user = default_user()
        self._user = user
        self.input_data_url = self.url + "/input"
        self.submit_url = self.url + "/answer"
        # all per-user cache files share the "<year>_<day>" prefix inside memo_dir
        fname = "{}_{:02d}".format(self.year, self.day)
        prefix = os.path.join(self.user.memo_dir, fname)
        self.input_data_fname = prefix + "_input.txt"
        self.example_input_data_fname = prefix + "_example_input.txt"
        self.answer_a_fname = prefix + "a_answer.txt"
        self.answer_b_fname = prefix + "b_answer.txt"
        self.incorrect_answers_a_fname = prefix + "a_bad_answers.txt"
        self.incorrect_answers_b_fname = prefix + "b_bad_answers.txt"
        # titles are stored under the data dir, not under the user's memo dir
        self.title_fname = os.path.join(
            AOCD_DATA_DIR, "titles", "{}_{:02d}.txt".format(self.year, self.day)
        )
        self._title = ""

    @property
    def user(self):
        """The User this puzzle instance belongs to."""
        return self._user

    @property
    def input_data(self):
        """Puzzle input text with trailing CR/LF characters stripped.

        Read from the cache file when present; otherwise downloaded and cached.
        Raises PuzzleLockedError on a 404 response and AocdError on any other
        non-ok response.
        """
        try:
            # use previously received data, if any existing
            with io.open(self.input_data_fname, encoding="utf-8") as f:
                data = f.read()
        except (IOError, OSError) as err:
            if err.errno != errno.ENOENT:
                raise
        else:
            log.debug("reusing existing data %s", self.input_data_fname)
            return data.rstrip("\r\n")

        sanitized = "..." + self.user.token[-4:]
        log.info("getting data year=%s day=%s token=%s", self.year, self.day, sanitized)
        response = requests.get(
            url=self.input_data_url, cookies=self.user.auth, headers=USER_AGENT
        )
        if not response.ok:
            if response.status_code == 404:
                raise PuzzleLockedError("{}/{:02d} not available yet".format(self.year, self.day))
            log.error("got %s status code token=%s", response.status_code, sanitized)
            log.error(response.text)
            raise AocdError("Unexpected response")
        data = response.text
        log.info("saving the puzzle input token=%s", sanitized)
        atomic_write_file(self.input_data_fname, data)
        return data.rstrip("\r\n")

    @property
    def example_data(self):
        """Text of the first ``<pre>`` element on the puzzle page, cached on disk.

        An empty string is cached and returned when the page has no ``<pre>``.
        """
        try:
            with io.open(self.example_input_data_fname, encoding="utf-8") as f:
                data = f.read()
        except (IOError, OSError) as err:
            if err.errno != errno.ENOENT:
                raise
        else:
            log.debug("reusing existing example data %s", self.example_input_data_fname)
            return data.rstrip("\r\n")
        soup = self._soup()
        try:
            data = soup.pre.text
        except AttributeError:
            # soup.pre is None when the page contains no <pre> element
            log.info("unable to find example data year=%s day=%s", self.year, self.day)
            data = ""
        log.info("saving the example data")
        atomic_write_file(self.example_input_data_fname, data)
        return data.rstrip("\r\n")

    @property
    def title(self):
        """Puzzle title, read from the title cache file or scraped via ``_save_title``."""
        if os.path.isfile(self.title_fname):
            with io.open(self.title_fname, encoding="utf-8") as f:
                self._title = f.read().strip()
        else:
            self._save_title()
        return self._title

    def _repr_pretty_(self, p, cycle):
        # this is a hook for IPython's pretty-printer
        if cycle:
            p.text(repr(self))
        else:
            template = "<{0}({1.year}, {1.day}) at {2} - {1.title}>"
            p.text(template.format(type(self).__name__, self, hex(id(self))))

    @property
    def answer_a(self):
        """Known correct answer for part a; AttributeError if the part is unsolved."""
        try:
            return self._get_answer(part="a")
        except PuzzleUnsolvedError:
            # AttributeError lets getattr(self, "answer_a", None) fall back to None
            raise AttributeError("answer_a")

    @answer_a.setter
    def answer_a(self, val):
        """Submit ``val`` for part a unless it equals the already known answer."""
        if isinstance(val, int):
            val = str(val)
        if getattr(self, "answer_a", None) == val:
            return
        self._submit(value=val, part="a")

    @property
    def answered_a(self):
        """True when a non-empty answer for part a is known."""
        return bool(getattr(self, "answer_a", None))

    @property
    def answer_b(self):
        """Known correct answer for part b; AttributeError if the part is unsolved."""
        try:
            return self._get_answer(part="b")
        except PuzzleUnsolvedError:
            # AttributeError lets getattr(self, "answer_b", None) fall back to None
            raise AttributeError("answer_b")

    @answer_b.setter
    def answer_b(self, val):
        """Submit ``val`` for part b unless it equals the already known answer."""
        if isinstance(val, int):
            val = str(val)
        if getattr(self, "answer_b", None) == val:
            return
        self._submit(value=val, part="b")

    @property
    def answered_b(self):
        """True when a non-empty answer for part b is known."""
        return bool(getattr(self, "answer_b", None))

    def answered(self, part):
        """Return whether ``part`` ("a" or "b") has a known non-empty answer.

        Raises AocdError for any other ``part`` value.
        """
        if part == "a":
            return bool(getattr(self, "answer_a", None))
        if part == "b":
            return bool(getattr(self, "answer_b", None))
        raise AocdError('part must be "a" or "b"')

    @property
    def answers(self):
        """Tuple ``(answer_a, answer_b)``."""
        return self.answer_a, self.answer_b

    @answers.setter
    def answers(self, val):
        """Assign (and thereby submit) both parts from a 2-item iterable."""
        self.answer_a, self.answer_b = val

    @property
    def incorrect_answers_a(self):
        """Mapping of previously rejected part a guesses to the saved response text."""
        return self._get_bad_guesses(part="a")

    @property
    def incorrect_answers_b(self):
        """Mapping of previously rejected part b guesses to the saved response text."""
        return self._get_bad_guesses(part="b")

    def _submit(self, value, part, reopen=True, quiet=False):
        """Post an answer and act on the site's response message.

        ``part`` may be given as "a"/"b" or 1/2. When ``reopen`` is true the
        puzzle page is opened in a browser after a correct answer. When
        ``quiet`` is true nothing is printed to stdout.

        Returns the POST response, or None when the submission was skipped
        (value already known to be wrong, or the part is already solved).
        Raises AocdError for a non-answer value, an invalid part, an attempt to
        re-submit answer_a as part b, or a non-ok HTTP response.
        """
        if value in {u"", b"", None, b"None", u"None"}:
            raise AocdError("cowardly refusing to submit non-answer: {!r}".format(value))
        value = str(value)
        part = str(part).replace("1", "a").replace("2", "b").lower()
        if part not in {"a", "b"}:
            raise AocdError('part must be "a" or "b"')
        bad_guesses = getattr(self, "incorrect_answers_" + part)
        if value in bad_guesses:
            if not quiet:
                msg = "aocd will not submit that answer again. You've previously guessed {} and the server responded:"
                print(msg.format(value))
                cprint(bad_guesses[value], "red")
            return
        if part == "b" and value == getattr(self, "answer_a", None):
            raise AocdError("cowardly refusing to re-submit answer_a ({}) for part b".format(value))
        url = self.submit_url
        check_guess = self._check_guess_against_existing(value, part)
        if check_guess is not None:
            if quiet:
                log.info(check_guess)
            else:
                print(check_guess)
            return
        sanitized = "..." + self.user.token[-4:]
        log.info("posting %r to %s (part %s) token=%s", value, url, part, sanitized)
        level = {"a": 1, "b": 2}[part]
        response = requests.post(
            url=url,
            cookies=self.user.auth,
            headers=USER_AGENT,
            data={"level": level, "answer": value},
        )
        if not response.ok:
            log.error("got %s status code", response.status_code)
            log.error(response.text)
            raise AocdError("Non-200 response for POST: {}".format(response))
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        message = soup.article.text
        color = None
        if "That's the right answer" in message:
            color = "green"
            if reopen:
                # So you can read part B on the website...
                part_b_url = self.url + "#part2"
                log.info("reopening to %s", part_b_url)
                webbrowser.open(part_b_url)
            if not (self.day == 25 and part == "b"):
                self._save_correct_answer(value=value, part=part)
            if self.day == 25 and part == "a":
                log.debug("checking if got 49 stars already for year %s...", self.year)
                my_stats = self.user.get_stats(self.year)
                n_stars = sum(len(val) for val in my_stats.values())
                if n_stars == 49:
                    log.info("Got 49 stars already, getting 50th...")
                    self._submit(value="done", part="b", reopen=reopen, quiet=quiet)
                else:
                    log.info("Got %d stars, need %d more for part b", n_stars, 49 - n_stars)
        elif "Did you already complete it" in message:
            color = "yellow"
        elif "That's not the right answer" in message:
            color = "red"
            try:
                context = soup.article.span.code.text
            except AttributeError:
                context = soup.article.text
            log.warning("wrong answer: %s", context)
            self._save_incorrect_answer(value=value, part=part, extra=soup.article.text)
        elif "You gave an answer too recently" in message:
            wait_pattern = r"You have (?:(\d+)m )?(\d+)s left to wait"
            try:
                # exactly one match is expected; anything else raises ValueError
                [(minutes, seconds)] = re.findall(wait_pattern, message)
            except ValueError:
                log.warning(message)
                color = "red"
            else:
                wait_time = int(seconds)
                if minutes:
                    wait_time += 60 * int(minutes)
                log.info("Waiting %d seconds to autoretry", wait_time)
                time.sleep(wait_time)
                return self._submit(value=value, part=part, reopen=reopen, quiet=quiet)
        if not quiet:
            cprint(message, color=color)
        return response

    def _check_guess_against_existing(self, guess, part):
        """Return a message if ``part`` is already solved, otherwise None.

        None is also returned when the stored answer is the empty string.
        """
        try:
            answer = self._get_answer(part=part)
            if answer == "":
                return None
        except PuzzleUnsolvedError:
            return None
        if answer == guess:
            template = "Part {part} already solved with same answer: {answer}"
        else:
            template = colored("Part {part} already solved with different answer: {answer}", "red")
        return template.format(part=part, answer=answer)

    def _save_correct_answer(self, value, part):
        """Write the stripped answer to the part's answer file unless already saved."""
        fname = getattr(self, "answer_{}_fname".format(part))
        _ensure_intermediate_dirs(fname)
        txt = value.strip()
        msg = "saving"
        if os.path.isfile(fname):
            with open(fname) as f:
                prev = f.read()
            if txt == prev:
                msg = "the correct answer for %d/%02d part %s was already saved"
                log.debug(msg, self.year, self.day, part)
                return
            msg = "overwriting"
        msg += " the correct answer for %d/%02d part %s: %s"
        log.info(msg, self.year, self.day, part, txt)
        with open(fname, "w") as f:
            f.write(txt)

    def _save_incorrect_answer(self, value, part, extra=""):
        """Append ``"<value> <extra>"`` as one line to the part's bad-answers file."""
        fname = getattr(self, "incorrect_answers_{}_fname".format(part))
        _ensure_intermediate_dirs(fname)
        msg = "appending an incorrect answer for %d/%02d part %s"
        log.info(msg, self.year, self.day, part)
        with open(fname, "a") as f:
            # newlines in the server text are flattened so each guess stays on one line
            f.write(value.strip() + " " + extra.replace("\n", " ") + "\n")

    def _save_title(self, soup=None):
        """Extract the title from the page's first ``<h2>`` and cache it to disk.

        Nothing is saved if there is no ``<h2>`` or its text does not start
        with ``"Day <day>: "``.
        """
        if soup is None:
            soup = self._soup()
        if soup.h2 is None:
            log.warning("heading not found")
            return
        txt = soup.h2.text.strip("- ")
        prefix = "Day {}: ".format(self.day)
        if not txt.startswith(prefix):
            log.error("weird heading, wtf? %s", txt)
            return
        txt = self._title = txt[len(prefix) :]
        _ensure_intermediate_dirs(self.title_fname)
        with io.open(self.title_fname, "w", encoding="utf-8") as f:
            print(txt, file=f)

    def _get_answer(self, part):
        """
        Note: Answers are only revealed after a correct submission. If you've
        not already solved the puzzle, PuzzleUnsolvedError will be raised.
        """
        if part == "b" and self.day == 25:
            return ""
        answer_fname = getattr(self, "answer_{}_fname".format(part))
        if os.path.isfile(answer_fname):
            with open(answer_fname) as f:
                return f.read().strip()
        # scrape puzzle page for any previously solved answers
        soup = self._soup()
        if not self._title:
            # may as well save this while we're here
            self._save_title(soup=soup)
        hit = "Your puzzle answer was"
        paras = [p for p in soup.find_all("p") if p.text.startswith(hit)]
        if paras:
            parta_correct_answer = paras[0].code.text
            self._save_correct_answer(value=parta_correct_answer, part="a")
            if len(paras) > 1:
                # index rather than unpack, so extra matching paragraphs don't raise
                partb_correct_answer = paras[1].code.text
                self._save_correct_answer(value=partb_correct_answer, part="b")
        if os.path.isfile(answer_fname):
            with open(answer_fname) as f:
                return f.read().strip()
        msg = "Answer {}-{}{} is not available".format(self.year, self.day, part)
        raise PuzzleUnsolvedError(msg)

    def _get_bad_guesses(self, part):
        """Load the part's bad-answers file into a ``{guess: response text}`` dict."""
        fname = getattr(self, "incorrect_answers_{}_fname".format(part))
        result = {}
        if os.path.isfile(fname):
            with open(fname) as f:
                for line in f:
                    answer, _sep, extra = line.strip().partition(" ")
                    result[answer] = extra
        return result

    def solve(self):
        """Run the single registered ``adventofcode.user`` entry point on this puzzle.

        Raises AocdError unless exactly one such entry point exists.
        """
        try:
            [ep] = pkg_resources.iter_entry_points(group="adventofcode.user")
        except ValueError:
            raise AocdError("Puzzle.solve is only available with unique entry point")
        f = ep.load()
        return f(year=self.year, day=self.day, data=self.input_data)

    def solve_for(self, plugin):
        """Run the ``adventofcode.user`` entry point named ``plugin`` on this puzzle.

        Raises AocdError if no entry point has that name.
        """
        for ep in pkg_resources.iter_entry_points(group="adventofcode.user"):
            if ep.name == plugin:
                break
        else:
            raise AocdError("No entry point found for '{}'".format(plugin))
        f = ep.load()
        return f(year=self.year, day=self.day, data=self.input_data)

    @property
    def url(self):
        """URL of the puzzle page."""
        return URL.format(year=self.year, day=self.day)

    def view(self):
        """Open the puzzle page in a web browser."""
        webbrowser.open(self.url)

    @property
    def my_stats(self):
        """This user's stats entry for the puzzle; PuzzleUnsolvedError if absent."""
        stats = self.user.get_stats(years=[self.year])
        if (self.year, self.day) not in stats:
            raise PuzzleUnsolvedError
        result = stats[self.year, self.day]
        return result

    def _soup(self):
        """Fetch the puzzle page with the user's cookie and parse it with BeautifulSoup."""
        response = requests.get(self.url, cookies=self.user.auth, headers=USER_AGENT)
        response.raise_for_status()
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        return soup

    @property
    def easter_eggs(self):
        """Class-less span/em/code elements on the page that carry a title attribute."""
        soup = self._soup()
        # Most puzzles have exactly one easter-egg, but 2018/12/17 had two..
        eggs = soup.find_all(["span", "em", "code"], class_=None, attrs={"title": bool})
        return eggs


def _parse_duration(s):
    """Parse a string like 01:11:16 (hours, minutes, seconds) into a timedelta"""
    if s == ">24h":
        return timedelta(hours=24)
    h, m, s = [int(x) for x in s.split(":")]
    return timedelta(hours=h, minutes=m, seconds=s)


def _load_users():
    """Return the user-id -> token mapping from ``tokens.json`` in AOCD_CONFIG_DIR.

    Falls back to ``{"default": <default user's token>}`` when the file cannot
    be opened.
    """
    path = os.path.join(AOCD_CONFIG_DIR, "tokens.json")
    try:
        with open(path) as f:
            users = json.load(f)
    except IOError:
        users = {"default": default_user().token}
    return users