UnderCover/app/routes.py

323 lines
11 KiB
Python

# Copyright Sage Vaillancourt 2021
import json
import os
import subprocess
import threading
import urllib.parse
from flask import Blueprint, render_template, request, make_response, session, redirect, jsonify, Response, current_app
from wtforms import Form, SelectField, StringField, TextAreaField, validators
from email_validator import validate_email, EmailNotValidError
from app import db, email
from app.pdf_builder import CLData
writing_blueprint = Blueprint('writing', __name__)
class CLForm(Form):
def __init__(self, form: dict[str, str] = None, email_address: str = None, **kwargs):
super().__init__(form, **kwargs)
if email_address:
user = db.get_user(email_address)
letters = db.get_user_letters(user.id)
self.letterName.choices = letter_choices(letters)
letterName = SelectField(
'Select Template:',
[validators.optional()],
choices=[(1, 'LETTER TITLE')]
)
username = StringField(
'Your Name:',
[validators.Length(min=4, max=99)],
default="Sage"
)
company = StringField(
'Company:',
[validators.Length(min=2, max=99)],
default="BananaCorp"
)
jobAndPronoun = StringField(
'Job and Pronoun (a/an):',
[validators.Length(min=4, max=99)],
default="a banana stocker"
)
skillTypes = StringField(
'Skill Type:',
[validators.Length(min=2, max=99)],
default="practical"
)
mySkills = StringField(
'My Skills:',
[validators.Length(min=2, max=99)],
default="stocking bananas"
)
closingText = TextAreaField(
'Closing Text:',
[validators.Length(min=2, max=99)],
default="I look forward to hearing from you"
)
body = TextAreaField(
'Body:',
[validators.Length(min=4, max=9999)],
default=(
"My name is {\\username}. I'm excited for the opportunity to work as "
"{\\jobAndPronoun} with your company. I think with my {\\skillTypes} knowledge "
"of {\\mySkills}, there's a lot I could contribute to your team.\n\n"
"I am passionate about what I do, and I believe we would work well together.\n\n"
"Thank you for your consideration."
)
)
def to_cl_data(self) -> CLData:
return CLData(
selectedLetter=int(self.letterName.data or '1') - 1,
username=self.username.data,
company=self.company.data,
jobAndPronoun=self.jobAndPronoun.data,
skillTypes=self.skillTypes.data,
mySkills=self.mySkills.data,
closingText=self.closingText.data,
body=self.body.data,
)
def render_index(
form: CLForm = CLForm(),
error: str = None,
status: int = 200,
letter_errors: list[str] = None
) -> Response:
return make_response(
render_template(
'writing.jinja2',
form=form,
username=session.get('username'),
error=error,
letter_errors=letter_errors,
), status)
@writing_blueprint.route('/login', methods=['POST'])
def login() -> Response | str:
username = request.form['login']
if db.login(username, request.form['password']):
session['username'] = username
return redirect('/')
return render_index(error="Invalid username or password", status=401)
@writing_blueprint.route('/test_error', methods=['GET'])
def error_test() -> Response:
raise Exception("Test error")
@writing_blueprint.route('/create_account', methods=['GET'])
def create_account_redirect() -> Response:
return redirect('/')
@writing_blueprint.route('/create_account', methods=['POST'])
def create_account() -> Response:
email_address = request.form['login']
password = request.form['password']
if password != request.form['confirm-password']:
return render_index(error="Password and confirm password must match!", status=400)
password_len = len(password)
if password_len < 8 or password_len > 64:
return render_index(error="Password must be between 8 and 64 characters", status=400)
try:
validate_email(email_address, check_deliverability=True)
except EmailNotValidError as e:
return render_index(error=str(e), status=400)
if db.get_user(email_address):
return render_index(error="A user with that email already exists!", status=400)
db.add_user(email_address, password)
session['username'] = email_address
return redirect('/')
@writing_blueprint.route('/logout', methods=['POST', 'GET'])
def logout() -> Response:
session.pop('username', None)
return redirect('/')
FREE_TIER_TEMPLATES = 2
@writing_blueprint.route('/add_letter')
def add_letter() -> Response:
email_address = session.get('username')
if not email_address:
return render_index()
user = db.get_user(email_address)
existing_letter_count = len(db.get_user_letters(user.id))
if user.in_free_tier() and existing_letter_count >= FREE_TIER_TEMPLATES:
return redirect('/?error=template_limit')
new_letter_name = f'Letter{existing_letter_count + 1}'
default_form_json = jsonify(CLForm().to_cl_data()).get_data(True)
db.add_letter(user.id, new_letter_name, default_form_json)
return redirect(f'/?letter_name={new_letter_name}')
def letter_choices(letters: list[db.Letter]) -> list[(int, str)]:
return [(i + 1, letter.title) for i, letter in enumerate(letters)]
@writing_blueprint.route('/', methods=['GET'])
def index_get() -> Response:
email_address = session.get('username')
if not email_address:
return render_index()
form = CLForm()
user = db.get_user(email_address)
letters = db.get_user_letters(user.id)
letter_names = letter_choices(letters)
if len(letter_names) == 0:
return render_index()
form.letterName.choices = letter_names
form.letterName.data = 1
selected_letter = request.args.get('letter_name')
error = None
if request.args.get('error') == 'template_limit':
error = f'A maximum of {FREE_TIER_TEMPLATES} templates are available to each user.'
if selected_letter:
for i, letter in enumerate(letters):
if letter.title == selected_letter:
form.letterName.data = i + 1
break
# TODO: Load this data more dynamically
# I.e. use a dictionary instead of explicitly-defined fields
data = json.loads(letters[form.letterName.data - 1].contents)
# Ensures default value is set
form.letterName.process_data(form.letterName.data)
form.company.data = data['company']
form.body.data = data['body']
form.closingText.data = data['closingText']
form.jobAndPronoun.data = data['jobAndPronoun']
form.mySkills.data = data['mySkills']
form.skillTypes.data = data['skillTypes']
form.username.data = data['username']
return render_index(form=form, error=error)
@writing_blueprint.route('/reset', methods=['POST', 'GET'])
def reset_password() -> Response | str:
if request.method == 'POST':
email_address = request.form.get('login')
existing_reset_id = request.form.get('reset_id')
if email_address:
reset_id = db.initiate_password_reset(email_address)
if reset_id:
if not email.send_password_reset(email_address, 'https://undercover.cafe/reset?id=' + str(reset_id)):
return render_index(error="Failed to send reset email. Please try again later.", status=500)
elif existing_reset_id:
new_password = request.form['password']
if not db.complete_reset(existing_reset_id, new_password):
return render_index(error="Password reset failed. Your reset link may have expired.", status=500)
# TODO: Log in?
return redirect('/')
query_reset_id = request.args.get('id')
# TODO: Add password validation
return f'''
<form formaction="/reset" method="post">
<p><input type=hidden name=reset_id value="{query_reset_id}"></p>
<label>New Password:</label>
<p><input type=password name=password></p>
<p><input type=submit value="Save Password"></p>
</form>
'''
@writing_blueprint.route('/dbtest', methods=['GET'])
def db_test_get() -> Response:
response = make_response(db.get_user_letters(1)[0].contents, 200)
response.mimetype = "text/plain"
return response
@writing_blueprint.route('/update', methods=['POST'])
def update_get() -> Response:
expected_token = os.environ['GITLAB_HOOK_TOKEN']
given_token = request.headers['X-Gitlab-Token']
event_type = request.headers['X-Gitlab-Event']
if expected_token == given_token and event_type == "Push Hook":
print("Update notification received.")
response = make_response("", 200)
response.mimetype = "text/plain"
threading.Timer(5, git_update, []).start()
return response
else:
return make_response("", 404)
def git_update() -> None:
script = os.environ['UPDATE_SCRIPT_PATH']
if not script:
return
subprocess.Popen(['bash', '-c', "test -f " + script + " && " + script])
@writing_blueprint.route('/status', methods=['GET'])
def status_page() -> Response:
status_message = f"Currently running on '{os.environ.get('UNDERCOVER_SERVER_NAME')}' server"
git_commands = 'git log -1 --pretty=%B; git rev-parse --short HEAD'
current_git_hash = subprocess.check_output([git_commands], shell=True, universal_newlines=True)
return make_response(render_template('error.jinja2', status=200, error_text=status_message, extra_text='Latest commit: ' + current_git_hash), 200)
@writing_blueprint.route('/', methods=['POST'])
def generate_pdf() -> Response:
email_address = session.get('username')
form = CLForm(request.form, email_address=email_address)
if not form.validate():
return render_index(form=form)
data = form.to_cl_data()
if email_address:
user = db.get_user(email_address)
letters = db.get_user_letters(user.id)
letter_json = jsonify(data).get_data(True)
if len(letters) == 0:
db.add_letter(user.id, 'Letter1', letter_json)
else:
letter = letters[data.selectedLetter]
# TODO: Support title editing
db.edit_letter(letter.id, letter.title, letter_json)
try:
resp = data.generate_pdf()
except ValueError as e:
resp = render_index(form=form, letter_errors=e.args[0])
# Save entered data as cookies on user's machine
for pair in data.get_pairs():
resp.set_cookie(pair[0], urllib.parse.quote(pair[1]))
return resp