From b86db5c4796e0a84740c0c8ec3b564d0a0d6ccc8 Mon Sep 17 00:00:00 2001 From: Sage Vaillancourt Date: Wed, 28 Sep 2022 17:53:32 -0400 Subject: [PATCH] A good bit of refactoring. Also start adding framework for user "tiers". This refactoring is mostly type-hints and f-strings. Some functions were also re-organized. --- undercover/__init__.py | 2 +- undercover/db.py | 46 ++++++------- undercover/email.py | 1 - undercover/fallback.py | 10 +-- undercover/pdf_builder.py | 135 +++++++++++++++++++------------------- undercover/routes.py | 57 +++++++++------- 6 files changed, 130 insertions(+), 121 deletions(-) diff --git a/undercover/__init__.py b/undercover/__init__.py index c69fbf9..5908d2a 100644 --- a/undercover/__init__.py +++ b/undercover/__init__.py @@ -9,7 +9,7 @@ from flask_minify import Minify import undercover.routes -def create_app(test_config=None): +def create_app(test_config=None) -> Flask: app = Flask(__name__, instance_relative_config=True) Minify(app=app, html=True, js=True, cssless=True) secret_key = os.environ.get('UNDERCOVER_SECRET_KEY') diff --git a/undercover/db.py b/undercover/db.py index d77d9f9..c5fbbf1 100644 --- a/undercover/db.py +++ b/undercover/db.py @@ -23,12 +23,15 @@ class Letter: class User: id: int email: str + tier: int + + def in_free_tier(self) -> bool: + return self.tier == 1 @dataclass class UserWithHash: - id: int - email: str + user: User password_hash: str @@ -40,7 +43,7 @@ db_user = os.environ.get('UNDERCOVER_POSTGRES_USER') db_available = host and db_name and port and db_user and os.environ.get('UNDERCOVER_POSTGRES_PASSWORD') if db_available: - def connect(): + def connect() -> psycopg.Connection: return psycopg.connect( host=host, dbname=db_name, @@ -51,27 +54,26 @@ else: sys.stderr.write('Database login not configured: DB access is disabled.\n') sys.stderr.write(' To enable, ensure UNDERCOVER_POSTGRES_{HOST,DBNAME,PORT,USER,PASSWORD} are set.\n') - def connect(): + def connect() -> object: return MockConnection() - -def login(user_email: str, password: str): +def login(user_email: str, password: str) -> bool: pw_bytes: bytes = password.encode('utf-8') - user = __get_user(user_email) + user = __get_user_with_hash(user_email) if user: return bcrypt.checkpw(pw_bytes, user.password_hash.encode('utf-8')) return False -def __gen_pw_hash(password: str): +def __gen_pw_hash(password: str) -> str: pw_bytes = password.encode('utf-8') salt = bcrypt.gensalt() pw_hash = bcrypt.hashpw(pw_bytes, salt) return pw_hash.decode('utf-8') -def add_user(username: str, password: str): +def add_user(username: str, password: str) -> None: pw_hash = __gen_pw_hash(password) with connect() as con: cur = con.cursor() @@ -79,14 +81,14 @@ def add_user(username: str, password: str): con.commit() -def delete_user(username: str): +def delete_user(username: str) -> None: with connect() as con: cur = con.cursor() cur.execute("DELETE FROM users WHERE email = %s", (username,)) con.commit() -def add_letter(user_id: int, letter_title: str, letter_content: str): +def add_letter(user_id: int, letter_title: str, letter_content: str) -> None: with connect() as con: cur = con.cursor() cur.execute("INSERT INTO letter_data(user_id, letter_name, letter_data) VALUES (%s, %s, %s)", @@ -94,7 +96,7 @@ def add_letter(user_id: int, letter_title: str, letter_content: str): con.commit() -def edit_letter(letter_id: int, letter_title: str, letter_content: str): +def edit_letter(letter_id: int, letter_title: str, letter_content: str) -> None: with connect() as con: cur = con.cursor() cur.execute("UPDATE letter_data SET letter_name = %s, letter_data = %s WHERE id = %s", @@ -102,7 +104,7 @@ def edit_letter(letter_id: int, letter_title: str, letter_content: str): con.commit() -def get_user_letters(user_id: int) -> [Letter]: +def get_user_letters(user_id: int) -> list[Letter]: with connect() as con: cur = con.cursor() cur.execute("SELECT id, letter_name, letter_data FROM letter_data WHERE user_id = %s", (str(user_id),)) @@ -110,24 +112,24 @@ def get_user_letters(user_id: int) -> [Letter]: def get_user(email: str) -> Optional[User]: - user = __get_user(email) - if user: - return User(user.id, user.email) + user_with_hash = __get_user_with_hash(email) + if user_with_hash: + return user_with_hash.user return None -def __get_user(email: str) -> Optional[UserWithHash]: +def __get_user_with_hash(email: str) -> Optional[UserWithHash]: """ :param email: :return: User without their password_hash """ with connect() as con: cur = con.cursor() - cur.execute("SELECT id, password FROM users WHERE users.email ILIKE %s", (email,)) + cur.execute("SELECT id, password, tier FROM users WHERE users.email ILIKE %s", (email,)) row = cur.fetchone() if row: - user_id, password = row - return UserWithHash(user_id, email, password) + user_id, password, tier = row + return UserWithHash(User(user_id, email, tier), password) return None @@ -150,14 +152,14 @@ def initiate_password_reset(email: str) -> Optional[UUID]: return reset_id -def delete_reset_row(reset_id: UUID): +def delete_reset_row(reset_id: UUID) -> None: with connect() as con: cur = con.cursor() cur.execute("DELETE FROM resets WHERE id = %s", (reset_id,)) con.commit() -def complete_reset(reset_id: str, new_password: str): +def complete_reset(reset_id: str, new_password: str) -> bool: with connect() as con: cur = con.cursor() cur.execute("SELECT reset_time, user_id FROM resets WHERE id = %s", (reset_id,)) diff --git a/undercover/email.py b/undercover/email.py index 1ca857f..d362e6a 100644 --- a/undercover/email.py +++ b/undercover/email.py @@ -1,4 +1,3 @@ -import json import os import sys diff --git a/undercover/fallback.py b/undercover/fallback.py index 079b42e..f3324ff 100644 --- a/undercover/fallback.py +++ b/undercover/fallback.py @@ -8,22 +8,22 @@ class MockConnection: mock_cursor.fetchone = lambda *a: None mock_cursor.fetchall = lambda *a: [] - def __enter__(self, *a): + def __enter__(self, *a) -> object: return self - def __exit__(self, *a): + def __exit__(self, *a) -> None: pass - def cursor(self): + def cursor(self) -> object: return self.mock_cursor - def commit(self, *a): + def commit(self, *a) -> None: pass class MockCreator: @staticmethod - def create(data=None, *a): + def create(data=None, *a) -> object: print(json.JSONEncoder(indent=2).encode(data)) result = types.SimpleNamespace() result.status_code = 200 diff --git a/undercover/pdf_builder.py b/undercover/pdf_builder.py index bac4120..450ae27 100644 --- a/undercover/pdf_builder.py +++ b/undercover/pdf_builder.py @@ -7,28 +7,19 @@ from dataclasses import dataclass from flask import send_from_directory, Response -def get_unique(): - import uuid - unique = str(uuid.uuid1().hex) - return unique - - -def get_datetime(): - from datetime import datetime - now = datetime.now() - return now.strftime("%Y-%m-%d %H:%M:%S") - - root_dir = os.path.dirname(os.getcwd()) proj_dir = root_dir + '/undercover/' output_dir = proj_dir + 'outputs/' -base_tex_text = open(proj_dir + "/letter_templates/base.tex", "r").read() + +base_tex_text = open(proj_dir + '/letter_templates/base.tex', 'r').read() @dataclass class CLData: - selectedLetter: int # Metadata + # Metadata: + selectedLetter: int + # Form Data: username: str company: str jobAndPronoun: str @@ -37,81 +28,91 @@ class CLData: closingText: str body: str - def get_pairs(self): + def get_pairs(self) -> list[(str, str)]: return [ - ("username", self.username), - ("company", self.company), - ("jobAndPronoun", self.jobAndPronoun), - ("skillTypes", self.skillTypes), - ("mySkills", self.mySkills), - ("closingText", self.closingText), - ("body", self.body), + ('username', self.username), + ('company', self.company), + ('jobAndPronoun', self.jobAndPronoun), + ('skillTypes', self.skillTypes), + ('mySkills', self.mySkills), + ('closingText', self.closingText), + ('body', self.body), ] def generate_pdf(self) -> Response: """ - :return: Response when successful + :return: Response with the pdf attached as a download when successful :raise ValueError: e.args[0] is a list of error strings, if generation fails """ import threading unique_id = get_unique() - unique_file = output_dir + unique_id + ".tex" - f = open(unique_file, "w") + unique_file = output_dir + unique_id + '.tex' + f = open(unique_file, 'w') for pair in self.get_pairs(): - f.write("\\def \\" + pair[0] + "{" + pair[1] + "}\n") + f.write('\\def \\' + pair[0] + '{' + pair[1] + '}\n') f.write(base_tex_text) f.close() - output_arg = "-jobname=outputs/" + unique_id + " " + unique_file - com = "pdflatex -halt-on-error " + output_arg - build_text = "[" + get_datetime() + "] Building for " + unique_id + " " + com = f'pdflatex -halt-on-error -jobname=outputs/{unique_id} {unique_file}' result = subprocess.run( ['bash', '-c', com], stdout=subprocess.PIPE, text=True ) - if result.returncode == 0: - print(build_text + "[SUCCESS]") - - result = subprocess.run( - [ - 'gs', - '-sDEVICE=pdfwrite', - '-dCompatibilityLevel=1.5', - '-dNOPAUSE', - '-dQUIET', - '-dBATCH', - '-dPrinted=false', - '-sOutputFile=outputs/' + unique_id + '.compressed.pdf', - 'outputs/' + unique_id + '.pdf' - ], - stdout=subprocess.PIPE, - text=True - ) - - threading.Timer(60 * 5, cleanup, [output_dir + unique_id]).start() - - output_file = unique_id - if result.returncode == 0: - output_file += ".compressed.pdf" - else: - output_file += ".pdf" - - return send_from_directory( - output_dir, - output_file, - download_name=self.username.replace(" ", "") + "_CoverLetter.pdf", - as_attachment=True - ) - else: - print(build_text + "[FAIL]") + build_text = f'[{get_datetime()}] Building for {unique_id} ' + if result.returncode != 0: + print(build_text + '[FAIL]') # Collect output but delete boilerplate text - errors = list(map(str.strip, result.stdout.split("\n"))) + errors = list(map(str.strip, result.stdout.split('\n'))) del errors[:13] del errors[-2:] raise ValueError(errors) + print(build_text + '[SUCCESS]') + + result = subprocess.run( + [ + 'gs', + '-sDEVICE=pdfwrite', + '-dCompatibilityLevel=1.5', + '-dNOPAUSE', + '-dQUIET', + '-dBATCH', + '-dPrinted=false', + '-sOutputFile=outputs/' + unique_id + '.compressed.pdf', + 'outputs/' + unique_id + '.pdf' + ], + stdout=subprocess.PIPE, + text=True + ) + + threading.Timer(60 * 5, cleanup, [output_dir + unique_id]).start() + + extension = 'compressed.pdf' if result.returncode == 0 else 'pdf' + output_file = f'{unique_id}.{extension}' + + return send_from_directory( + output_dir, + output_file, + download_name=self.username.replace(' ', '') + '_CoverLetter.pdf', + as_attachment=True + ) + + +def cleanup(unique) -> None: + subprocess.run(['bash', '-c', 'rm ' + unique + '.*']) + + +def get_unique() -> str: + import uuid + unique = str(uuid.uuid1().hex) + return unique + + +def get_datetime() -> str: + from datetime import datetime + now = datetime.now() + return now.strftime('%Y-%m-%d %H:%M:%S') + -def cleanup(unique): - subprocess.run(['bash', '-c', "rm " + unique + ".*"]) diff --git a/undercover/routes.py b/undercover/routes.py index 15fd95e..79ca386 100644 --- a/undercover/routes.py +++ b/undercover/routes.py @@ -6,7 +6,7 @@ import subprocess import threading import urllib.parse -from flask import (Blueprint, render_template, request, make_response, session, redirect, jsonify) +from flask import (Blueprint, render_template, request, make_response, session, redirect, jsonify, Response) from wtforms import Form, SelectField, StringField, TextAreaField, validators from email_validator import validate_email, EmailNotValidError @@ -70,8 +70,27 @@ class CLForm(Form): ) ) + def to_cl_data(self) -> CLData: + selected_letter = self.letterName.data or '1' -def render_index(form=CLForm(), error=None, status=200, letter_errors=None): + return CLData( + selectedLetter=int(selected_letter) - 1, + username=self.username.data, + company=self.company.data, + jobAndPronoun=self.jobAndPronoun.data, + skillTypes=self.skillTypes.data, + mySkills=self.mySkills.data, + closingText=self.closingText.data, + body=self.body.data, + ) + + +def render_index( + form: CLForm = CLForm(), + error: str = None, + status: int = 200, + letter_errors: list[str] = None +) -> Response: return make_response( render_template( 'writing.jinja2', @@ -83,7 +102,7 @@ def render_index(form=CLForm(), error=None, status=200, letter_errors=None): @writing_blueprint.route('/login', methods=['POST', 'GET']) -def login(): +def login() -> Response | str: if request.method == 'POST': username = request.form['login'] if db.login(username, request.form['password']): @@ -101,13 +120,13 @@ def login(): @writing_blueprint.route('/logout', methods=['POST', 'GET']) -def logout(): +def logout() -> Response: session.pop('username', None) return redirect('/') @writing_blueprint.route('/', methods=['GET']) -def index_get(): +def index_get() -> Response: email_address = session.get('username') if not email_address: return render_index() @@ -149,12 +168,12 @@ def index_get(): @writing_blueprint.route('/create_account', methods=['GET']) -def create_account_page(): +def create_account_page() -> str: return render_template('create_account.jinja2') @writing_blueprint.route('/create_account', methods=['POST']) -def create_account(): +def create_account() -> Response: email_address = request.form['login'] try: validate_email(email_address, check_deliverability=True) @@ -170,7 +189,7 @@ def create_account(): @writing_blueprint.route('/reset', methods=['POST', 'GET']) -def reset_password(): +def reset_password() -> Response | str: if request.method == 'POST': email_address = request.form.get('login') existing_reset_id = request.form.get('reset_id') @@ -200,14 +219,14 @@ def reset_password(): @writing_blueprint.route('/dbtest', methods=['GET']) -def db_test_get(): +def db_test_get() -> Response: response = make_response(db.get_user_letters(1)[0].contents, 200) response.mimetype = "text/plain" return response @writing_blueprint.route('/update', methods=['POST']) -def update_get(): +def update_get() -> Response: expected_token = os.environ['GITLAB_HOOK_TOKEN'] given_token = request.headers['X-Gitlab-Token'] event_type = request.headers['X-Gitlab-Event'] @@ -222,7 +241,7 @@ def update_get(): return make_response("", 404) -def git_update(): +def git_update() -> None: script = os.environ['UPDATE_SCRIPT_PATH'] if not script: return @@ -230,21 +249,10 @@ def git_update(): @writing_blueprint.route('/', methods=['POST']) -def generate_pdf(): +def generate_pdf() -> Response: form = CLForm(request.form) if form.validate(): - selected_letter = form.letterName.data or '1' - - data = CLData( - selectedLetter=int(selected_letter) - 1, - username=form.username.data, - company=form.company.data, - jobAndPronoun=form.jobAndPronoun.data, - skillTypes=form.skillTypes.data, - mySkills=form.mySkills.data, - closingText=form.closingText.data, - body=form.body.data, - ) + data = form.to_cl_data() email_address = session.get('username') if email_address: @@ -270,4 +278,3 @@ def generate_pdf(): return resp return render_index(form=form) -