UnderCover/undercover/db.py

156 lines
4.2 KiB
Python

import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from uuid import uuid4, UUID

import bcrypt
import psycopg
@dataclass
class Letter:
    """A saved letter, as read from one row of the ``letter_data`` table."""

    id: int  # ``id`` column of the row
    title: str  # ``letter_name`` column of the row
    contents: str  # ``letter_data`` column of the row
@dataclass
class User:
    """A user record that carries no password hash."""

    id: int  # ``id`` column of the ``users`` row
    email: str  # email address the user was looked up with
@dataclass
class UserWithHash:
    """A user record together with the stored password hash."""

    id: int  # ``id`` column of the ``users`` row
    email: str  # email address the user was looked up with
    password_hash: str  # ``password`` column of the ``users`` row (hash text)
# Connection settings, read once from the environment when the module loads.
# The password is intentionally not stored in a module variable.
host = os.environ.get('UNDERCOVER_POSTGRES_HOST')
db_name = os.environ.get('UNDERCOVER_POSTGRES_DBNAME')
port = os.environ.get('UNDERCOVER_POSTGRES_PORT')
db_user = os.environ.get('UNDERCOVER_POSTGRES_USER')
# True only when every setting, including the password, is present and
# non-empty. bool() keeps the flag a plain boolean: a bare ``and`` chain would
# evaluate to its last operand, i.e. the password string itself.
db_available = bool(
    host
    and db_name
    and port
    and db_user
    and os.environ.get('UNDERCOVER_POSTGRES_PASSWORD')
)
def connect():
    """
    Open a new psycopg connection using the module's connection settings.

    Host, database name, port and user come from the module-level values;
    the password is looked up in the environment on every call.

    :return: whatever ``psycopg.connect`` returns for these settings
    """
    settings = {
        'host': host,
        'dbname': db_name,
        'port': port,
        'user': db_user,
        'password': os.environ.get('UNDERCOVER_POSTGRES_PASSWORD'),
    }
    return psycopg.connect(**settings)
def login(user_email: str, password: str):
    """
    Check a plain-text password against the hash stored for a user.

    :param user_email: email used to look the user up
    :param password: plain-text password to verify
    :return: the result of ``bcrypt.checkpw`` when a user is found,
        False when no user matches
    """
    candidate: bytes = password.encode('utf-8')
    account = __get_user(user_email)
    if not account:
        return False
    stored_hash = account.password_hash.encode('utf-8')
    return bcrypt.checkpw(candidate, stored_hash)
def __gen_pw_hash(password: str):
    """
    Hash a plain-text password with bcrypt and a freshly generated salt.

    :param password: plain-text password
    :return: the bcrypt hash decoded to a UTF-8 string
    """
    hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
    return hashed.decode('utf-8')
def add_user(username: str, password: str):
    """
    Insert a new row into the ``users`` table.

    :param username: value stored in the ``email`` column
    :param password: plain-text password; only its bcrypt hash is stored
    """
    hashed = __gen_pw_hash(password)
    statement = "INSERT INTO users(email, password) VALUES (%s, %s)"
    with connect() as db:
        db.cursor().execute(statement, (username, hashed))
        db.commit()
def delete_user(username: str):
    """
    Delete the ``users`` rows whose ``email`` equals ``username``.

    :param username: email value to match exactly
    """
    statement = "DELETE FROM users WHERE email = %s"
    with connect() as db:
        db.cursor().execute(statement, (username,))
        db.commit()
def add_letter(user_id: int, letter_title: str, letter_content: str):
    """
    Insert a new row into the ``letter_data`` table.

    :param user_id: value for the ``user_id`` column
    :param letter_title: value for the ``letter_name`` column
    :param letter_content: value for the ``letter_data`` column
    """
    statement = "INSERT INTO letter_data(user_id, letter_name, letter_data) VALUES (%s, %s, %s)"
    values = (user_id, letter_title, letter_content)
    with connect() as db:
        db.cursor().execute(statement, values)
        db.commit()
def edit_letter(letter_id: int, letter_title: str, letter_content: str):
    """
    Overwrite the title and contents of an existing ``letter_data`` row.

    :param letter_id: ``id`` of the row to update
    :param letter_title: new value for the ``letter_name`` column
    :param letter_content: new value for the ``letter_data`` column
    """
    statement = "UPDATE letter_data SET letter_name = %s, letter_data = %s WHERE id = %s"
    values = (letter_title, letter_content, letter_id)
    with connect() as db:
        db.cursor().execute(statement, values)
        db.commit()
def get_user_letters(user_id: int) -> List[Letter]:
    """
    Fetch every letter stored for a user.

    :param user_id: value matched against the ``user_id`` column
    :return: one Letter per matching ``letter_data`` row; empty list when
        there are none
    """
    with connect() as con:
        cur = con.cursor()
        # NOTE(review): user_id is sent as text here while add_letter passes it
        # unconverted - kept as is; confirm which form the column expects.
        cur.execute("SELECT id, letter_name, letter_data FROM letter_data WHERE user_id = %s", (str(user_id),))
        return [
            Letter(letter_id, title, contents)
            for letter_id, title, contents in cur.fetchall()
        ]
def get_user(email: str) -> Optional[User]:
    """
    Look up a user by email, leaving the password hash out of the result.

    :param email: email to look up
    :return: User with id and email, or None when no user matches
    """
    record = __get_user(email)
    return User(record.id, record.email) if record else None
def __get_user(email: str) -> Optional[UserWithHash]:
    """
    Fetch the user whose email equals ``email``, ignoring letter case.

    :param email: email to look up; always compared as a literal string
    :return: the user including their password_hash, or None when no row
        matches. The returned ``email`` is the argument as passed in.
    """
    with connect() as con:
        cur = con.cursor()
        # Case-insensitive equality instead of ILIKE: ILIKE interprets '%',
        # '_' and '\' in the argument as pattern syntax, so an input such as
        # "%" or "a_b@x.org" could match a different user's row.
        # The ::text cast pins the parameter type for LOWER().
        cur.execute(
            "SELECT id, password FROM users WHERE LOWER(users.email) = LOWER(%s::text)",
            (email,)
        )
        row = cur.fetchone()
        if row:
            user_id, password = row
            return UserWithHash(user_id, email, password)
        return None
def initiate_password_reset(email: str) -> Optional[UUID]:
    """
    Record a password-reset request for the user with the given email.

    A random UUID is stored in ``resets`` together with the user's id and
    the database's current time.

    :param email: email of the user requesting the reset
    :return: the id of the new ``resets`` row, or None when no user matches
    """
    account = get_user(email)
    if not account:
        return None
    token = uuid4()
    statement = "INSERT INTO resets(user_id, id, reset_time) VALUES (%s, %s, NOW())"
    with connect() as db:
        db.cursor().execute(statement, (account.id, token))
        db.commit()
        return token
# Negative offset of 15 minutes: added to the current time it gives the oldest
# reset_time that complete_reset still accepts.
RESET_TIME = -timedelta(minutes=15)
def complete_reset(reset_id: str, new_password: str):
    """
    Set a new password for the user tied to a still-valid reset request.

    On success the ``resets`` row is deleted and the user's ``password``
    column is replaced with a bcrypt hash of ``new_password``.

    :param reset_id: id of the ``resets`` row to redeem
    :param new_password: plain-text replacement password
    :return: True when the password was changed; False when the reset id is
        unknown, has no reset_time, or is older than the RESET_TIME window
    """
    with connect() as con:
        cur = con.cursor()
        cur.execute("SELECT reset_time, user_id FROM resets WHERE id = %s", (reset_id,))
        row = cur.fetchone()
        if not row or not row[0]:
            return False
        reset_time, user_id = row
        # reset_time may be naive or timezone-aware depending on the column
        # type, and ordering a naive datetime against an aware one raises
        # TypeError - so build "now" in the same flavour as the stored value.
        now = datetime.now(timezone.utc)
        if reset_time.utcoffset() is None:
            # Naive timestamps are interpreted as UTC, as they were before.
            now = now.replace(tzinfo=None)
        # RESET_TIME is negative, so this is "reset_time is too old".
        if reset_time <= now + RESET_TIME:
            return False
        cur.execute("DELETE FROM resets WHERE id = %s", (reset_id,))
        password_hash = __gen_pw_hash(new_password)
        cur.execute("UPDATE users SET password = %s WHERE id = %s", (password_hash, user_id))
        con.commit()
        return True