Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
app.py 14.65 KiB
import os, sys
import random

from __init__ import app
import database as db

import logging
from flask import Flask, request, jsonify, Response, json
from flask_jwt_extended import JWTManager
from flask_jwt_extended import (create_access_token, create_refresh_token,
                                jwt_required, jwt_refresh_token_required, get_jwt_identity)
from flask_bcrypt import Bcrypt
from base64 import b64decode
from datetime import datetime

# Password-hashing helper and JWT manager, both bound to the shared Flask app.
flask_bcrypt = Bcrypt(app)
jwt = JWTManager(app)

# Http status codes
OK_STATUS_CODE = 200
BAD_REQUEST_STATUS_CODE = 400
CONFLICT_STATUS_CODE = 409
UNAUTHORIZED_STATUS_CODE = 401

# Editor settings given to every newly registered user (used by sign_up).
DEFAULT_EDITOR_SETTINGS = {'fontSize': 16, 'margin': 80, 'tabSize': 4}

def is_valid_credentials(usernameEmail, password):
    """Check a username/email and base64-encoded password against the database.

    Args:
        usernameEmail: Account identifier. Anything containing '@' is looked
            up as an email address, everything else as a username.
        password: The candidate password as base64-encoded UTF-8 text.

    Returns:
        True if a matching user exists and the decoded password matches the
        stored hash, otherwise False. A password that is not valid base64,
        not valid UTF-8, or not a string is treated as a failed check instead
        of raising.
    """
    if '@' in usernameEmail:
        user = db.get_user_from_email(usernameEmail)
    else:
        user = db.get_user_from_username(usernameEmail)
    if not user:
        return False

    try:
        decoded_password = b64decode(password).decode("utf-8")
    except (TypeError, ValueError):
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses; TypeError covers a non-string password such as None.
        return False
    return bool(flask_bcrypt.check_password_hash(user.password, decoded_password))

def generate_response(response, status, headers=None):
    """Set the status code and standard JSON/CORS headers on a response.

    Headers are written into the response's existing header container rather
    than replacing it, so headers already present on the response (and the
    container's own type) are preserved.

    Args:
        response: Response object exposing ``status_code`` and a mapping-like
            ``headers`` attribute.
        status: HTTP status code to set.
        headers: Optional mapping of extra headers; these override the
            defaults set here when the names collide.

    Returns:
        The same ``response`` object, mutated in place.
    """
    response.status_code = status
    merged_headers = {
        'Content-Type': 'application/json',
        'Access-Control-Expose-Headers': 'X-Auth-Token',
    }
    if headers:
        merged_headers.update(headers)
    for name, value in merged_headers.items():
        response.headers[name] = value
    return response

def is_valid_password(password):
    """Return True when the password is between 8 and 100 characters long."""
    min_length, max_length = 8, 100
    return min_length <= len(password) <= max_length

def is_valid_collaborators(collaborators):
    """Validate a list of collaborator entries taken from a request.

    Args:
        collaborators: Iterable of dicts with ``userId`` and ``permission``
            keys, or a falsy value when there are no collaborators.

    Returns:
        True if there are no collaborators, or if every entry is a dict whose
        ``permission`` is 'View-only' or 'May edit' and whose ``userId`` is
        accepted by ``db.is_user_id``. Malformed entries (wrong type, missing
        keys) make the result False instead of raising.
    """
    if not collaborators:
        return True
    accepted_permissions = ('View-only', 'May edit')
    for collaborator in collaborators:
        if not isinstance(collaborator, dict):
            return False
        user_id = collaborator.get('userId')
        # Cheap structural checks first so malformed entries never reach the db.
        if user_id is None or collaborator.get('permission') not in accepted_permissions:
            return False
        if not db.is_user_id(user_id):
            return False
    return True

@jwt.unauthorized_loader
def unauthorized_response(callback):
    """Build the JSON body and 401 status for the JWT manager's unauthorized hook.

    The ``callback`` argument supplied by the JWT manager is not used.
    """
    payload = {'ok': False, 'message': 'Missing Authorization Header'}
    return jsonify(payload), 401

@app.route('/api/auth', methods=["GET"])
@jwt_required
def auth():
    """Confirm that the request passed the ``jwt_required`` check.

    Returns:
        A 200 JSON response with ``success`` set to True and a confirmation
        message.
    """
    return generate_response(jsonify({
        'success': True,
        'message': 'Authorization successful'
        }), OK_STATUS_CODE)

@app.route('/api/refresh', methods=['POST'])
@jwt_refresh_token_required
def refresh():
    """Issue a new non-fresh access token for the identity in the refresh token."""
    identity = get_jwt_identity()
    payload = {
        'success': True,
        'message': 'Successfully refreshed token',
        'jwt': create_access_token(identity=identity, fresh=False),
    }
    return generate_response(jsonify(payload), OK_STATUS_CODE)

### USER ###
@app.route('/api/sign_up', methods=['POST'])
def sign_up():
    """Register a new user and give them the default editor settings.

    Reads ``email``, ``username`` and a base64-encoded ``password`` from the
    JSON request body.

    Returns:
        200 with the new user serialized by ``db.user_schema`` on success.
        400 if a field is missing or empty, the password is not valid base64
        UTF-8 text, or the password fails ``is_valid_password``.
        409 if the email or username is already registered.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing, unparseable or non-object body: treat every field as absent.
        payload = {}

    email = payload.get('email')
    username = payload.get('username')
    try:
        password = b64decode(payload.get('password') or '').decode("utf-8")
    except (TypeError, ValueError):
        # binascii.Error and UnicodeDecodeError are ValueError subclasses;
        # TypeError covers non-string values. Reported as a bad request below.
        password = ''

    if not email or not username or not password:
        return generate_response(jsonify({
            'success': False,
            'message': 'Bad request.'
        }), BAD_REQUEST_STATUS_CODE)
    if not db.is_unregistered_email(email):
        return generate_response(jsonify({
            'success': False,
            'message': 'User with that email already exists'
        }), CONFLICT_STATUS_CODE)
    if not db.is_unregistered_username(username):
        return generate_response(jsonify({
            'success': False,
            'message': 'User with that username already exists'
        }), CONFLICT_STATUS_CODE)
    if not is_valid_password(password):
        return generate_response(jsonify({
            'success': False,
            'message': 'Password does not fulfill requirements'
        }), BAD_REQUEST_STATUS_CODE)

    new_user = db.User(
        email,
        username,
        flask_bcrypt.generate_password_hash(password).decode('utf-8'))
    db.add_user(new_user)

    new_editor_settings = db.EditorSettings(
        DEFAULT_EDITOR_SETTINGS['fontSize'],
        DEFAULT_EDITOR_SETTINGS['margin'],
        DEFAULT_EDITOR_SETTINGS['tabSize'])
    db.add_editor_settings(new_editor_settings)

    # Link the freshly stored settings row to the user and persist the link.
    new_user.editorSettingsId = new_editor_settings.id
    db.update_user(new_user.id, new_user)
    return generate_response(db.user_schema.jsonify(new_user),
        OK_STATUS_CODE)

@app.route('/api/sign_in', methods=['GET'])
def sign_in():
    """Authenticate with HTTP Basic credentials and issue JWTs.

    The Basic-auth username may be a username or an email address; the
    password is handed to ``is_valid_credentials`` unchanged.

    Returns:
        200 with the user's id, email, username, editor settings and an
        access/refresh token pair on success.
        400 if the Authorization header is absent or incomplete.
        401 if the credentials do not match.
    """
    credentials = request.authorization
    # request.authorization is None when no usable Authorization header was
    # sent; treat that the same as empty credentials instead of crashing.
    usernameEmail = credentials.username if credentials else None
    password = credentials.password if credentials else None

    if not usernameEmail or not password:
        return generate_response(jsonify({
            'success': False,
            'message': 'Missing credentials!'
            }), BAD_REQUEST_STATUS_CODE)
    if not is_valid_credentials(usernameEmail, password):
        return generate_response(jsonify({
            'success': False,
            'message': 'Incorrect username/email or password'
            }), UNAUTHORIZED_STATUS_CODE)

    # Tokens are always issued for the username, even when signing in by email.
    if '@' in usernameEmail:
        username = db.get_username_from_email(usernameEmail)
    else:
        username = usernameEmail

    db_user = db.get_user_from_username(username)
    user = {
        'id': db_user.id,
        'email': db_user.email,
        'username': db_user.username,
        'editorSettingsId': db_user.editorSettingsId,
        'editorSettings': db.editor_settings_schema.dumps(db_user.editorSettings),
        'jwt': create_access_token(identity=username),
        'refreshToken': create_refresh_token(identity=username)
    }
    return generate_response(jsonify(user), OK_STATUS_CODE)

@app.route('/api/change_password', methods=['POST', 'PUT'])
@jwt_required
def change_password():
    """Replace a user's password after verifying the old one.

    Reads ``username``, ``oldPassword`` and ``newPassword`` from the JSON
    request body. ``oldPassword`` is verified via ``is_valid_credentials``.

    Returns:
        200 with the updated user serialized by ``db.user_schema``.
        400 if a field is missing or not a string, the user does not exist,
        or the new password fails ``is_valid_password``.
        401 if the old password is incorrect.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing, unparseable or non-object body: treat every field as absent.
        payload = {}

    username = payload.get('username')
    oldPassword = payload.get('oldPassword')
    newPassword = payload.get('newPassword')
    if not all(isinstance(value, str) for value in (username, oldPassword, newPassword)):
        return generate_response(jsonify({
            'success': False,
            'message': 'Bad request.'
        }), BAD_REQUEST_STATUS_CODE)

    user = db.get_user_from_username(username)
    if not user:
        return generate_response(jsonify({
            'success': False,
            'message': 'No user with that username'
        }), BAD_REQUEST_STATUS_CODE)
    if not is_valid_credentials(username, oldPassword):
        return generate_response(jsonify({
            'success': False,
            'message': 'Old password was incorrect'
        }), UNAUTHORIZED_STATUS_CODE)
    if not is_valid_password(newPassword):
        return generate_response(jsonify({
            'success': False,
            'message': 'New password does not fulfill requirements'
        }), BAD_REQUEST_STATUS_CODE)

    # NOTE(review): newPassword is hashed exactly as received, whereas sign_up
    # base64-decodes the password before hashing - confirm what the client sends.
    updated_user = db.change_password(
        username,
        flask_bcrypt.generate_password_hash(newPassword).decode('utf-8')
        )
    return generate_response(db.user_schema.jsonify(updated_user), OK_STATUS_CODE)

@app.route('/api/delete_account', methods=['DELETE'])
def delete_user():
    """Delete the account identified by HTTP Basic credentials, with its projects.

    Returns:
        200 with a success message once the user's projects and the user
        have been deleted.
        400 if the Authorization header is absent or incomplete, or no user
        has that username.
        401 if the credentials do not match.
    """
    credentials = request.authorization
    username = credentials.username if credentials else None
    password = credentials.password if credentials else None
    if username is None or password is None:
        # No usable Authorization header: answer 400 instead of raising.
        return generate_response(jsonify({
            'success': False,
            'message': 'Missing credentials!'
        }), BAD_REQUEST_STATUS_CODE)

    user = db.get_user_from_username(username)
    if not is_valid_credentials(username, password):
        return generate_response(jsonify({
            'success': False,
            'message': 'Password was incorrect'
        }), UNAUTHORIZED_STATUS_CODE)
    if not user:
        return generate_response(jsonify({
            'success': False,
            'message': 'No user with that username'
        }), BAD_REQUEST_STATUS_CODE)

    # Projects are removed before the user record itself.
    db.delete_user_projects(user.id)
    db.delete_user(user.id)
    return generate_response(jsonify({
        'success': True,
        'message': 'Successfully deleted account'
    }), OK_STATUS_CODE)

@app.route('/api/get_users', methods=['GET'])
@jwt_required
def get_users_request():
    """Return everything ``db.get_users`` yields, serialized with the users schema."""
    users = db.get_users()
    body = db.users_schema.jsonify(users)
    return generate_response(body, OK_STATUS_CODE)

@app.route('/api/get_user/<id>', methods=['GET'])
@jwt_required
def get_user_request(id):
    """Return the user looked up by the ``id`` URL segment, serialized as JSON."""
    user = db.get_user(id)
    body = db.user_schema.jsonify(user)
    return generate_response(body, OK_STATUS_CODE)

### EDITOR SETTINGS ###
@app.route('/api/update_editor_settings/<user_id>', methods=['PUT'])
@jwt_required
def update_editor_settings_request(user_id):
    """Store the editor settings sent in the request body and return them.

    The settings (including their ``id``) come from the ``editorSettings``
    object in the JSON body; the ``user_id`` URL segment is not read.
    """
    incoming = request.json['editorSettings']
    font_size, margin, tab_size = (
        incoming[key] for key in ('fontSize', 'margin', 'tabSize'))
    settings = db.EditorSettings(font_size, margin, tab_size)
    settings.id = incoming['id']

    saved = db.update_editor_settings(settings)
    return generate_response(
        db.editor_settings_schema.jsonify(saved), OK_STATUS_CODE)

### PROJECT ###
@app.route('/api/add_project', methods=['POST'])
@jwt_required
def new_project():
    """Create a project and, optionally, its collaborators.

    Reads ``title``, ``creatorId`` and an optional ``collaborators`` list
    from the JSON request body.

    Returns:
        200 with the new project serialized by ``db.project_schema``.
        400 if ``title`` or ``creatorId`` is missing or empty, the creator is
        not accepted by ``db.is_user_id``, or the collaborators fail
        ``is_valid_collaborators``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing, unparseable or non-object body: treat every field as absent.
        payload = {}

    title = payload.get('title')
    creatorId = payload.get('creatorId')
    # An empty or absent collaborator list is normalized to None.
    collaborators = payload.get('collaborators') or None

    if not title or not creatorId:
        return generate_response(jsonify({
            'success': False,
            'message': 'Bad request.'
        }), BAD_REQUEST_STATUS_CODE)
    if not db.is_user_id(creatorId):
        return generate_response(jsonify({
            'success': False,
            'message': 'User does not exist'
        }), BAD_REQUEST_STATUS_CODE)
    if not is_valid_collaborators(collaborators):
        return generate_response(jsonify({
            'success': False,
            'message': 'Invalid collaborators'
        }), BAD_REQUEST_STATUS_CODE)

    project = db.Project(title, creatorId)
    db.add_project(project)
    # Routine event, so it is logged at debug level rather than as an error.
    app.logger.debug('Created project %s', project.id)

    for collaborator in collaborators or []:
        db.add_collaborator(db.Collaborator(
            project_id=project.id,
            user_id=collaborator['userId'],
            permission=collaborator['permission']))

    return generate_response(db.project_schema.jsonify(project), OK_STATUS_CODE)

@app.route('/api/duplicate_project/<project_id>', methods=['POST'])
@jwt_required
def duplicate_project(project_id):
    """Copy a project, its collaborators and its files into a new project.

    Reads the new ``title`` and ``creatorId`` from the JSON request body;
    the source project is identified by the ``project_id`` URL segment.

    Returns:
        200 with the duplicated project serialized by ``db.project_schema``.
        400 if ``title`` or ``creatorId`` is missing or empty, the creator is
        not accepted by ``db.is_user_id``, or ``db.get_project`` returns
        nothing for ``project_id``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing, unparseable or non-object body: treat every field as absent.
        payload = {}

    title = payload.get('title')
    creatorId = payload.get('creatorId')
    if not title or not creatorId:
        return generate_response(jsonify({
            'success': False,
            'message': 'Bad request.'
        }), BAD_REQUEST_STATUS_CODE)
    if not db.is_user_id(creatorId):
        return generate_response(jsonify({
            'success': False,
            'message': 'User does not exist'
        }), BAD_REQUEST_STATUS_CODE)

    project = db.get_project(project_id)
    if not project:
        # Without this guard, reading project.archived below would raise.
        return generate_response(jsonify({
            'success': False,
            'message': 'Project does not exist'
        }), BAD_REQUEST_STATUS_CODE)

    duplicated_project = db.Project(
        title,
        creatorId,
        archived=project.archived
    )
    db.add_project(duplicated_project)

    for collaborator in db.get_project_collaborators(project_id) or []:
        db.add_collaborator(db.Collaborator(
            project_id=duplicated_project.id,
            user_id=collaborator['userId'],
            permission=collaborator['permission']))

    # NOTE(review): 'parent' is copied verbatim; if it refers to a file of the
    # source project the copy may point outside the new project - confirm.
    for file in db.get_project_files(project_id) or []:
        db.add_file(db.File(
            project_id=duplicated_project.id,
            name=file['name'],
            is_folder=file['isFolder'],
            parent=file['parent'],
            content=file['content']))

    return generate_response(db.project_schema.jsonify(duplicated_project), OK_STATUS_CODE)

@app.route('/api/get_projects', methods=['GET'])
@jwt_required
def get_projects_request():
    """Return everything ``db.get_projects`` yields, serialized with the projects schema."""
    projects = db.get_projects()
    body = db.projects_schema.jsonify(projects)
    return generate_response(body, OK_STATUS_CODE)

@app.route('/api/get_project/<id>', methods=['GET'])
@jwt_required
def get_project_request(id):
    """Return the project looked up by the ``id`` URL segment, serialized as JSON."""
    project = db.get_project(id)
    body = db.project_schema.jsonify(project)
    return generate_response(body, OK_STATUS_CODE)

@app.route('/api/update_project/<id>', methods=['POST', 'PUT'])
@jwt_required
def update_project_request(id):
    """Overwrite a project with the values sent in the request body.

    The body's ``project`` field is itself a JSON-encoded string; the project
    id is taken from it, and the ``id`` URL segment is not read.
    """
    payload = json.loads(request.json['project'])
    fields = {
        'title': payload['title'],
        'creator_id': payload['creatorId'],
        'archived': payload['archived'],
    }
    replacement = db.Project(**fields)
    replacement.id = payload['id']

    saved = db.update_project(replacement)
    return generate_response(db.project_schema.jsonify(saved), OK_STATUS_CODE)

@app.route('/api/delete_project/<id>', methods=['DELETE'])
@jwt_required
def delete_project_request(id):
    """Delete the project named in the request body and report success.

    The body's ``project`` field is a JSON-encoded string whose ``id`` selects
    the project to delete; the ``id`` URL segment is not read.

    Returns:
        200 with a JSON success message.
    """
    project = json.loads(request.json['project'])
    # TODO: Check that the user trying to delete the project is the creator!
    db.delete_project(project['id'])
    # Plain jsonify, matching delete_account: the payload is a status message,
    # not a project, so it must not be passed through the project schema.
    return generate_response(jsonify({
            'success': True,
            'message': 'Successfully deleted project'
        }), OK_STATUS_CODE)

### FILE ###
@app.route('/api/create_file', methods=['POST'])
@jwt_required
def create_file_request():
    """Create a file from the ``file`` object in the JSON body and return it."""
    spec = request.json['file']
    # Positional order expected by db.File, as used by the original call.
    positional = [
        spec[key]
        for key in ('projectId', 'name', 'isFolder', 'parent', 'content')
    ]
    created = db.add_file(db.File(*positional))
    return generate_response(db.file_schema.jsonify(created), OK_STATUS_CODE)


@app.route('/api/update_file/<file_id>', methods=['PUT'])
@jwt_required
def update_file_request(file_id):
    """Update a file with the ``file`` object sent in the JSON body.

    The ``file_id`` URL segment is not read; the payload is handed to
    ``db.update_file`` as-is.

    Returns:
        200 with the result of ``db.update_file`` serialized by
        ``db.file_schema``.
    """
    updated_file = request.json['file']
    # Pass the request payload; the previous code referenced an undefined name.
    return generate_response(db.file_schema.jsonify(
        db.update_file(updated_file)), OK_STATUS_CODE)

@app.route('/api/update_files', methods=['PUT'])
@jwt_required
def update_files_request():
    """Update several files at once from the ``files`` list in the JSON body."""
    incoming = request.json['files']
    saved = db.update_files(incoming)
    body = db.files_schema.jsonify(saved)
    return generate_response(body, OK_STATUS_CODE)

@app.route('/api/delete_file/<file_id>', methods=['DELETE'])
@jwt_required
def delete_file_request(file_id):
    """Delete the file with the given id and return ``db.delete_file``'s result as JSON."""
    outcome = db.delete_file(file_id)
    body = db.file_schema.jsonify(outcome)
    return generate_response(body, OK_STATUS_CODE)

@app.route('/api/get_project_files/<project_id>', methods=['GET'])
@jwt_required
def get_project_files_request(project_id):
    """Return the files of the given project, serialized with the files schema."""
    files = db.get_project_files(project_id)
    body = db.files_schema.jsonify(files)
    return generate_response(body, OK_STATUS_CODE)

### COLLABORATOR ###
@app.route('/api/get_project_collaborators/<project_id>', methods=['GET'])
@jwt_required
def get_project_collaborators_request(project_id):
    """Return the collaborators of the given project, serialized as JSON."""
    collaborators = db.get_project_collaborators(project_id)
    body = db.collaborators_schema.jsonify(collaborators)
    return generate_response(body, OK_STATUS_CODE)

### MESSAGE ###
@app.route('/api/get_project_messages/<project_id>', methods=['GET'])
@jwt_required
def get_project_messages_request(project_id):
    """Return the messages of the given project, serialized as JSON."""
    messages = db.get_project_messages(project_id)
    body = db.messages_schema.jsonify(messages)
    return generate_response(body, OK_STATUS_CODE)

if __name__ == '__main__':
    # app.run(debug=True)

    # If you deleted the db, run the following to re-initialize it.
    db.db.create_all()

    # CAUTION! Uncommenting the following row will delete all users in the database.
    # print('Deleted ' + str(db.delete_all_users()) + ' users.')