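# Newer / Older -- NOTE(review): these two labels appear to be diff-viewer residue captured with this listing, not module code; confirm and remove.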
from app.apis import check_jwt, item_response, text_response
from app.core.codes import verify_code
from app.core.dto import AuthDTO, CodeDTO
from app.core.parsers import create_user_parser, login_code_parser, login_parser
from flask_jwt_extended import (
create_access_token,
create_refresh_token,
get_jwt_identity,
get_raw_jwt,
jwt_refresh_token_required,
jwt_required,
)
# Module-level shortcuts to the serializers exposed by AuthDTO.
# `schema.dump(...)` is what the signup handler in this module calls;
# `list_schema` is presumably the many-items counterpart (verify against AuthDTO).
schema, list_schema = AuthDTO.schema, AuthDTO.list_schema
def get_user_claims(item_user):
    """Build the custom claims dictionary for a user.

    :param item_user: object exposing ``role.name`` and ``city_id``.
    :return: dict with the keys ``"role"`` (the role's name) and
        ``"city_id"``, in that order.
    """
    role_name = item_user.role.name
    city_id = item_user.city_id
    return dict(role=role_name, city_id=city_id)
@api.route("/signup")
class AuthSignup(Resource):
    """Resource registered at ``/signup``."""

    def post(self):
        """Handle a POST sign-up request.

        NOTE(review): the body visible here looks incomplete. The abort below
        is unconditional, ``email`` is read but never used, and ``item_user``
        is never assigned. Lines (presumably an existence check and the user
        creation) appear to be missing from this listing; confirm against the
        full file before relying on this handler.
        """
        # strict=True: presumably a flask-restx RequestParser, in which case
        # requests carrying arguments the parser does not declare are rejected.
        args = create_user_parser.parse_args(strict=True)
        email = args.get("email")
        # As written, every request reaches this abort.
        api.abort(codes.BAD_REQUEST, "User already exists")
        # Only reachable if api.abort returns; `item_user` is undefined here.
        return item_response(schema.dump(item_user))
@api.route("/delete/<ID>")
@api.param("ID")
class AuthDelete(Resource):
    """Resource registered at ``/delete/<ID>``."""

    def delete(self, ID):
        """Blacklist the caller's current token and report user ``ID`` as deleted.

        NOTE(review): nothing in this method removes a user record; it only
        blacklists the token identifier. Confirm whether the removal is
        expected to happen here.

        :param ID: value of the ``<ID>`` URL segment.
        :return: whatever ``text_response`` produces for the confirmation text.
        """
        # "jti" is the unique-identifier claim of the JWT attached to the request.
        token_id = get_raw_jwt()["jti"]
        dbc.add.blacklist(token_id)

        message = f"User {ID} deleted"
        return text_response(message)
@api.route("/login")
class AuthLogin(Resource):
    """Resource registered at ``/login``."""

    def post(self):
        """Authenticate with email and password and issue a token pair.

        Aborts with ``codes.UNAUTHORIZED`` when the email lookup yields a
        falsy user or the user's password check fails.

        :return: dict with the keys ``"id"``, ``"access_token"`` and
            ``"refresh_token"``.
        """
        credentials = login_parser.parse_args(strict=True)
        email = credentials.get("email")
        password = credentials.get("password")

        # required=False: a missing user is handled by the check below.
        item_user = dbc.get.user_by_email(email, required=False)
        if not (item_user and item_user.is_correct_password(password)):
            # One message for both failure modes.
            api.abort(codes.UNAUTHORIZED, "Invalid email or password")

        # The access token carries the custom claims; the refresh token does not.
        claims = get_user_claims(item_user)
        access_token = create_access_token(item_user.id, user_claims=claims)
        refresh_token = create_refresh_token(item_user.id)
        return {
            "id": item_user.id,
            "access_token": access_token,
            "refresh_token": refresh_token,
        }
@api.route("/login/code")
class AuthLoginCode(Resource):
    """Resource registered at ``/login/code``."""

    def post(self):
        """Look up the submitted code and return its serialized record.

        Aborts with ``codes.BAD_REQUEST`` when ``verify_code`` rejects the
        submitted value.

        :return: tuple of the ``item_response`` payload and ``codes.OK``.
        """
        submitted = login_code_parser.parse_args()["code"]

        is_valid = verify_code(submitted)
        if not is_valid:
            api.abort(codes.BAD_REQUEST, "Invalid code")

        # Second and third arguments are presumably a "required" flag and the
        # not-found message -- confirm against dbc.get.code_by_code.
        not_found_message = "A presentation with that code does not exist"
        item_code = dbc.get.code_by_code(submitted, True, not_found_message)

        payload = item_response(CodeDTO.schema.dump(item_code))
        return payload, codes.OK
@api.route("/logout")
class AuthLogout(Resource):
    """Resource registered at ``/logout``."""

    def post(self):
        """Blacklist the identifier of the token presented with the request.

        :return: ``None``; the handler produces no response body of its own.
        """
        # "jti" is the unique-identifier claim of the JWT attached to the request.
        token_id = get_raw_jwt()["jti"]
        dbc.add.blacklist(token_id)
        return None
@api.route("/refresh")
class AuthRefresh(Resource):
    """Resource registered at ``/refresh``."""

    @jwt_refresh_token_required
    def post(self):
        """Issue a new access token and blacklist the presented token's identifier.

        NOTE(review): ``item_user`` is not assigned in the lines visible here;
        presumably it is loaded from the token identity in a line missing from
        this listing. Confirm against the full file.

        :return: dict with the single key ``"access_token"``.
        """
        # Identifier ("jti" claim) of the token that authorized this request.
        old_jti = get_raw_jwt()["jti"]
        access_token = create_access_token(item_user.id, user_claims=get_user_claims(item_user))
        # Blacklisted only after the new access token has been created.
        dbc.add.blacklist(old_jti)
        response = {"access_token": access_token}
        return response