Start toying with some simple db functionality.

This commit is contained in:
Sage Vaillancourt 2022-09-21 23:14:50 -04:00
parent 924bdbd8ce
commit b5e892f57e
4 changed files with 189 additions and 27 deletions

View File

@ -1,8 +1,6 @@
# Copyright Sage Vaillancourt 2021 # Copyright Sage Vaillancourt 2021
import os import os
import time
import subprocess
from flask import ( from flask import (
Flask, redirect, url_for, render_template, send_from_directory Flask, redirect, url_for, render_template, send_from_directory
@ -12,6 +10,7 @@ import writing
INDEX = None INDEX = None
def optimize_css(): def optimize_css():
import re import re
root = os.path.dirname(os.getcwd()) root = os.path.dirname(os.getcwd())
@ -26,6 +25,7 @@ def optimize_css():
minified_file.write(minified) minified_file.write(minified)
minified_file.close() minified_file.close()
def create_app(test_config=None): def create_app(test_config=None):
optimize_css() optimize_css()

139
flaskr/db.py Normal file
View File

@ -0,0 +1,139 @@
import bcrypt
import os
import psycopg2
from dataclasses import dataclass
@dataclass
class Letter:
id: int
title: str
contents: str
@dataclass
class User:
id: int
email: str
@dataclass
class UserWithHash:
id: int
email: str
password_hash: str
def connect():
return psycopg2.connect(
host=os.environ['UNDERCOVER_POSTGRES_HOST'],
dbname=os.environ['UNDERCOVER_POSTGRES_DBNAME'],
port=os.environ['UNDERCOVER_POSTGRES_PORT'],
user=os.environ['UNDERCOVER_POSTGRES_USER'],
password=os.environ['UNDERCOVER_POSTGRES_PASSWORD'])
def connected(action):
with connect() as con:
cur = con.cursor()
return action(cur, con)
def login(user_email: str, password: str):
pw_bytes: bytes = password.encode('utf-8')
user = __get_user(user_email)
return bcrypt.checkpw(pw_bytes, user.password_hash.encode('utf-8'))
def add_user(username: str, password: str):
pw_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
pw_hash = bcrypt.hashpw(pw_bytes, salt)
decoded = pw_hash.decode('utf-8')
with connect() as con:
cur = con.cursor()
cur.execute("INSERT INTO users(email, password) VALUES (%s, %s)", (username, decoded))
con.commit()
def delete_user(username: str):
with connect() as con:
cur = con.cursor()
cur.execute("DELETE FROM users WHERE email = %s", (username,))
con.commit()
def add_user_lambda(username: str, password: str):
def f(cur, con):
cur.execute("INSERT INTO users(email, password) VALUES (%s, %s)", (username, password))
con.commit()
connected(f)
def add_letter(user_id: int, letter_title: str, letter_content: str):
with connect() as con:
cur = con.cursor()
cur.execute("INSERT INTO letter_data(user_id, letter_name, letter_data) VALUES (%s, %s, %s)",
(user_id, letter_title, letter_content))
con.commit()
def edit_letter(letter_id: int, letter_title: str, letter_content: str):
with connect() as con:
cur = con.cursor()
cur.execute("UPDATE letter_data SET letter_name = %s, letter_data = %s WHERE id = %s",
(letter_title, letter_content, letter_id))
con.commit()
def get_user_letters(user_id: int) -> [Letter]:
with connect() as con:
cur = con.cursor()
cur.execute("SELECT id, letter_name, letter_data FROM letter_data WHERE user_id = %s", str(user_id))
return map(lambda row: Letter(row[0], row[1], row[2]), cur.fetchall())
def get_user(email: str) -> User:
user = __get_user(email)
return User(user.id, user.email)
def __get_user(email: str) -> UserWithHash:
"""
:param email:
:return: User without their password_hash
"""
with connect() as con:
cur = con.cursor()
cur.execute("SELECT id, password FROM users WHERE users.email = %s", (email,))
user_id, password = cur.fetchone()
return UserWithHash(user_id, email, password)
def get_users() -> [UserWithHash]:
with connect() as con:
cur = con.cursor()
cur.execute("SELECT id, email, password FROM users")
return map(lambda row: UserWithHash(row[0], row[1], row[2]), cur.fetchall())
add_user("hash_man", "hashword")
print("Can pull correctly: " + str(login("hash_man", "hashword")))
delete_user("hash_man")
# add_letter(1, "Dynamically-added", "This is a letter added from Python!")
# edit_letter(3, "Dynamically edited!", "This letter was dynamically edited from Python!")
# for letter in get_user_letters(1):
# print("\'" + letter.title + "\"" + ":")
# print(" id: " + str(letter.id))
# print(" letter-data: " + letter.contents)
# print()
# for user in get_users():
# print(user.email + ":")
# print(" id: " + str(user.id))
# print(" password: " + user.password_hash)
# print()

View File

@ -1,9 +1,12 @@
click==8.0.1 bcrypt==4.0.0
Flask==2.0.1 click==8.1.3
Flask-WTF==0.15.1 Flask==2.2.2
Flask-WTF==1.0.1
Flask-SQLAlchemy==2.5.1
gunicorn==20.1.0 gunicorn==20.1.0
itsdangerous==2.0.1 itsdangerous==2.1.2
Jinja2==3.0.1 Jinja2==3.1.2
MarkupSafe==2.0.1 MarkupSafe==2.1.1
Werkzeug==2.0.1 psycopg==3.1.1
WTForms==2.3.3 Werkzeug==2.2.2
WTForms==3.0.1

View File

@ -1,62 +1,80 @@
# Copyright Sage Vaillancourt 2021 # Copyright Sage Vaillancourt 2021
from flask import (Blueprint, render_template, request, make_response) from flask import (Blueprint, render_template, request, make_response)
from wtforms import Form, BooleanField, StringField, TextAreaField, validators from wtforms import Form, StringField, TextAreaField, validators
import urllib.parse import urllib.parse
from latty import CLData from latty import CLData
import flaskr import flaskr
import flaskr.db as db
writing_blueprint = Blueprint('writing', __name__,) writing_blueprint = Blueprint('writing', __name__,)
class CLForm(Form): class CLForm(Form):
username = StringField('Username:', username = StringField(
'Username:',
[validators.Length(min=4, max=99)], [validators.Length(min=4, max=99)],
default="Sage Bernerner" default="Sage Bernerner"
) )
company = StringField('Company:', company = StringField(
'Company:',
[validators.Length(min=2, max=99)], [validators.Length(min=2, max=99)],
default="BananaCorp" default="BananaCorp"
) )
jobandpronoun = StringField('Job and Pronoun (a/an):', jobandpronoun = StringField(
'Job and Pronoun (a/an):',
[validators.Length(min=4, max=99)], [validators.Length(min=4, max=99)],
default="a banana stocker" default="a banana stocker"
) )
skilltypes = StringField('Skill Type:', skilltypes = StringField(
'Skill Type:',
[validators.Length(min=2, max=99)], [validators.Length(min=2, max=99)],
default="practical" default="practical"
) )
myskills = StringField('My Skills:', myskills = StringField(
'My Skills:',
[validators.Length(min=2, max=99)], [validators.Length(min=2, max=99)],
default="stocking bananas" default="stocking bananas"
) )
closingtext = TextAreaField('Closing Text:', closingtext = TextAreaField(
'Closing Text:',
[validators.Length(min=2, max=99)], [validators.Length(min=2, max=99)],
default="I look forward to hearing from you" default="I look forward to hearing from you"
) )
body_string = ("My name is {\\username}. I would like to work as " body_string = (
"My name is {\\username}. I would like to work as "
"{\\jobandpronoun} with your company. I think my {\\skilltypes} knowledge " "{\\jobandpronoun} with your company. I think my {\\skilltypes} knowledge "
"of {\\myskills} can contribute a lot to your team.\n\n" "of {\\myskills} can contribute a lot to your team.\n\n"
"I am passionate about what I do, and I think we would work well together.\n\n" "I am passionate about what I do, and I think we would work well together.\n\n"
"Thank you for your consideration.") "Thank you for your consideration."
body = TextAreaField('Body:', )
body = TextAreaField(
'Body:',
[validators.Length(min=4, max=9999)], [validators.Length(min=4, max=9999)],
default=body_string default=body_string
) )
@writing_blueprint.route('/', methods=['GET']) @writing_blueprint.route('/', methods=['GET'])
def index_get(): def index_get():
if flaskr.INDEX == None: if flaskr.INDEX is None:
flaskr.INDEX = render_template( flaskr.INDEX = render_template(
'writing.html', 'writing.html',
form=CLForm() form=CLForm()
) )
return flaskr.INDEX return flaskr.INDEX
@writing_blueprint.route('/dbtest', methods=['GET'])
def index_get():
return db.get_user_letters(1)[0].contents
@writing_blueprint.route('/', methods=['POST']) @writing_blueprint.route('/', methods=['POST'])
def index_post(): def index_post():
form = CLForm(request.form) form = CLForm(request.form)
@ -73,7 +91,8 @@ def index_post():
(resp, errors) = data.generate_pdf() (resp, errors) = data.generate_pdf()
if errors: if errors:
resp = make_response(render_template('writing.html', resp = make_response(render_template(
'writing.html',
form=form, form=form,
errors=errors, errors=errors,
)) ))
@ -82,6 +101,7 @@ def index_post():
resp.set_cookie(pair[0], urllib.parse.quote(pair[1])) resp.set_cookie(pair[0], urllib.parse.quote(pair[1]))
return resp return resp
return render_template('writing.html', return render_template(
'writing.html',
form=form, form=form,
) )