-
Jennifer Lindgren authored
Backend: Create and update project files locally when someone is viewing a project to allow for faster syntax validation.
Jennifer Lindgren authoredBackend: Create and update project files locally when someone is viewing a project to allow for faster syntax validation.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
app.py 20.85 KiB
import os, sys, subprocess, shutil
import random
import logging
from subprocess import Popen, PIPE
from datetime import datetime
from flask import Flask, request, jsonify, Response, json
from flask_jwt_extended import JWTManager
from flask_jwt_extended import (create_access_token, create_refresh_token,
jwt_required, jwt_refresh_token_required, get_jwt_identity)
from flask_bcrypt import Bcrypt
from flask_socketio import emit
from base64 import b64decode
from simpleflock import SimpleFlock
from __init__ import app
import database_helper as db
from socketio_helper import socketio, SIMPLE_FLOCK_TIMEOUT
flask_bcrypt = Bcrypt(app)
jwt = JWTManager(app)
# Http status codes
OK_STATUS_CODE = 200
BAD_REQUEST_STATUS_CODE = 400
CONFLICT_STATUS_CODE = 409
UNAUTHORIZED_STATUS_CODE = 401
DEFAULT_EDITOR_SETTINGS = {'fontSize': 16, 'margin': 80, 'tabSize': 4}
### GENERAL ###
def generate_response(response, status, headers={}):
response.status_code = status
response.headers = {**{
'Content-Type': 'application/json',
'Access-Control-Expose-Headers': 'X-Auth-Token',
'Access-Control-Allow-Origin': '*'
}, **headers}
return response
### Authentication ###
def is_valid_credentials(usernameEmail, password):
if '@' in usernameEmail:
user = db.get_user_from_email(usernameEmail)
else:
user = db.get_user_from_username(usernameEmail)
return user and flask_bcrypt.check_password_hash(user.password,
b64decode(password).decode("utf-8"))
def is_valid_password(password):
return len(password) >= 8 and len(password) <= 100
def is_valid_collaborators(collaborators):
if not collaborators:
return True
acceptedPermissions = ['View-only', 'May edit']
for collaborator in collaborators:
if (not collaborator or not db.is_user_id(collaborator['userId']) or
not collaborator['permission'] in acceptedPermissions):
return False
return True
@jwt.unauthorized_loader
def unauthorized_response(callback):
return jsonify({
'ok': False,
'message': 'Missing Authorization Header'
}), 401
@app.route('/api/auth', methods=["GET"])
@jwt_required
def auth():
return generate_response(jsonify({
'success': True,
'message': 'Authorization successsful'
}), OK_STATUS_CODE)
@app.route('/api/refresh', methods=['POST'])
@jwt_refresh_token_required
def refresh():
current_user = get_jwt_identity()
return generate_response(jsonify({
'success': True,
'message': 'Successfully refreshed token',
'jwt': create_access_token(identity=current_user, fresh=False)
}), OK_STATUS_CODE)
### USER ###
@app.route('/api/sign_up', methods=['POST'])
def sign_up():
email = request.json['email']
username = request.json['username']
password = b64decode(request.json['password']).decode("utf-8")
if (not email or not username or not password):
return generate_response(jsonify({
'success': False,
'message': 'Bad request.'
}), BAD_REQUEST_STATUS_CODE)
elif (not db.is_unregistered_email(email.lower())):
return generate_response(jsonify({
'success': False,
'message': 'User with that email already exists'
}), CONFLICT_STATUS_CODE)
elif (not db.is_unregistered_username(username.lower())):
return generate_response(jsonify({
'success': False,
'message': 'User with that username already exists'
}), CONFLICT_STATUS_CODE)
elif (not is_valid_password(password)):
return generate_response(jsonify({
'success': False,
'message': 'Password does not fulfill requirements'
}), BAD_REQUEST_STATUS_CODE)
else:
new_user = db.User(
email.lower(),
username.lower(),
flask_bcrypt.generate_password_hash(password).decode('utf-8'))
db.add_user(new_user)
new_editor_settings = db.EditorSettings(
DEFAULT_EDITOR_SETTINGS['fontSize'],
DEFAULT_EDITOR_SETTINGS['margin'],
DEFAULT_EDITOR_SETTINGS['tabSize'])
db.add_editor_settings(new_editor_settings)
new_user.editorSettingsId = new_editor_settings.id
db.update_user(new_user.id, new_user)
return generate_response(db.user_schema.jsonify(new_user),
OK_STATUS_CODE)
@app.route('/api/sign_in', methods=['GET'])
def sign_in():
auth = request.authorization
usernameEmail = auth.username
password = auth.password
if not usernameEmail or not password:
message = 'Missing credentials!'
status = BAD_REQUEST_STATUS_CODE
elif not is_valid_credentials(usernameEmail.lower(), password):
message = 'Incorrect username/email or password'
status = UNAUTHORIZED_STATUS_CODE
else:
if '@' in usernameEmail:
username = db.get_username_from_email(usernameEmail.lower())
else:
username = usernameEmail
db_user = db.get_user_from_username(username.lower())
user = {
'id': db_user.id,
'email': db_user.email,
'username': db_user.username,
'editorSettingsId': db_user.editorSettingsId,
'editorSettings': db.editor_settings_schema.dumps(db_user.editorSettings),
'jwt': create_access_token(identity=username),
'refreshToken': create_refresh_token(identity=username)
}
return generate_response(jsonify(user), OK_STATUS_CODE)
return generate_response(jsonify({
'success': False,
'message': message
}), status)
@app.route('/api/change_password', methods=['POST', 'PUT'])
@jwt_required
def change_password():
username = request.json['username']
user = db.get_user_from_username(username)
oldPassword = request.json['oldPassword']
newPassword = request.json['newPassword']
if (not user):
return generate_response(jsonify({
'success': False,
'message': 'No user with that username'
}), BAD_REQUEST_STATUS_CODE)
elif not is_valid_credentials(username, oldPassword):
return generate_response(jsonify({
'success': False,
'message': 'Old password was incorrect'
}),UNAUTHORIZED_STATUS_CODE)
elif (not is_valid_password(newPassword)):
return generate_response(jsonify({
'success': False,
'message': 'New password does not fulfill requirements'
}), BAD_REQUEST_STATUS_CODE)
else:
updated_user = db.change_password(
username,
flask_bcrypt.generate_password_hash(newPassword).decode('utf-8')
)
return generate_response(db.user_schema.jsonify(updated_user), OK_STATUS_CODE)
@app.route('/api/delete_account', methods=['DELETE'])
def delete_user():
auth = request.authorization
username = auth.username
password = auth.password
user = db.get_user_from_username(username)
if not is_valid_credentials(username, password):
return generate_response(jsonify({
'success': False,
'message': 'Password was incorrect'
}),UNAUTHORIZED_STATUS_CODE)
elif (not user):
return generate_response(jsonify({
'success': False,
'message': 'No user with that username'
}), BAD_REQUEST_STATUS_CODE)
else:
db.delete_user_projects(user.id)
db.delete_editor_settings(user.editorSettingsId)
db.delete_user(user.id)
return generate_response(jsonify({
'success': True,
'message': 'Successfully deleted account'
}), OK_STATUS_CODE)
@app.route('/api/get_users', methods=['GET'])
@jwt_required
def get_users_request():
return generate_response(db.users_schema.jsonify(db.get_users()),
OK_STATUS_CODE)
@app.route('/api/get_user/<id>', methods=['GET'])
@jwt_required
def get_user_request(id):
return generate_response(db.user_schema.jsonify(db.get_user(id)),
OK_STATUS_CODE)
### EDITOR SETTINGS ###
@app.route('/api/update_editor_settings/<user_id>', methods=['PUT'])
@jwt_required
def update_editor_settings_request(user_id):
editor_settings = request.json['editorSettings']
updated_editor_settings = db.EditorSettings(
editor_settings['fontSize'],
editor_settings['margin'],
editor_settings['tabSize'],
)
updated_editor_settings.id = editor_settings['id']
return generate_response(db.editor_settings_schema.jsonify(
db.update_editor_settings(updated_editor_settings)), OK_STATUS_CODE)
### PROJECT ###
@app.route('/api/add_project', methods=['POST'])
@jwt_required
def new_project():
title = request.json['title']
creatorId = request.json['creatorId']
collaborators = request.json['collaborators'] if request.json['collaborators'] != [] else None
if (not title or not creatorId):
return generate_response(jsonify({
'success': False,
'message': 'Bad request.'
}), BAD_REQUEST_STATUS_CODE)
elif (not db.is_user_id(creatorId)):
return generate_response(jsonify({
'success': False,
'message': 'User does not exist'
}), BAD_REQUEST_STATUS_CODE)
elif (not is_valid_collaborators(collaborators)):
return generate_response(jsonify({
'success': False,
'message': 'Invalid collaborators'
}), BAD_REQUEST_STATUS_CODE)
else:
new_project = db.Project(title, creatorId)
db.add_project(new_project)
if collaborators:
for collaborator in collaborators:
new_collaborator = db.Collaborator(
project_id=new_project.id,
user_id=collaborator['userId'],
permission=collaborator['permission'])
db.add_collaborator(new_collaborator)
return generate_response(db.project_schema.jsonify(new_project), OK_STATUS_CODE)
@app.route('/api/duplicate_project/<project_id>', methods=['POST'])
@jwt_required
def duplicate_project(project_id):
title = request.json['title']
creator_id = request.json['creatorId']
if (not title or not creator_id):
return generate_response(jsonify({
'success': False,
'message': 'Bad request.'
}), BAD_REQUEST_STATUS_CODE)
elif (not db.is_user_id(creator_id)):
return generate_response(jsonify({
'success': False,
'message': 'User does not exist'
}), BAD_REQUEST_STATUS_CODE)
else:
old_project = db.get_project(project_id)
duplicated_project = db.Project(
title,
creator_id,
archived=old_project.archived
)
db.add_project(duplicated_project)
collaborators = db.get_project_collaborators(project_id)
if collaborators:
for collaborator in collaborators:
if collaborator['userId'] != creator_id:
duplicated_collaborator = db.Collaborator(
project_id=duplicated_project.id,
user_id=collaborator['userId'],
permission=collaborator['permission'])
db.add_collaborator(duplicated_collaborator)
if old_project.creatorId != creator_id:
old_creator_collaborator = db.Collaborator(
project_id=duplicated_project.id,
user_id=old_project.creatorId,
permission='May edit')
db.add_collaborator(old_creator_collaborator)
files = db.get_project_files(project_id)
if files:
for file in files:
duplicated_file = db.File(
project_id=duplicated_project.id,
name=file['name'],
is_folder=file['isFolder'],
parent=file['parent'],
content=file['content'])
db.add_file(duplicated_file)
return generate_response(db.project_schema.jsonify(duplicated_project), OK_STATUS_CODE)
@app.route('/api/get_projects', methods=['GET'])
@jwt_required
def get_projects_request():
return generate_response(db.projects_schema.jsonify(db.get_projects()),
OK_STATUS_CODE)
@app.route('/api/get_projects/<user_id>', methods=['GET'])
@jwt_required
def get_user_projects_request(user_id):
return generate_response(db.projects_schema.jsonify(db.get_user_projects(user_id)),
OK_STATUS_CODE)
@app.route('/api/get_project/<id>', methods=['GET'])
@jwt_required
def get_project_request(id):
return generate_response(db.project_schema.jsonify(db.get_project(id)),
OK_STATUS_CODE)
@app.route('/api/update_project/<id>', methods=['POST', 'PUT'])
@jwt_required
def update_project_request(id):
project = json.loads(request.json['project'])
updated_project = db.Project(
title = project['title'],
creator_id = project['creatorId'],
archived = project['archived'])
updated_project.id = project['id']
return generate_response(db.project_schema.jsonify(
db.update_project(updated_project)), OK_STATUS_CODE)
@app.route('/api/set_archived_project/<project_id>', methods=['POST', 'PUT'])
@jwt_required
def set_archived_project_request(project_id):
project = db.get_project(project_id)
project.archived = request.json['archived']
return generate_response(db.project_schema.jsonify(
db.update_project(project)), OK_STATUS_CODE)
@app.route('/api/delete_project/<project_id>', methods=['DELETE'])
@jwt_required
def delete_project_request(project_id):
# TODO: Check that the user trying to delete the project is the creator!
db.delete_project_files(project_id)
db.delete_project_collaborators(project_id)
db.delete_project_messages(project_id)
db.delete_project(project_id)
return generate_response(jsonify({
'success': True,
'message': 'Successfully deleted project'
}), OK_STATUS_CODE)
@app.route('/api/parse_project_files/<project_id>', methods=['GET'])
@jwt_required
def parse_file_request(project_id):
if db.get_project(project_id):
if os.path.exists(str(project_id)):
with SimpleFlock(str(project_id) + 'lock', SIMPLE_FLOCK_TIMEOUT):
find_cmd = 'find ' + str(project_id) + '/ -type f -name "*.py"'
find = Popen(find_cmd, shell=True, stdout=PIPE)
files = find.communicate()[0].decode('utf-8')
files = ' '.join(files.splitlines())
pylint_cmd = 'pylint --output-format=json ' + files
pylint = Popen(pylint_cmd, shell=True, stdout=PIPE)
result = pylint.communicate()[0].decode('utf-8')
else:
result = '[]'
return generate_response(jsonify({
'success': True,
'message': 'Successfully parsed files.',
'result': result
}), OK_STATUS_CODE)
else:
return generate_response(jsonify({
'success': False,
'message': 'No such project'
}), BAD_REQUEST_STATUS_CODE)
### FILE ###
@app.route('/api/create_file', methods=['POST'])
@jwt_required
def create_file_request():
file = request.json['file']
new_file = db.File(
file['projectId'],
file['name'],
file['isFolder'],
file['parent'],
file['content'],
)
db.add_file(new_file)
# Update edited date in project
project = db.get_project(new_file.projectId)
project.edited = datetime.now()
db.update_project(project)
# Add file locally on server if project is open
open_project_path = str(new_file.projectId) + '/'
if os.path.exists(open_project_path):
if new_file.parent:
file_path = open_project_path + new_file.parent + '/' + new_file.name
else:
file_path = open_project_path + new_file.name
if new_file.isFolder:
os.makedirs(file_path)
else:
f = open(file_path, 'w+')
f.write(new_file.content)
f.close()
return generate_response(db.file_schema.jsonify(
new_file), OK_STATUS_CODE)
@app.route('/api/update_file/<file_id>', methods=['PUT'])
@jwt_required
def update_file_request(file_id):
file = request.json['file']
old_file = db.get_file(file['id'])
updated_file = db.File(
file['projectId'],
file['name'],
file['isFolder'],
file['parent'],
file['content'])
updated_file.id = file['id']
# Update edited date in project
project = db.get_project(updated_file.projectId)
project.edited = datetime.now()
db.update_project(project)
# Update file locally on server if project is open
open_project_path = str(project.id) + '/'
if old_file.parent:
old_path = open_project_path + old_file.parent + '/' + old_file.name
else:
old_path = open_project_path + old_file.name
if os.path.exists(old_path):
with SimpleFlock(str(project.id) + 'lock', SIMPLE_FLOCK_TIMEOUT):
if os.path.exists(old_path):
if file['parent']:
new_path = open_project_path + file['parent'] + '/' + file['name']
else:
new_path = open_project_path + file['name']
os.rename(old_path, new_path)
return generate_response(db.file_schema.jsonify(
db.update_file(updated_file)), OK_STATUS_CODE)
@app.route('/api/delete_file/<file_id>', methods=['DELETE'])
@jwt_required
def delete_file_request(file_id):
# Update edited date in project
file = db.get_file(file_id)
project = db.get_project(file.projectId)
project.edited = datetime.now()
db.update_project(project)
# Remove file locally on server if project is open
open_project_path = str(file.projectId) + '/'
if file.parent:
path = open_project_path + file.parent + '/' + file.name
else:
path = open_project_path + file.name
if os.path.exists(path):
with SimpleFlock(str(file.projectId) + 'lock', SIMPLE_FLOCK_TIMEOUT):
if os.path.exists(path):
if file.isFolder:
shutil.rmtree(path)
else:
os.remove(path)
return generate_response(db.file_schema.jsonify(
db.delete_file(file_id)), OK_STATUS_CODE)
@app.route('/api/get_project_files/<project_id>', methods=['GET'])
@jwt_required
def get_project_files_request(project_id):
return generate_response(db.files_schema.jsonify(
db.get_project_files(project_id)), OK_STATUS_CODE)
### COLLABORATOR ###
@app.route('/api/add_collaborator', methods=['POST'])
@jwt_required
def add_collaborator_request():
collaborator = request.json['collaborator']
new_collaborator = db.Collaborator(
collaborator['projectId'],
collaborator['userId'],
collaborator['permission'],
)
db.add_collaborator(new_collaborator)
# Update edited date in project
project = db.get_project(new_collaborator.projectId)
project.edited = datetime.now()
db.update_project(project)
return generate_response(db.collaborator_schema.jsonify(
new_collaborator), OK_STATUS_CODE)
@app.route('/api/update_collaborator/<collaborator_id>', methods=['PUT'])
@jwt_required
def update_collaborator_request(collaborator_id):
# Update collaborator
collaborator = request.json['collaborator']
updated_collaborator = db.Collaborator(
collaborator['projectId'],
collaborator['userId'],
collaborator['permission']
)
updated_collaborator.id = collaborator['id']
# Update edited date in project
project = db.get_project(updated_collaborator.projectId)
project.edited = datetime.now()
db.update_project(project)
return generate_response(db.collaborator_schema.jsonify(
db.update_collaborator(updated_collaborator)), OK_STATUS_CODE)
@app.route('/api/delete_collaborator/<collaborator_id>', methods=['DELETE'])
@jwt_required
def delete_collaborator_request(collaborator_id):
# Update edited date in project
collaborator = db.get_collaborator(collaborator_id)
project = db.get_project(collaborator.projectId)
project.edited = datetime.now()
db.update_project(project)
return generate_response(db.collaborator_schema.jsonify(
db.delete_collaborator(collaborator_id)), OK_STATUS_CODE)
@app.route('/api/get_project_collaborators/<project_id>', methods=['GET'])
@jwt_required
def get_project_collaborators_request(project_id):
return generate_response(db.collaborators_schema.jsonify(
db.get_project_collaborators(project_id)), OK_STATUS_CODE)
### MESSAGE ###
@app.route('/api/get_project_messages/<project_id>', methods=['GET'])
@jwt_required
def get_project_messages_request(project_id):
return generate_response(db.messages_schema.jsonify(
db.get_project_messages(project_id)), OK_STATUS_CODE)
if __name__ == '__main__':
socketio.run(app, host='192.168.1.123')
""" If you deleted the db, run the following to re-initialize it. """
# db.db.create_all()