-
Jennifer Lindgren authoredJennifer Lindgren authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
app.py 14.65 KiB
import os, sys
import random
from __init__ import app
import database as db
import logging
from flask import Flask, request, jsonify, Response, json
from flask_jwt_extended import JWTManager
from flask_jwt_extended import (create_access_token, create_refresh_token,
jwt_required, jwt_refresh_token_required, get_jwt_identity)
from flask_bcrypt import Bcrypt
from base64 import b64decode
from datetime import datetime
flask_bcrypt = Bcrypt(app)
jwt = JWTManager(app)
# Http status codes
OK_STATUS_CODE = 200
BAD_REQUEST_STATUS_CODE = 400
CONFLICT_STATUS_CODE = 409
UNAUTHORIZED_STATUS_CODE = 401
DEFAULT_EDITOR_SETTINGS = {'fontSize': 16, 'margin': 80, 'tabSize': 4}
def is_valid_credentials(usernameEmail, password):
if '@' in usernameEmail:
user = db.get_user_from_email(usernameEmail)
else:
user = db.get_user_from_username(usernameEmail)
return user and flask_bcrypt.check_password_hash(user.password,
b64decode(password).decode("utf-8"))
def generate_response(response, status, headers={}):
response.status_code = status
response.headers = {**{
'Content-Type': 'application/json',
'Access-Control-Expose-Headers' : 'X-Auth-Token',
}, **headers}
return response
def is_valid_password(password):
return len(password) >= 8 and len(password) <= 100
def is_valid_collaborators(collaborators):
if not collaborators:
return True
acceptedPermissions = ['View-only', 'May edit']
for collaborator in collaborators:
if (not collaborator or not db.is_user_id(collaborator['userId']) or
not collaborator['permission'] in acceptedPermissions):
return False
return True
@jwt.unauthorized_loader
def unauthorized_response(callback):
return jsonify({
'ok': False,
'message': 'Missing Authorization Header'
}), 401
@app.route('/api/auth', methods=["GET"])
@jwt_required
def auth():
return generate_response(jsonify({
'success': True,
'message': 'Authorization successsful'
}), OK_STATUS_CODE)
@app.route('/api/refresh', methods=['POST'])
@jwt_refresh_token_required
def refresh():
current_user = get_jwt_identity()
return generate_response(jsonify({
'success': True,
'message': 'Successfully refreshed token',
'jwt': create_access_token(identity=current_user, fresh=False)
}), OK_STATUS_CODE)
### USER ###
@app.route('/api/sign_up', methods=['POST'])
def sign_up():
email = request.json['email']
username = request.json['username']
password = b64decode(request.json['password']).decode("utf-8")
if (not email or not username or not password):
return generate_response(jsonify({
'success': False,
'message': 'Bad request.'
}), BAD_REQUEST_STATUS_CODE)
elif (not db.is_unregistered_email(email)):
return generate_response(jsonify({
'success': False,
'message': 'User with that email already exists'
}), CONFLICT_STATUS_CODE)
elif (not db.is_unregistered_username(username)):
return generate_response(jsonify({
'success': False,
'message': 'User with that username already exists'
}), CONFLICT_STATUS_CODE)
elif (not is_valid_password(password)):
return generate_response(jsonify({
'success': False,
'message': 'Password does not fulfill requirements'
}), BAD_REQUEST_STATUS_CODE)
else:
new_user = db.User(
email,
username,
flask_bcrypt.generate_password_hash(password).decode('utf-8'))
db.add_user(new_user)
new_editor_settings = db.EditorSettings(
DEFAULT_EDITOR_SETTINGS['fontSize'],
DEFAULT_EDITOR_SETTINGS['margin'],
DEFAULT_EDITOR_SETTINGS['tabSize'])
db.add_editor_settings(new_editor_settings)
new_user.editorSettingsId = new_editor_settings.id
db.update_user(new_user.id, new_user)
return generate_response(db.user_schema.jsonify(new_user),
OK_STATUS_CODE)
@app.route('/api/sign_in', methods=['GET'])
def sign_in():
auth = request.authorization
usernameEmail = auth.username
password = auth.password
if not usernameEmail or not password:
message = 'Missing credentials!'
status = BAD_REQUEST_STATUS_CODE
elif not is_valid_credentials(usernameEmail, password):
message = 'Incorrect username/email or password'
status = UNAUTHORIZED_STATUS_CODE
else:
if '@' in usernameEmail:
username = db.get_username_from_email(usernameEmail)
else:
username = usernameEmail
db_user = db.get_user_from_username(username)
user = {
'id': db_user.id,
'email': db_user.email,
'username': db_user.username,
'editorSettingsId': db_user.editorSettingsId,
'editorSettings': db.editor_settings_schema.dumps(db_user.editorSettings),
'jwt': create_access_token(identity=username),
'refreshToken': create_refresh_token(identity=username)
}
return generate_response(jsonify(user), OK_STATUS_CODE)
return generate_response(jsonify({
'success': False,
'message': message
}), status)
@app.route('/api/change_password', methods=['POST', 'PUT'])
@jwt_required
def change_password():
username = request.json['username']
user = db.get_user_from_username(username)
oldPassword = request.json['oldPassword']
newPassword = request.json['newPassword']
if (not user):
return generate_response(jsonify({
'success': False,
'message': 'No user with that username'
}), BAD_REQUEST_STATUS_CODE)
elif not is_valid_credentials(username, oldPassword):
return generate_response(jsonify({
'success': False,
'message': 'Old password was incorrect'
}),UNAUTHORIZED_STATUS_CODE)
elif (not is_valid_password(newPassword)):
return generate_response(jsonify({
'success': False,
'message': 'New password does not fulfill requirements'
}), BAD_REQUEST_STATUS_CODE)
else:
updated_user = db.change_password(
username,
flask_bcrypt.generate_password_hash(newPassword).decode('utf-8')
)
return generate_response(db.user_schema.jsonify(updated_user), OK_STATUS_CODE)
@app.route('/api/delete_account', methods=['DELETE'])
def delete_user():
auth = request.authorization
username = auth.username
password = auth.password
user = db.get_user_from_username(username)
if not is_valid_credentials(username, password):
return generate_response(jsonify({
'success': False,
'message': 'Password was incorrect'
}),UNAUTHORIZED_STATUS_CODE)
elif (not user):
return generate_response(jsonify({
'success': False,
'message': 'No user with that username'
}), BAD_REQUEST_STATUS_CODE)
else:
db.delete_user_projects(user.id)
db.delete_user(user.id)
return generate_response(jsonify({
'success': True,
'message': 'Successfully deleted account'
}), OK_STATUS_CODE)
@app.route('/api/get_users', methods=['GET'])
@jwt_required
def get_users_request():
return generate_response(db.users_schema.jsonify(db.get_users()),
OK_STATUS_CODE)
@app.route('/api/get_user/<id>', methods=['GET'])
@jwt_required
def get_user_request(id):
return generate_response(db.user_schema.jsonify(db.get_user(id)),
OK_STATUS_CODE)
### EDITOR SETTINGS ###
@app.route('/api/update_editor_settings/<user_id>', methods=['PUT'])
@jwt_required
def update_editor_settings_request(user_id):
editor_settings = request.json['editorSettings']
updated_editor_settings = db.EditorSettings(
editor_settings['fontSize'],
editor_settings['margin'],
editor_settings['tabSize'],
)
updated_editor_settings.id = editor_settings['id']
return generate_response(db.editor_settings_schema.jsonify(
db.update_editor_settings(updated_editor_settings)), OK_STATUS_CODE)
### PROJECT ###
@app.route('/api/add_project', methods=['POST'])
@jwt_required
def new_project():
title = request.json['title']
creatorId = request.json['creatorId']
collaborators = request.json['collaborators'] if request.json['collaborators'] != [] else None
if (not title or not creatorId):
return generate_response(jsonify({
'success': False,
'message': 'Bad request.'
}), BAD_REQUEST_STATUS_CODE)
elif (not db.is_user_id(creatorId)):
return generate_response(jsonify({
'success': False,
'message': 'User does not exist'
}), BAD_REQUEST_STATUS_CODE)
elif (not is_valid_collaborators(collaborators)):
return generate_response(jsonify({
'success': False,
'message': 'Invalid collaborators'
}), BAD_REQUEST_STATUS_CODE)
else:
new_project = db.Project(title, creatorId)
db.add_project(new_project)
app.logger.error(new_project.id)
if collaborators:
for collaborator in collaborators:
new_collaborator = db.Collaborator(
project_id=new_project.id,
user_id=collaborator['userId'],
permission=collaborator['permission'])
db.add_collaborator(new_collaborator)
return generate_response(db.project_schema.jsonify(new_project), OK_STATUS_CODE)
@app.route('/api/duplicate_project/<project_id>', methods=['POST'])
@jwt_required
def duplicate_project(project_id):
title = request.json['title']
creatorId = request.json['creatorId']
if (not title or not creatorId):
return generate_response(jsonify({
'success': False,
'message': 'Bad request.'
}), BAD_REQUEST_STATUS_CODE)
elif (not db.is_user_id(creatorId)):
return generate_response(jsonify({
'success': False,
'message': 'User does not exist'
}), BAD_REQUEST_STATUS_CODE)
else:
project = db.get_project(project_id)
duplicated_project = db.Project(
title,
creatorId,
archived=project.archived
)
db.add_project(duplicated_project)
collaborators = db.get_project_collaborators(project_id)
if collaborators:
for collaborator in collaborators:
duplicated_collaborator = db.Collaborator(
project_id=duplicated_project.id,
user_id=collaborator['userId'],
permission=collaborator['permission'])
db.add_collaborator(duplicated_collaborator)
files = db.get_project_files(project_id)
if files:
for file in files:
duplicated_file = db.File(
project_id=duplicated_project.id,
name=file['name'],
is_folder=file['isFolder'],
parent=file['parent'],
content=file['content'])
db.add_file(duplicated_file)
return generate_response(db.project_schema.jsonify(duplicated_project), OK_STATUS_CODE)
@app.route('/api/get_projects', methods=['GET'])
@jwt_required
def get_projects_request():
return generate_response(db.projects_schema.jsonify(db.get_projects()),
OK_STATUS_CODE)
@app.route('/api/get_project/<id>', methods=['GET'])
@jwt_required
def get_project_request(id):
return generate_response(db.project_schema.jsonify(db.get_project(id)),
OK_STATUS_CODE)
@app.route('/api/update_project/<id>', methods=['POST', 'PUT'])
@jwt_required
def update_project_request(id):
project = json.loads(request.json['project'])
updated_project = db.Project(
title = project['title'],
creator_id = project['creatorId'],
archived = project['archived'])
updated_project.id = project['id']
return generate_response(db.project_schema.jsonify(
db.update_project(updated_project)), OK_STATUS_CODE)
@app.route('/api/delete_project/<id>', methods=['DELETE'])
@jwt_required
def delete_project_request(id):
project = json.loads(request.json['project'])
# TODO: Check that the user trying to delete the project is the creator!
db.delete_project(project['id'])
return generate_response(db.project_schema.jsonify({
'success': True,
'message': 'Successfully deleted project'
}), OK_STATUS_CODE)
### FILE ###
@app.route('/api/create_file', methods=['POST'])
@jwt_required
def create_file_request():
file = request.json['file']
new_file = db.File(
file['projectId'],
file['name'],
file['isFolder'],
file['parent'],
file['content'],
)
return generate_response(db.file_schema.jsonify(
db.add_file(new_file)), OK_STATUS_CODE)
@app.route('/api/update_file/<file_id>', methods=['PUT'])
@jwt_required
def update_file_request(file_id):
updated_file = request.json['file']
return generate_response(db.file_schema.jsonify(
db.update_file(file)), OK_STATUS_CODE)
@app.route('/api/update_files', methods=['PUT'])
@jwt_required
def update_files_request():
updated_files = request.json['files']
return generate_response(db.files_schema.jsonify(
db.update_files(updated_files)), OK_STATUS_CODE)
@app.route('/api/delete_file/<file_id>', methods=['DELETE'])
@jwt_required
def delete_file_request(file_id):
return generate_response(db.file_schema.jsonify(
db.delete_file(file_id)), OK_STATUS_CODE)
@app.route('/api/get_project_files/<project_id>', methods=['GET'])
@jwt_required
def get_project_files_request(project_id):
return generate_response(db.files_schema.jsonify(
db.get_project_files(project_id)), OK_STATUS_CODE)
### COLLABORATOR ###
@app.route('/api/get_project_collaborators/<project_id>', methods=['GET'])
@jwt_required
def get_project_collaborators_request(project_id):
return generate_response(db.collaborators_schema.jsonify(
db.get_project_collaborators(project_id)), OK_STATUS_CODE)
### MESSAGE ###
@app.route('/api/get_project_messages/<project_id>', methods=['GET'])
@jwt_required
def get_project_messages_request(project_id):
return generate_response(db.messages_schema.jsonify(
db.get_project_messages(project_id)), OK_STATUS_CODE)
if __name__ == '__main__':
#app.run(debug=True)
""" If you deleted the db, run the following to re-initialize it. """
db.db.create_all()
""" CAUTION! Uncommenting the following row will delete all users in the database. """
# print('Deleted ' + str(db.delete_all_users()) + ' users.')