    # auth.py
    import app.core.http_codes as codes
    
    import app.database.controller as dbc
    
    from app.apis import check_jwt, item_response, text_response
    
    from app.core.codes import verify_code
    from app.core.dto import AuthDTO, CodeDTO
    
    from app.core.parsers import create_user_parser, login_code_parser, login_parser
    
    from app.database.models import User
    
    from flask_jwt_extended import (
        create_access_token,
        create_refresh_token,
        get_jwt_identity,
        get_raw_jwt,
        jwt_refresh_token_required,
        jwt_required,
    )
    
    # (blame-view annotation removed: commit by Carl Schönfelder)
    from flask_restx import Namespace, Resource, cors
    
    # Module-level shortcuts to objects declared on AuthDTO.
    # `api` supplies the route()/param() decorators and abort() used by the
    # resources in this module (presumably a flask_restx Namespace — confirm
    # in app.core.dto).
    api = AuthDTO.api

    # `schema` is used to dump a single user in AuthSignup.post; `list_schema`
    # is not referenced in the visible part of this module.
    schema = AuthDTO.schema
    list_schema = AuthDTO.list_schema
    
        return {"role": item_user.role.name, "city_id": item_user.city_id}
    
    
    
    @api.route("/signup")
    class AuthSignup(Resource):
        """Endpoint that registers a new user account."""

        # NOTE(review): editor=False presumably restricts this route to
        # non-editor (admin) roles — confirm in app.apis.check_jwt.
        @check_jwt(editor=False)
        def post(self):
            """Create a user from the parsed request arguments.

            Aborts with ``codes.BAD_REQUEST`` when ``dbc.get.user_exists``
            reports the e-mail as already taken. Otherwise the new user is
            dumped through ``schema`` and wrapped by ``item_response``.
            """
            user_fields = create_user_parser.parse_args(strict=True)

            if dbc.get.user_exists(user_fields.get("email")):
                api.abort(codes.BAD_REQUEST, "User already exists")

            # Every parsed argument is forwarded to the database layer as-is.
            created_user = dbc.add.user(**user_fields)
            return item_response(schema.dump(created_user))
    
    
    
    @api.route("/delete/<ID>")
    @api.param("ID")
    class AuthDelete(Resource):
        """Endpoint that removes the user identified by the ``ID`` path segment."""

        @check_jwt(editor=False)
        def delete(self, ID):
            """Delete user ``ID`` and blacklist the caller's token on self-deletion.

            NOTE(review): the ``def`` line was missing from this copy of the
            file (the decorator was followed directly by the body). ``ID`` is
            the ``<ID>`` route variable the body reads; the HTTP verb
            ``delete`` is inferred from the route name — confirm against
            version control.

            :param ID: user id taken from the URL
            :return: ``text_response`` confirming the deletion
            """
            item_user = dbc.get.user(ID)
            dbc.delete.default(item_user)

            # ID comes from the URL, so it is cast to int before being
            # compared with the identity stored in the JWT.
            if int(ID) == get_jwt_identity():
                # The caller removed their own account: add the jti of the
                # token in use to the blacklist.
                jti = get_raw_jwt()["jti"]
                dbc.add.blacklist(jti)

            return text_response(f"User {ID} deleted")
    
    
    
    @api.route("/login")
    class AuthLogin(Resource):
        """Endpoint that exchanges an e-mail and password for JWT tokens."""

        def post(self):
            """Check the credentials and return the user's id with fresh tokens.

            Aborts with ``codes.UNAUTHORIZED`` when no user matches the e-mail
            or when the password check fails. On success returns a dict with
            the keys ``"id"``, ``"access_token"`` and ``"refresh_token"``.
            """
            credentials = login_parser.parse_args(strict=True)
            email = credentials.get("email")
            password = credentials.get("password")

            # required=False: a missing user is handled here, together with a
            # wrong password, so both cases give the same error message.
            user = dbc.get.user_by_email(email, required=False)
            if not (user and user.is_correct_password(password)):
                api.abort(codes.UNAUTHORIZED, "Invalid email or password")

            access = create_access_token(user.id, user_claims=get_user_claims(user))
            refresh = create_refresh_token(user.id)

            return {"id": user.id, "access_token": access, "refresh_token": refresh}
    
    
    
    @api.route("/login/code")
    class AuthLoginCode(Resource):
        """Endpoint that looks up a code submitted by the client."""

        def post(self):
            """Validate the submitted code and return its stored record.

            Aborts with ``codes.BAD_REQUEST`` when ``verify_code`` rejects the
            value. Otherwise the record fetched by ``dbc.get.code_by_code`` is
            dumped through ``CodeDTO.schema`` and returned with ``codes.OK``.
            """
            submitted = login_code_parser.parse_args()["code"]

            if not verify_code(submitted):
                api.abort(codes.BAD_REQUEST, "Invalid code")

            # The second and third positional arguments are passed through
            # unchanged (presumably "required" and the not-found message —
            # confirm in app.database.controller).
            code_record = dbc.get.code_by_code(submitted, True, "A presentation with that code does not exist")
            payload = item_response(CodeDTO.schema.dump(code_record))
            return payload, codes.OK
    
    
    
    @api.route("/logout")
    class AuthLogout(Resource):
        """Endpoint that blacklists the token used for the request."""

        @check_jwt(editor=True)
        def post(self):
            """Add the current token's ``jti`` to the blacklist and confirm."""
            dbc.add.blacklist(get_raw_jwt()["jti"])
            return text_response("User logout")
    
    
    
    @api.route("/refresh")
    class AuthRefresh(Resource):
        """Endpoint that issues a new access token."""

        # NOTE(review): check_jwt is stacked on top of
        # jwt_refresh_token_required; how the two interact depends on
        # app.apis.check_jwt, which is not visible here — confirm.
        @check_jwt(editor=True)
        @jwt_refresh_token_required
        def post(self):
            """Return a new access token and blacklist the token presented.

            The ``jti`` of the request's token is read first, the user is
            loaded from the JWT identity, a new access token carrying that
            user's claims is created, and only then is the old ``jti`` added
            to the blacklist.

            :return: dict with the single key ``"access_token"``
            """
            presented_jti = get_raw_jwt()["jti"]
            user = dbc.get.user(get_jwt_identity())

            new_access_token = create_access_token(user.id, user_claims=get_user_claims(user))
            dbc.add.blacklist(presented_jti)
            return {"access_token": new_access_token}