diff --git a/database_helper.py b/database_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..e5c3ad9b4120a8afbeb222400ccd71b73ff2a9ae --- /dev/null +++ b/database_helper.py @@ -0,0 +1,135 @@ +from flask import g, Flask +import re +import sqlite3 + +app = Flask(__name__) +DATABASE = './database.db' +all_tokens = {} + +def get_db(): + if 'db' not in g: + g.db = sqlite3.connect(DATABASE) + g.db.row_factory = sqlite3.Row + return g.db + +def init_db(): + with app.app_context(): + db = get_db() + with open('schema.sql', 'r') as f: + sql_script = f.read() + db.executescript(sql_script) + db.commit() + +def change_password(email: str, oldpassword: str, newpassword:str): + db = get_db() + cursor = db.cursor() + cursor.execute("SELECT password FROM users WHERE email = ?", (email,)) + result = cursor.fetchone() + if not result or result[0] != oldpassword: + return {"success": False, "message": "Old password is incorrect", "data": None} + + try: + cursor.execute("UPDATE users SET password = ? WHERE email = ?", (newpassword, email)) + db.commit() + return {"success": True, "message": "Password changed successfully", "data": None} + except Exception as e: + db.rollback() + return {"success": False, "message": "Failed to change password", "data": None} + +def add_user(email, password, firstname, familyName, gender, city, country): + with app.app_context(): + db = get_db() + cursor = db.cursor() + cursor.execute("INSERT INTO users (email, password, firstname, familyName, gender, city, country) VALUES (?, ?, ?, ?, ?, ?, ?)", (email, password, firstname, familyName, gender, city, country)) + db.commit() + +def validate_email(email): + if email is None: + return True + email_regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' + return re.match(email_regex, email) is not None + +def validate_password(password): + return password is not None and len(password) >= 6 + +def validate_name(name): + return name is not None and name.isalpha() + +def validate_gender(gender): + return gender is not None and gender.lower() in ['male', 'female', 'others'] + +def validate_city_or_country(region): + return region is not None + +def validate_inputs(email, password, firstname, familyName, gender, city, country): + errors = {} + + if not validate_email(email): + errors['email'] = "Invalid email format." + + if not validate_password(password): + errors['password'] = "Password must be at least 6 characters long." + + if not validate_name(firstname): + errors['firstname'] = "Firstname should contain only alphabetic characters." + + if not validate_name(familyName): + errors['familyName'] = "Family name should contain only alphabetic characters." + + if not validate_gender(gender): + errors['gender'] = "Gender must be 'Male' or 'Female' or 'Others'." + + if not validate_city_or_country(city): + errors['city'] = "City should contain only alphabetic characters." + + if not validate_city_or_country(country): + errors['country'] = "Country should contain only alphabetic characters." + return errors + +def is_email_exist( email : str ): + with app.app_context(): + db = get_db() + cursor = db.cursor() + return cursor.execute('SELECT * FROM users WHERE email=?',(email,)).fetchone() + +def check_sign_in_params( username: str, password: str ): + with app.app_context(): + db = get_db() + cursor = db.cursor() + return cursor.execute('SELECT * FROM users WHERE email=? AND password=?', (username, password)).fetchone() + +def change_password( email : str , oldpassword: str, newpassword: str): + with app.app_context(): + db = get_db() + cursor = db.cursor() + + cursor.execute("SELECT password FROM users WHERE email=?", (email,)) + + result = cursor.fetchone() + + if not result or result[0] != oldpassword: + return {"success": False, "message": "Old password is incorrect", "data": None} + + try: + cursor.execute("UPDATE users SET password=? WHERE email=?", (newpassword, email)) + db.commit() + + return {"success": True, "message": "Password changed successfully", "data": newpassword} + + except Exception as e: + db.rollback() + return {"success": False, "message": "Failed to change password", "data": None } + + + +def get_user_data_by_email( email : str ): + db = get_db() + cursor = db.cursor() + + result = cursor.execute('SELECT * FROM users WHERE email=?', (email,)).fetchone() + + if not result: + return {"success": False, "message" : "Failed to fetch user data by the provided email", "data" : None} + + return {"success": True, "message" : "Successfully fetched data", "data" : dict(result)} +