A good bit of refactoring.

Also start adding framework for user "tiers".
This refactoring is mostly type-hints and f-strings.
Some functions were also re-organized.
This commit is contained in:
Sage Vaillancourt 2022-09-28 17:53:32 -04:00
parent f1090abbc4
commit b86db5c479
6 changed files with 130 additions and 121 deletions

View File

@ -9,7 +9,7 @@ from flask_minify import Minify
import undercover.routes
def create_app(test_config=None):
def create_app(test_config=None) -> Flask:
app = Flask(__name__, instance_relative_config=True)
Minify(app=app, html=True, js=True, cssless=True)
secret_key = os.environ.get('UNDERCOVER_SECRET_KEY')

View File

@ -23,12 +23,15 @@ class Letter:
class User:
id: int
email: str
tier: int
def in_free_tier(self) -> bool:
return self.tier == 1
@dataclass
class UserWithHash:
id: int
email: str
user: User
password_hash: str
@ -40,7 +43,7 @@ db_user = os.environ.get('UNDERCOVER_POSTGRES_USER')
db_available = host and db_name and port and db_user and os.environ.get('UNDERCOVER_POSTGRES_PASSWORD')
if db_available:
def connect():
def connect() -> psycopg.Connection:
return psycopg.connect(
host=host,
dbname=db_name,
@ -51,27 +54,26 @@ else:
sys.stderr.write('Database login not configured: DB access is disabled.\n')
sys.stderr.write(' To enable, ensure UNDERCOVER_POSTGRES_{HOST,DBNAME,PORT,USER,PASSWORD} are set.\n')
def connect():
def connect() -> object:
return MockConnection()
def login(user_email: str, password: str):
def login(user_email: str, password: str) -> bool:
pw_bytes: bytes = password.encode('utf-8')
user = __get_user(user_email)
user = __get_user_with_hash(user_email)
if user:
return bcrypt.checkpw(pw_bytes, user.password_hash.encode('utf-8'))
return False
def __gen_pw_hash(password: str):
def __gen_pw_hash(password: str) -> str:
pw_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
pw_hash = bcrypt.hashpw(pw_bytes, salt)
return pw_hash.decode('utf-8')
def add_user(username: str, password: str):
def add_user(username: str, password: str) -> None:
pw_hash = __gen_pw_hash(password)
with connect() as con:
cur = con.cursor()
@ -79,14 +81,14 @@ def add_user(username: str, password: str):
con.commit()
def delete_user(username: str):
def delete_user(username: str) -> None:
with connect() as con:
cur = con.cursor()
cur.execute("DELETE FROM users WHERE email = %s", (username,))
con.commit()
def add_letter(user_id: int, letter_title: str, letter_content: str):
def add_letter(user_id: int, letter_title: str, letter_content: str) -> None:
with connect() as con:
cur = con.cursor()
cur.execute("INSERT INTO letter_data(user_id, letter_name, letter_data) VALUES (%s, %s, %s)",
@ -94,7 +96,7 @@ def add_letter(user_id: int, letter_title: str, letter_content: str):
con.commit()
def edit_letter(letter_id: int, letter_title: str, letter_content: str):
def edit_letter(letter_id: int, letter_title: str, letter_content: str) -> None:
with connect() as con:
cur = con.cursor()
cur.execute("UPDATE letter_data SET letter_name = %s, letter_data = %s WHERE id = %s",
@ -102,7 +104,7 @@ def edit_letter(letter_id: int, letter_title: str, letter_content: str):
con.commit()
def get_user_letters(user_id: int) -> [Letter]:
def get_user_letters(user_id: int) -> list[Letter]:
with connect() as con:
cur = con.cursor()
cur.execute("SELECT id, letter_name, letter_data FROM letter_data WHERE user_id = %s", (str(user_id),))
@ -110,24 +112,24 @@ def get_user_letters(user_id: int) -> [Letter]:
def get_user(email: str) -> Optional[User]:
user = __get_user(email)
if user:
return User(user.id, user.email)
user_with_hash = __get_user_with_hash(email)
if user_with_hash:
return user_with_hash.user
return None
def __get_user(email: str) -> Optional[UserWithHash]:
def __get_user_with_hash(email: str) -> Optional[UserWithHash]:
"""
:param email:
:return: User without their password_hash
"""
with connect() as con:
cur = con.cursor()
cur.execute("SELECT id, password FROM users WHERE users.email ILIKE %s", (email,))
cur.execute("SELECT id, password, tier FROM users WHERE users.email ILIKE %s", (email,))
row = cur.fetchone()
if row:
user_id, password = row
return UserWithHash(user_id, email, password)
user_id, password, tier = row
return UserWithHash(User(user_id, email, tier), password)
return None
@ -150,14 +152,14 @@ def initiate_password_reset(email: str) -> Optional[UUID]:
return reset_id
def delete_reset_row(reset_id: UUID):
def delete_reset_row(reset_id: UUID) -> None:
with connect() as con:
cur = con.cursor()
cur.execute("DELETE FROM resets WHERE id = %s", (reset_id,))
con.commit()
def complete_reset(reset_id: str, new_password: str):
def complete_reset(reset_id: str, new_password: str) -> bool:
with connect() as con:
cur = con.cursor()
cur.execute("SELECT reset_time, user_id FROM resets WHERE id = %s", (reset_id,))

View File

@ -1,4 +1,3 @@
import json
import os
import sys

View File

@ -8,22 +8,22 @@ class MockConnection:
mock_cursor.fetchone = lambda *a: None
mock_cursor.fetchall = lambda *a: []
def __enter__(self, *a):
def __enter__(self, *a) -> object:
return self
def __exit__(self, *a):
def __exit__(self, *a) -> None:
pass
def cursor(self):
def cursor(self) -> object:
return self.mock_cursor
def commit(self, *a):
def commit(self, *a) -> None:
pass
class MockCreator:
@staticmethod
def create(data=None, *a):
def create(data=None, *a) -> object:
print(json.JSONEncoder(indent=2).encode(data))
result = types.SimpleNamespace()
result.status_code = 200

View File

@ -7,28 +7,19 @@ from dataclasses import dataclass
from flask import send_from_directory, Response
def get_unique():
import uuid
unique = str(uuid.uuid1().hex)
return unique
def get_datetime():
from datetime import datetime
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
root_dir = os.path.dirname(os.getcwd())
proj_dir = root_dir + '/undercover/'
output_dir = proj_dir + 'outputs/'
base_tex_text = open(proj_dir + "/letter_templates/base.tex", "r").read()
base_tex_text = open(proj_dir + '/letter_templates/base.tex', 'r').read()
@dataclass
class CLData:
selectedLetter: int # Metadata
# Metadata:
selectedLetter: int
# Form Data:
username: str
company: str
jobAndPronoun: str
@ -37,42 +28,48 @@ class CLData:
closingText: str
body: str
def get_pairs(self):
def get_pairs(self) -> list[(str, str)]:
return [
("username", self.username),
("company", self.company),
("jobAndPronoun", self.jobAndPronoun),
("skillTypes", self.skillTypes),
("mySkills", self.mySkills),
("closingText", self.closingText),
("body", self.body),
('username', self.username),
('company', self.company),
('jobAndPronoun', self.jobAndPronoun),
('skillTypes', self.skillTypes),
('mySkills', self.mySkills),
('closingText', self.closingText),
('body', self.body),
]
def generate_pdf(self) -> Response:
"""
:return: Response when successful
:return: Response with the pdf attached as a download when successful
:raise ValueError: e.args[0] is a list of error strings, if generation fails
"""
import threading
unique_id = get_unique()
unique_file = output_dir + unique_id + ".tex"
f = open(unique_file, "w")
unique_file = output_dir + unique_id + '.tex'
f = open(unique_file, 'w')
for pair in self.get_pairs():
f.write("\\def \\" + pair[0] + "{" + pair[1] + "}\n")
f.write('\\def \\' + pair[0] + '{' + pair[1] + '}\n')
f.write(base_tex_text)
f.close()
output_arg = "-jobname=outputs/" + unique_id + " " + unique_file
com = "pdflatex -halt-on-error " + output_arg
build_text = "[" + get_datetime() + "] Building for " + unique_id + " "
com = f'pdflatex -halt-on-error -jobname=outputs/{unique_id} {unique_file}'
result = subprocess.run(
['bash', '-c', com],
stdout=subprocess.PIPE,
text=True
)
if result.returncode == 0:
print(build_text + "[SUCCESS]")
build_text = f'[{get_datetime()}] Building for {unique_id} '
if result.returncode != 0:
print(build_text + '[FAIL]')
# Collect output but delete boilerplate text
errors = list(map(str.strip, result.stdout.split('\n')))
del errors[:13]
del errors[-2:]
raise ValueError(errors)
print(build_text + '[SUCCESS]')
result = subprocess.run(
[
@ -92,26 +89,30 @@ class CLData:
threading.Timer(60 * 5, cleanup, [output_dir + unique_id]).start()
output_file = unique_id
if result.returncode == 0:
output_file += ".compressed.pdf"
else:
output_file += ".pdf"
extension = 'compressed.pdf' if result.returncode == 0 else 'pdf'
output_file = f'{unique_id}.{extension}'
return send_from_directory(
output_dir,
output_file,
download_name=self.username.replace(" ", "") + "_CoverLetter.pdf",
download_name=self.username.replace(' ', '') + '_CoverLetter.pdf',
as_attachment=True
)
else:
print(build_text + "[FAIL]")
# Collect output but delete boilerplate text
errors = list(map(str.strip, result.stdout.split("\n")))
del errors[:13]
del errors[-2:]
raise ValueError(errors)
def cleanup(unique):
subprocess.run(['bash', '-c', "rm " + unique + ".*"])
def cleanup(unique) -> None:
subprocess.run(['bash', '-c', 'rm ' + unique + '.*'])
def get_unique() -> str:
import uuid
unique = str(uuid.uuid1().hex)
return unique
def get_datetime() -> str:
from datetime import datetime
now = datetime.now()
return now.strftime('%Y-%m-%d %H:%M:%S')

View File

@ -6,7 +6,7 @@ import subprocess
import threading
import urllib.parse
from flask import (Blueprint, render_template, request, make_response, session, redirect, jsonify)
from flask import (Blueprint, render_template, request, make_response, session, redirect, jsonify, Response)
from wtforms import Form, SelectField, StringField, TextAreaField, validators
from email_validator import validate_email, EmailNotValidError
@ -70,8 +70,27 @@ class CLForm(Form):
)
)
def to_cl_data(self) -> CLData:
selected_letter = self.letterName.data or '1'
def render_index(form=CLForm(), error=None, status=200, letter_errors=None):
return CLData(
selectedLetter=int(selected_letter) - 1,
username=self.username.data,
company=self.company.data,
jobAndPronoun=self.jobAndPronoun.data,
skillTypes=self.skillTypes.data,
mySkills=self.mySkills.data,
closingText=self.closingText.data,
body=self.body.data,
)
def render_index(
form: CLForm = CLForm(),
error: str = None,
status: int = 200,
letter_errors: list[str] = None
) -> Response:
return make_response(
render_template(
'writing.jinja2',
@ -83,7 +102,7 @@ def render_index(form=CLForm(), error=None, status=200, letter_errors=None):
@writing_blueprint.route('/login', methods=['POST', 'GET'])
def login():
def login() -> Response | str:
if request.method == 'POST':
username = request.form['login']
if db.login(username, request.form['password']):
@ -101,13 +120,13 @@ def login():
@writing_blueprint.route('/logout', methods=['POST', 'GET'])
def logout():
def logout() -> Response:
session.pop('username', None)
return redirect('/')
@writing_blueprint.route('/', methods=['GET'])
def index_get():
def index_get() -> Response:
email_address = session.get('username')
if not email_address:
return render_index()
@ -149,12 +168,12 @@ def index_get():
@writing_blueprint.route('/create_account', methods=['GET'])
def create_account_page():
def create_account_page() -> str:
return render_template('create_account.jinja2')
@writing_blueprint.route('/create_account', methods=['POST'])
def create_account():
def create_account() -> Response:
email_address = request.form['login']
try:
validate_email(email_address, check_deliverability=True)
@ -170,7 +189,7 @@ def create_account():
@writing_blueprint.route('/reset', methods=['POST', 'GET'])
def reset_password():
def reset_password() -> Response | str:
if request.method == 'POST':
email_address = request.form.get('login')
existing_reset_id = request.form.get('reset_id')
@ -200,14 +219,14 @@ def reset_password():
@writing_blueprint.route('/dbtest', methods=['GET'])
def db_test_get():
def db_test_get() -> Response:
response = make_response(db.get_user_letters(1)[0].contents, 200)
response.mimetype = "text/plain"
return response
@writing_blueprint.route('/update', methods=['POST'])
def update_get():
def update_get() -> Response:
expected_token = os.environ['GITLAB_HOOK_TOKEN']
given_token = request.headers['X-Gitlab-Token']
event_type = request.headers['X-Gitlab-Event']
@ -222,7 +241,7 @@ def update_get():
return make_response("", 404)
def git_update():
def git_update() -> None:
script = os.environ['UPDATE_SCRIPT_PATH']
if not script:
return
@ -230,21 +249,10 @@ def git_update():
@writing_blueprint.route('/', methods=['POST'])
def generate_pdf():
def generate_pdf() -> Response:
form = CLForm(request.form)
if form.validate():
selected_letter = form.letterName.data or '1'
data = CLData(
selectedLetter=int(selected_letter) - 1,
username=form.username.data,
company=form.company.data,
jobAndPronoun=form.jobAndPronoun.data,
skillTypes=form.skillTypes.data,
mySkills=form.mySkills.data,
closingText=form.closingText.data,
body=form.body.data,
)
data = form.to_cl_data()
email_address = session.get('username')
if email_address:
@ -270,4 +278,3 @@ def generate_pdf():
return resp
return render_index(form=form)