UnderCover/undercover/db.py

192 lines
5.4 KiB
Python

from uuid import uuid4, UUID
import os
from dataclasses import dataclass
from typing import Optional
import bcrypt
import psycopg
@dataclass
class Letter:
    """A single letter, as built by ``get_user_letters`` from ``letter_data`` rows."""

    id: int  # ``id`` column of the letter row
    title: str  # ``letter_name`` column
    contents: str  # ``letter_data`` column
@dataclass
class User:
    """Public view of a user: identity only, with no password hash attached."""

    id: int  # ``id`` column of the ``users`` row
    email: str  # email the user was looked up with
@dataclass
class UserWithHash:
    """A user together with the value of the ``password`` column of ``users``."""

    id: int  # ``id`` column of the ``users`` row
    email: str  # email associated with the row
    password_hash: str  # stored ``password`` column; compared with bcrypt in ``login``
# Connection settings, read from the environment once at import time.
# The password is not stored in a module-level variable here; connect()
# reads UNDERCOVER_POSTGRES_PASSWORD itself when a connection is opened.
host = os.environ.get('UNDERCOVER_POSTGRES_HOST')
db_name = os.environ.get('UNDERCOVER_POSTGRES_DBNAME')
port = os.environ.get('UNDERCOVER_POSTGRES_PORT')
user = os.environ.get('UNDERCOVER_POSTGRES_USER')

# True only when every setting, including the password, is present and
# non-empty. The bool() wrapper matters: a bare ``a and b and ...`` chain
# evaluates to its last operand, which would leave the password string
# itself (or None) in ``db_available`` instead of a flag.
db_available = bool(
    host
    and db_name
    and port
    and user
    and os.environ.get('UNDERCOVER_POSTGRES_PASSWORD')
)
def connect():
    """Open a new PostgreSQL connection from the UNDERCOVER_POSTGRES_* settings.

    Host, database name, port and user come from the module-level values;
    the password is read from ``UNDERCOVER_POSTGRES_PASSWORD`` at call time.

    :return: the connection object returned by ``psycopg.connect``
    """
    settings = {
        'host': host,
        'dbname': db_name,
        'port': port,
        'user': user,
        'password': os.environ.get('UNDERCOVER_POSTGRES_PASSWORD'),
    }
    return psycopg.connect(**settings)
def connected(action):
    """Run ``action`` with a fresh cursor and its connection.

    The connection from ``connect()`` is used as a context manager around
    the call.

    :param action: callable invoked as ``action(cursor, connection)``
    :return: whatever ``action`` returns
    """
    with connect() as connection:
        return action(connection.cursor(), connection)
def login(user_email: str, password: str):
    """Check a plaintext password against the stored hash for ``user_email``.

    :param user_email: email used to look the user up
    :param password: plaintext password to verify
    :return: the result of ``bcrypt.checkpw`` when a user is found,
        otherwise ``False``
    """
    candidate: bytes = password.encode('utf-8')
    account = __get_user(user_email)
    if not account:
        return False
    stored_hash = account.password_hash.encode('utf-8')
    return bcrypt.checkpw(candidate, stored_hash)
def __gen_pw_hash(password: str):
    """Hash ``password`` with bcrypt using a freshly generated salt.

    :param password: plaintext password
    :return: the bcrypt hash, decoded from bytes as UTF-8
    """
    hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
    return hashed.decode('utf-8')
def add_user(username: str, password: str):
    """Insert a row into ``users`` holding a bcrypt hash of ``password``.

    :param username: value stored in the ``email`` column
    :param password: plaintext password; only its hash is stored
    """
    hashed_password = __gen_pw_hash(password)
    insert_sql = "INSERT INTO users(email, password) VALUES (%s, %s)"
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute(insert_sql, (username, hashed_password))
        connection.commit()
def delete_user(username: str):
    """Delete the ``users`` rows whose ``email`` equals ``username``.

    :param username: email value to match exactly
    """
    delete_sql = "DELETE FROM users WHERE email = %s"
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute(delete_sql, (username,))
        connection.commit()
def add_user_lambda(username: str, password: str):
    """Insert a new user through ``connected``, storing a bcrypt hash.

    Does the same work as ``add_user`` but routes the database call through
    the ``connected`` helper. The password is hashed before it is written:
    ``login`` passes the stored column to ``bcrypt.checkpw``, so a row
    holding the plaintext password could never be verified (and would keep
    the secret in clear text).

    :param username: value stored in the ``email`` column
    :param password: plaintext password; only its hash is stored
    """
    pw_hash = __gen_pw_hash(password)

    def insert_user(cur, con):
        cur.execute("INSERT INTO users(email, password) VALUES (%s, %s)", (username, pw_hash))
        con.commit()

    connected(insert_user)
def add_letter(user_id: int, letter_title: str, letter_content: str):
    """Insert a new row into ``letter_data`` for the given user.

    :param user_id: value for the ``user_id`` column
    :param letter_title: value for the ``letter_name`` column
    :param letter_content: value for the ``letter_data`` column
    """
    insert_sql = "INSERT INTO letter_data(user_id, letter_name, letter_data) VALUES (%s, %s, %s)"
    values = (user_id, letter_title, letter_content)
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute(insert_sql, values)
        connection.commit()
def edit_letter(letter_id: int, letter_title: str, letter_content: str):
    """Overwrite the title and contents of the letter with id ``letter_id``.

    :param letter_id: ``id`` of the ``letter_data`` row to update
    :param letter_title: new value for the ``letter_name`` column
    :param letter_content: new value for the ``letter_data`` column
    """
    update_sql = "UPDATE letter_data SET letter_name = %s, letter_data = %s WHERE id = %s"
    values = (letter_title, letter_content, letter_id)
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute(update_sql, values)
        connection.commit()
def get_user_letters(user_id: int) -> [Letter]:
    """Fetch every letter stored for ``user_id``.

    :param user_id: owner to filter on; sent to the database as a string
    :return: a list with one ``Letter`` per matching ``letter_data`` row
    """
    query = "SELECT id, letter_name, letter_data FROM letter_data WHERE user_id = %s"
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute(query, (str(user_id),))
        return [
            Letter(letter_id, name, data)
            for letter_id, name, data in cursor.fetchall()
        ]
def get_user(email: str) -> Optional[User]:
    """Look up a user by email and return it without the password hash.

    :param email: email address to search for
    :return: a ``User`` when a matching row exists, otherwise ``None``
    """
    record = __get_user(email)
    return User(record.id, record.email) if record else None
def __get_user(email: str) -> Optional[UserWithHash]:
    """Look up a user, including the stored password hash, by email.

    The match is case-insensitive but otherwise exact. The comparison uses
    ``LOWER(...) = LOWER(...)`` rather than ``ILIKE`` because ``ILIKE``
    interprets ``%``, ``_`` and ``\\`` in the supplied value as pattern
    syntax, so an address such as ``john_doe@example.com`` (or just ``%``)
    could match a different user's row.

    :param email: email address to search for
    :return: the matching user, with ``email`` set to the value passed in,
        or ``None`` when no row matches
    """
    with connect() as con:
        cur = con.cursor()
        cur.execute(
            "SELECT id, password FROM users WHERE LOWER(users.email) = LOWER(%s)",
            (email,),
        )
        row = cur.fetchone()
    if row is None:
        return None
    user_id, password_hash = row
    return UserWithHash(user_id, email, password_hash)
def get_users() -> [UserWithHash]:
    """Return every row of ``users`` as a list of ``UserWithHash``.

    A real list is returned (not a lazy ``map`` object) so the result can be
    iterated more than once, indexed and measured with ``len``, in line with
    ``get_user_letters``.

    :return: one ``UserWithHash`` per row, built from the ``id``, ``email``
        and ``password`` columns
    """
    with connect() as con:
        cur = con.cursor()
        cur.execute("SELECT id, email, password FROM users")
        return [
            UserWithHash(user_id, email, password_hash)
            for user_id, email, password_hash in cur.fetchall()
        ]
def initiate_password_reset(email: str) -> Optional[UUID]:
    """Create a password-reset record for the user with this email.

    :param email: email address of the account to reset
    :return: the newly generated reset id, or ``None`` when no user matches
    """
    account = get_user(email)
    if not account:
        return None

    reset_id = uuid4()
    insert_sql = "INSERT INTO resets(user_id, id, reset_time) VALUES (%s, %s, NOW())"
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute(insert_sql, (account.id, reset_id))
        connection.commit()
    return reset_id
def complete_reset(reset_id: str, new_password: str):
    """Consume a reset record and store a new password hash for its user.

    :param reset_id: id of the row in ``resets`` to redeem
    :param new_password: plaintext replacement password; only its bcrypt
        hash is written to ``users``
    :return: ``True`` when the reset row existed with a non-empty
        ``reset_time`` and the password was updated, otherwise ``False``
    """
    with connect() as connection:
        cursor = connection.cursor()
        cursor.execute("SELECT reset_time, user_id FROM resets WHERE id = %s", (reset_id,))
        found = cursor.fetchone()
        if not found:
            return False

        reset_time, user_id = found
        # TODO: also reject resets whose reset_time is too far in the past
        if not reset_time:
            return False

        # The reset row is removed before the new hash is written, so the
        # same reset id cannot be redeemed again afterwards.
        cursor.execute("DELETE FROM resets WHERE id = %s", (reset_id,))
        new_hash = __gen_pw_hash(new_password)
        cursor.execute("UPDATE users SET password = %s WHERE id = %s", (new_hash, user_id))
        connection.commit()
        return True
if __name__ == "__main__":
    # Manual smoke test against the configured database: create a throwaway
    # user, check that login() accepts its password, then remove it again.
    add_user("hash_man", "hashword")
    try:
        print("Can pull correctly: " + str(login("hash_man", "hashword")))
    finally:
        # Clean up even when login() raises, so a failed run does not leave
        # the throwaway row behind for the next run to trip over.
        delete_user("hash_man")

    # Further manual examples, kept disabled:
    # add_letter(1, "Dynamically-added", "This is a letter added from Python!")
    # edit_letter(3, "Dynamically edited!", "This letter was dynamically edited from Python!")
    # for letter in get_user_letters(1):
    #     print("\'" + letter.title + "\"" + ":")
    #     print(" id: " + str(letter.id))
    #     print(" letter-data: " + letter.contents)
    #     print()
    # for user in get_users():
    #     print(user.email + ":")
    #     print(" id: " + str(user.id))
    #     print(" password: " + user.password_hash)
    #     print()