# Copyright Sage Vaillancourt 2021
"""Flask blueprint serving the letter-writing form, account and maintenance routes."""

import hmac
import json
import logging
import os
import subprocess
import threading
import urllib.parse

from flask import Blueprint, render_template, request, make_response, session, redirect, jsonify, Response, current_app
from wtforms import Form, SelectField, StringField, TextAreaField, validators
from email_validator import validate_email, EmailNotValidError

from app import db, email
from app.pdf_builder import CLData

writing_blueprint = Blueprint('writing', __name__)


class CLForm(Form):
    """Form holding the letter fields; convertible to a `CLData` via `to_cl_data`."""

    def __init__(self, form: dict[str, str] = None, email_address: str = None, **kwargs):
        """Build the form, optionally loading the user's letters as template choices.

        Args:
            form: Submitted form data passed through to `wtforms.Form`.
            email_address: When given, the matching user's letters replace the
                placeholder choices of `letterName`.
            **kwargs: Forwarded unchanged to `wtforms.Form`.
        """
        super().__init__(form, **kwargs)
        if email_address:
            user = db.get_user(email_address)
            letters = db.get_user_letters(user.id)
            self.letterName.choices = letter_choices(letters)

    letterName = SelectField(
        'Select Template:',
        [validators.optional()],
        choices=[(1, 'LETTER TITLE')]
    )

    username = StringField(
        'Your Name:',
        [validators.Length(min=4, max=99)],
        default="Sage"
    )

    company = StringField(
        'Company:',
        [validators.Length(min=2, max=99)],
        default="BananaCorp"
    )

    jobAndPronoun = StringField(
        'Job and Pronoun (a/an):',
        [validators.Length(min=4, max=99)],
        default="a banana stocker"
    )

    skillTypes = StringField(
        'Skill Type:',
        [validators.Length(min=2, max=99)],
        default="practical"
    )

    mySkills = StringField(
        'My Skills:',
        [validators.Length(min=2, max=99)],
        default="stocking bananas"
    )

    closingText = TextAreaField(
        'Closing Text:',
        [validators.Length(min=2, max=99)],
        default="I look forward to hearing from you"
    )

    body = TextAreaField(
        'Body:',
        [validators.Length(min=4, max=9999)],
        default=(
            "My name is {\\username}. I'm excited for the opportunity to work as "
            "{\\jobAndPronoun} with your company. I think with my {\\skillTypes} knowledge "
            "of {\\mySkills}, there's a lot I could contribute to your team.\n\n"
            "I am passionate about what I do, and I believe we would work well together.\n\n"
            "Thank you for your consideration."
        )
    )

    def to_cl_data(self) -> CLData:
        """Return the current field values as a `CLData`.

        The 1-based `letterName` choice is converted to a 0-based
        `selectedLetter` index; a missing choice falls back to the first letter.
        """
        return CLData(
            selectedLetter=int(self.letterName.data or '1') - 1,
            username=self.username.data,
            company=self.company.data,
            jobAndPronoun=self.jobAndPronoun.data,
            skillTypes=self.skillTypes.data,
            mySkills=self.mySkills.data,
            closingText=self.closingText.data,
            body=self.body.data,
        )


def render_index(
        form: CLForm = None,
        error: str = None,
        status: int = 200,
        letter_errors: list[str] = None
) -> Response:
    """Render 'writing.jinja2' and wrap it in a response with the given status.

    Args:
        form: Form to display. A fresh default `CLForm` is created per call when
            omitted, so no form instance is shared between requests.
        error: Optional error message handed to the template.
        status: HTTP status code of the response.
        letter_errors: Optional letter-specific errors handed to the template.
    """
    if form is None:
        form = CLForm()
    return make_response(
        render_template(
            'writing.jinja2',
            form=form,
            username=session.get('username'),
            error=error,
            letter_errors=letter_errors,
        ), status)


@writing_blueprint.route('/login', methods=['POST'])
def login() -> Response | str:
    """Store the username in the session on a successful login, else respond 401."""
    username = request.form['login']
    if db.login(username, request.form['password']):
        session['username'] = username
        return redirect('/')
    return render_index(error="Invalid username or password", status=401)


@writing_blueprint.route('/test_error', methods=['GET'])
def error_test() -> Response:
    """Always raise, to exercise the application's error handling."""
    raise Exception("Test error")


@writing_blueprint.route('/create_account', methods=['GET'])
def create_account_redirect() -> Response:
    """Redirect GET requests for the account-creation URL to the index."""
    return redirect('/')


@writing_blueprint.route('/create_account', methods=['POST'])
def create_account() -> Response:
    """Validate the submitted credentials, create the user and log them in.

    Responds with status 400 and an error message when the passwords differ,
    the password length is outside 8-64 characters, the email address fails
    validation, or a user with that email already exists.
    """
    email_address = request.form['login']
    password = request.form['password']

    if password != request.form['confirm-password']:
        return render_index(error="Password and confirm password must match!", status=400)

    if not 8 <= len(password) <= 64:
        return render_index(error="Password must be between 8 and 64 characters", status=400)

    try:
        validate_email(email_address, check_deliverability=True)
    except EmailNotValidError as e:
        return render_index(error=str(e), status=400)

    if db.get_user(email_address):
        return render_index(error="A user with that email already exists!", status=400)

    db.add_user(email_address, password)
    session['username'] = email_address
    return redirect('/')


@writing_blueprint.route('/logout', methods=['POST', 'GET'])
def logout() -> Response:
    """Remove the username from the session (if any) and redirect to the index."""
    session.pop('username', None)
    return redirect('/')


# Number of letters at which free-tier users can no longer add another one.
FREE_TIER_TEMPLATES = 2


@writing_blueprint.route('/add_letter')
def add_letter() -> Response:
    """Add a new letter with default form contents for the logged-in user.

    Renders the plain index when nobody is logged in, and an error message when
    a free-tier user already has `FREE_TIER_TEMPLATES` letters. Otherwise
    redirects to the index with the new letter selected.
    """
    email_address = session.get('username')
    if not email_address:
        return render_index()

    user = db.get_user(email_address)
    existing_letter_count = len(db.get_user_letters(user.id))
    if user.in_free_tier() and existing_letter_count >= FREE_TIER_TEMPLATES:
        return render_index(error=f'A maximum of {FREE_TIER_TEMPLATES} templates are available to each user.')

    new_letter_name = f'Letter{existing_letter_count + 1}'
    default_form_json = jsonify(CLForm().to_cl_data()).get_data(True)
    db.add_letter(user.id, new_letter_name, default_form_json)
    return redirect(f'/?letter_name={new_letter_name}')


def letter_choices(letters: list[db.Letter]) -> list[tuple[int, str]]:
    """Return `(1-based position, title)` choice tuples for the given letters."""
    return [(i + 1, letter.title) for i, letter in enumerate(letters)]


@writing_blueprint.route('/', methods=['GET'])
def index_get() -> Response:
    """Render the index, pre-filled with one of the logged-in user's letters.

    The letter named by the `letter_name` query argument is selected when it
    matches a title; otherwise the first letter is used. Anonymous users and
    users without letters get the default form.
    """
    email_address = session.get('username')
    if not email_address:
        return render_index()

    form = CLForm()
    user = db.get_user(email_address)
    letters = db.get_user_letters(user.id)
    letter_names = letter_choices(letters)
    if not letter_names:
        return render_index()

    form.letterName.choices = letter_names
    form.letterName.data = 1

    selected_letter = request.args.get('letter_name')
    if selected_letter:
        for i, letter in enumerate(letters):
            if letter.title == selected_letter:
                form.letterName.data = i + 1
                break

    # TODO: Load this data more dynamically
    # I.e. use a dictionary instead of explicitly-defined fields
    data = json.loads(letters[form.letterName.data - 1].contents)

    # Ensures default value is set
    form.letterName.process_data(form.letterName.data)

    # Copy the stored values onto the form fields of the same name.
    for field_name in ('company', 'body', 'closingText', 'jobAndPronoun', 'mySkills', 'skillTypes', 'username'):
        getattr(form, field_name).data = data[field_name]

    return render_index(form=form)


@writing_blueprint.route('/reset', methods=['POST', 'GET'])
def reset_password() -> Response | str:
    """Start or complete a password reset on POST; return the reset page on GET.

    A POST carrying `login` initiates a reset and emails a reset link; a POST
    carrying `reset_id` and `password` completes one. Either failure is
    reported with status 500; otherwise the POST redirects to the index.
    """
    if request.method == 'POST':
        email_address = request.form.get('login')
        existing_reset_id = request.form.get('reset_id')
        if email_address:
            reset_id = db.initiate_password_reset(email_address)
            if reset_id:
                if not email.send_password_reset(email_address, 'https://undercover.cafe/reset?id=' + str(reset_id)):
                    return render_index(error="Failed to send reset email. Please try again later.", status=500)
        elif existing_reset_id:
            new_password = request.form['password']
            if not db.complete_reset(existing_reset_id, new_password):
                return render_index(error="Password reset failed. Your reset link may have expired.", status=500)
            # TODO: Log in?

        return redirect('/')

    # NOTE(review): query_reset_id is not used by the response body below, which
    # is only a newline as written — presumably the reset-form markup that
    # consumes it belongs in this string; confirm.
    query_reset_id = request.args.get('id')
    # TODO: Add password validation
    return f'''
'''


@writing_blueprint.route('/dbtest', methods=['GET'])
def db_test_get() -> Response:
    """Return, as plain text, the contents of the first letter fetched for id 1.

    NOTE(review): this route performs no session check, so any client can read
    that stored letter — consider removing it or restricting access.
    """
    response = make_response(db.get_user_letters(1)[0].contents, 200)
    response.mimetype = "text/plain"
    return response


@writing_blueprint.route('/update', methods=['POST'])
def update_get() -> Response:
    """Handle the GitLab push webhook by scheduling `git_update` in five seconds.

    Responds 200 when the `X-Gitlab-Token` header matches the
    `GITLAB_HOOK_TOKEN` environment variable and the event is "Push Hook";
    responds 404 otherwise.
    """
    expected_token = os.environ['GITLAB_HOOK_TOKEN']
    given_token = request.headers['X-Gitlab-Token']
    event_type = request.headers['X-Gitlab-Event']

    # Constant-time comparison so response timing does not leak the secret.
    # Encoding to bytes keeps compare_digest usable for non-ASCII header values.
    token_matches = hmac.compare_digest(expected_token.encode(), given_token.encode())
    if not (token_matches and event_type == "Push Hook"):
        return make_response("", 404)

    print("Update notification received.")
    response = make_response("", 200)
    response.mimetype = "text/plain"

    # Delay the update so this response can be sent first.
    threading.Timer(5, git_update, []).start()
    return response


def git_update() -> None:
    """Run the script named by `UPDATE_SCRIPT_PATH`, if it is set and exists."""
    # .get() so that an unset variable reaches the guard below instead of
    # raising KeyError inside the timer thread.
    script = os.environ.get('UPDATE_SCRIPT_PATH')
    if not script:
        return

    # NOTE(review): the path is concatenated into a shell command unquoted. It
    # comes from the server's own environment, but a value containing shell
    # metacharacters would be interpreted by bash.
    subprocess.Popen(['bash', '-c', "test -f " + script + " && " + script])


@writing_blueprint.route('/status', methods=['GET'])
def status_page() -> Response:
    """Render a status page naming the server and showing the `pwd` output."""
    status_message = f"Currently running on '{os.environ.get('UNDERCOVER_SERVER_NAME')}' server"

    # NOTE(review): despite the name, this currently holds the raw bytes output
    # of `pwd`; the git-hash lookup below is disabled.
    current_git_hash = subprocess.check_output(['pwd'])
    #current_git_hash = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD'], shell=True, universal_newlines=True)

    return make_response(render_template('error.jinja2', status=200, error_text=status_message, extra_text=current_git_hash), 200)


@writing_blueprint.route('/', methods=['POST'])
def generate_pdf() -> Response:
    """Validate the submitted form, save it for logged-in users, and build the PDF.

    An invalid form is re-rendered. For a logged-in user the data is stored as
    'Letter1' when they have no letters yet, otherwise it overwrites the
    selected letter. A `ValueError` from PDF generation re-renders the form
    with its first argument as the letter errors. The submitted values are
    also set as cookies on whichever response is returned.
    """
    email_address = session.get('username')
    form = CLForm(request.form, email_address=email_address)
    if not form.validate():
        return render_index(form=form)

    data = form.to_cl_data()

    if email_address:
        user = db.get_user(email_address)
        letters = db.get_user_letters(user.id)
        letter_json = jsonify(data).get_data(True)

        if not letters:
            db.add_letter(user.id, 'Letter1', letter_json)
        else:
            letter = letters[data.selectedLetter]
            # TODO: Support title editing
            db.edit_letter(letter.id, letter.title, letter_json)

    try:
        resp = data.generate_pdf()
    except ValueError as e:
        resp = render_index(form=form, letter_errors=e.args[0])

    # Save entered data as cookies on user's machine
    for pair in data.get_pairs():
        resp.set_cookie(pair[0], urllib.parse.quote(pair[1]))

    return resp