312 lines
10 KiB
Python
312 lines
10 KiB
Python
# Copyright Sage Vaillancourt 2021
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import urllib.parse
|
|
|
|
from flask import Blueprint, render_template, request, make_response, session, redirect, jsonify, Response, current_app
|
|
from wtforms import Form, SelectField, StringField, TextAreaField, validators
|
|
from email_validator import validate_email, EmailNotValidError
|
|
|
|
from app import db, email
|
|
from app.pdf_builder import CLData
|
|
|
|
writing_blueprint = Blueprint('writing', __name__)
|
|
|
|
|
|
class CLForm(Form):
|
|
def __init__(self, form: dict[str, str] = None, email_address: str = None, **kwargs):
|
|
super().__init__(form, **kwargs)
|
|
if email_address:
|
|
user = db.get_user(email_address)
|
|
letters = db.get_user_letters(user.id)
|
|
self.letterName.choices = letter_choices(letters)
|
|
|
|
letterName = SelectField(
|
|
'Select Template:',
|
|
[validators.optional()],
|
|
choices=[(1, 'LETTER TITLE')]
|
|
)
|
|
|
|
username = StringField(
|
|
'Your Name:',
|
|
[validators.Length(min=4, max=99)],
|
|
default="Sage"
|
|
)
|
|
company = StringField(
|
|
'Company:',
|
|
[validators.Length(min=2, max=99)],
|
|
default="BananaCorp"
|
|
)
|
|
jobAndPronoun = StringField(
|
|
'Job and Pronoun (a/an):',
|
|
[validators.Length(min=4, max=99)],
|
|
default="a banana stocker"
|
|
)
|
|
skillTypes = StringField(
|
|
'Skill Type:',
|
|
[validators.Length(min=2, max=99)],
|
|
default="practical"
|
|
)
|
|
mySkills = StringField(
|
|
'My Skills:',
|
|
[validators.Length(min=2, max=99)],
|
|
default="stocking bananas"
|
|
)
|
|
|
|
closingText = TextAreaField(
|
|
'Closing Text:',
|
|
[validators.Length(min=2, max=99)],
|
|
default="I look forward to hearing from you"
|
|
)
|
|
|
|
body = TextAreaField(
|
|
'Body:',
|
|
[validators.Length(min=4, max=9999)],
|
|
default=(
|
|
"My name is {\\username}. I'm excited for the opportunity to work as "
|
|
"{\\jobAndPronoun} with your company. I think with my {\\skillTypes} knowledge "
|
|
"of {\\mySkills}, there's a lot I could contribute to your team.\n\n"
|
|
|
|
"I am passionate about what I do, and I believe we would work well together.\n\n"
|
|
|
|
"Thank you for your consideration."
|
|
)
|
|
)
|
|
|
|
def to_cl_data(self) -> CLData:
|
|
return CLData(
|
|
selectedLetter=int(self.letterName.data or '1') - 1,
|
|
username=self.username.data,
|
|
company=self.company.data,
|
|
jobAndPronoun=self.jobAndPronoun.data,
|
|
skillTypes=self.skillTypes.data,
|
|
mySkills=self.mySkills.data,
|
|
closingText=self.closingText.data,
|
|
body=self.body.data,
|
|
)
|
|
|
|
|
|
def render_index(
|
|
form: CLForm = CLForm(),
|
|
error: str = None,
|
|
status: int = 200,
|
|
letter_errors: list[str] = None
|
|
) -> Response:
|
|
return make_response(
|
|
render_template(
|
|
'writing.jinja2',
|
|
form=form,
|
|
username=session.get('username'),
|
|
error=error,
|
|
letter_errors=letter_errors,
|
|
), status)
|
|
|
|
|
|
@writing_blueprint.route('/login', methods=['POST'])
|
|
def login() -> Response | str:
|
|
username = request.form['login']
|
|
if db.login(username, request.form['password']):
|
|
session['username'] = username
|
|
return redirect('/')
|
|
|
|
return render_index(error="Invalid username or password", status=401)
|
|
|
|
|
|
@writing_blueprint.route('/test_error', methods=['GET'])
|
|
def error_test() -> Response:
|
|
raise Exception("Test error")
|
|
|
|
|
|
@writing_blueprint.route('/create_account', methods=['GET'])
|
|
def create_account_redirect() -> Response:
|
|
return redirect('/')
|
|
|
|
|
|
@writing_blueprint.route('/create_account', methods=['POST'])
|
|
def create_account() -> Response:
|
|
email_address = request.form['login']
|
|
password = request.form['password']
|
|
|
|
if password != request.form['confirm-password']:
|
|
return render_index(error="Password and confirm password must match!", status=400)
|
|
password_len = len(password)
|
|
if password_len < 8 or password_len > 64:
|
|
return render_index(error="Password must be between 8 and 64 characters", status=400)
|
|
|
|
try:
|
|
validate_email(email_address, check_deliverability=True)
|
|
except EmailNotValidError as e:
|
|
return render_index(error=str(e), status=400)
|
|
|
|
if db.get_user(email_address):
|
|
return render_index(error="A user with that email already exists!", status=400)
|
|
|
|
db.add_user(email_address, password)
|
|
session['username'] = email_address
|
|
return redirect('/')
|
|
|
|
|
|
@writing_blueprint.route('/logout', methods=['POST', 'GET'])
|
|
def logout() -> Response:
|
|
session.pop('username', None)
|
|
return redirect('/')
|
|
|
|
|
|
FREE_TIER_TEMPLATES = 2
|
|
|
|
|
|
@writing_blueprint.route('/add_letter')
|
|
def add_letter() -> Response:
|
|
email_address = session.get('username')
|
|
if not email_address:
|
|
return render_index()
|
|
user = db.get_user(email_address)
|
|
|
|
existing_letter_count = len(db.get_user_letters(user.id))
|
|
if user.in_free_tier() and existing_letter_count >= FREE_TIER_TEMPLATES:
|
|
return render_index(error=f'A maximum of {FREE_TIER_TEMPLATES} templates are available to each user.')
|
|
new_letter_name = f'Letter{existing_letter_count + 1}'
|
|
default_form_json = jsonify(CLForm().to_cl_data()).get_data(True)
|
|
db.add_letter(user.id, new_letter_name, default_form_json)
|
|
return redirect(f'/?letter_name={new_letter_name}')
|
|
|
|
|
|
def letter_choices(letters: list[db.Letter]) -> list[(int, str)]:
|
|
return [(i + 1, letter.title) for i, letter in enumerate(letters)]
|
|
|
|
|
|
@writing_blueprint.route('/', methods=['GET'])
|
|
def index_get() -> Response:
|
|
email_address = session.get('username')
|
|
if not email_address:
|
|
return render_index()
|
|
|
|
form = CLForm()
|
|
user = db.get_user(email_address)
|
|
letters = db.get_user_letters(user.id)
|
|
letter_names = letter_choices(letters)
|
|
if len(letter_names) == 0:
|
|
return render_index()
|
|
form.letterName.choices = letter_names
|
|
|
|
form.letterName.data = 1
|
|
selected_letter = request.args.get('letter_name')
|
|
|
|
if selected_letter:
|
|
for i, letter in enumerate(letters):
|
|
if letter.title == selected_letter:
|
|
form.letterName.data = i + 1
|
|
break
|
|
|
|
# TODO: Load this data more dynamically
|
|
# I.e. use a dictionary instead of explicitly-defined fields
|
|
data = json.loads(letters[form.letterName.data - 1].contents)
|
|
|
|
# Ensures default value is set
|
|
form.letterName.process_data(form.letterName.data)
|
|
|
|
form.company.data = data['company']
|
|
form.body.data = data['body']
|
|
form.closingText.data = data['closingText']
|
|
form.jobAndPronoun.data = data['jobAndPronoun']
|
|
form.mySkills.data = data['mySkills']
|
|
form.skillTypes.data = data['skillTypes']
|
|
form.username.data = data['username']
|
|
|
|
return render_index(form=form)
|
|
|
|
|
|
@writing_blueprint.route('/reset', methods=['POST', 'GET'])
|
|
def reset_password() -> Response | str:
|
|
if request.method == 'POST':
|
|
email_address = request.form.get('login')
|
|
existing_reset_id = request.form.get('reset_id')
|
|
if email_address:
|
|
reset_id = db.initiate_password_reset(email_address)
|
|
if reset_id:
|
|
if not email.send_password_reset(email_address, 'https://undercover.cafe/reset?id=' + str(reset_id)):
|
|
return render_index(error="Failed to send reset email. Please try again later.", status=500)
|
|
elif existing_reset_id:
|
|
new_password = request.form['password']
|
|
if not db.complete_reset(existing_reset_id, new_password):
|
|
return render_index(error="Password reset failed. Your reset link may have expired.", status=500)
|
|
# TODO: Log in?
|
|
|
|
return redirect('/')
|
|
|
|
query_reset_id = request.args.get('id')
|
|
# TODO: Add password validation
|
|
return f'''
|
|
<form formaction="/reset" method="post">
|
|
<p><input type=hidden name=reset_id value="{query_reset_id}"></p>
|
|
<label>New Password:</label>
|
|
<p><input type=password name=password></p>
|
|
<p><input type=submit value="Save Password"></p>
|
|
</form>
|
|
'''
|
|
|
|
|
|
@writing_blueprint.route('/dbtest', methods=['GET'])
|
|
def db_test_get() -> Response:
|
|
response = make_response(db.get_user_letters(1)[0].contents, 200)
|
|
response.mimetype = "text/plain"
|
|
return response
|
|
|
|
|
|
@writing_blueprint.route('/update', methods=['POST'])
|
|
def update_get() -> Response:
|
|
expected_token = os.environ['GITLAB_HOOK_TOKEN']
|
|
given_token = request.headers['X-Gitlab-Token']
|
|
event_type = request.headers['X-Gitlab-Event']
|
|
|
|
if expected_token == given_token and event_type == "Push Hook":
|
|
print("Update notification received.")
|
|
response = make_response("", 200)
|
|
response.mimetype = "text/plain"
|
|
threading.Timer(5, git_update, []).start()
|
|
return response
|
|
else:
|
|
return make_response("", 404)
|
|
|
|
|
|
def git_update() -> None:
|
|
script = os.environ['UPDATE_SCRIPT_PATH']
|
|
if not script:
|
|
return
|
|
subprocess.Popen(['bash', '-c', "test -f " + script + " && " + script])
|
|
|
|
|
|
@writing_blueprint.route('/', methods=['POST'])
|
|
def generate_pdf() -> Response:
|
|
email_address = session.get('username')
|
|
form = CLForm(request.form, email_address=email_address)
|
|
if not form.validate():
|
|
return render_index(form=form)
|
|
|
|
data = form.to_cl_data()
|
|
|
|
if email_address:
|
|
user = db.get_user(email_address)
|
|
letters = db.get_user_letters(user.id)
|
|
letter_json = jsonify(data).get_data(True)
|
|
if len(letters) == 0:
|
|
db.add_letter(user.id, 'Letter1', letter_json)
|
|
else:
|
|
letter = letters[data.selectedLetter]
|
|
# TODO: Support title editing
|
|
db.edit_letter(letter.id, letter.title, letter_json)
|
|
|
|
try:
|
|
resp = data.generate_pdf()
|
|
except ValueError as e:
|
|
resp = render_index(form=form, letter_errors=e.args[0])
|
|
|
|
# Save entered data as cookies on user's machine
|
|
for pair in data.get_pairs():
|
|
resp.set_cookie(pair[0], urllib.parse.quote(pair[1]))
|
|
|
|
return resp
|