"""User management API routes.""" from flask import Blueprint, request from flask_jwt_extended import jwt_required, current_user from werkzeug.security import generate_password_hash from shopdb.extensions import db from shopdb.core.models import User, Role, Permission, AuditLog from shopdb.utils.responses import success_response, error_response, ErrorCodes users_bp = Blueprint('users', __name__) @users_bp.route('', methods=['GET']) @jwt_required() def list_users(): """List all users.""" if not current_user.hasrole('admin'): return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403) users = User.query.order_by(User.username).all() return success_response([user_to_dict(u) for u in users]) @users_bp.route('/', methods=['GET']) @jwt_required() def get_user(userid: int): """Get a single user.""" if not current_user.hasrole('admin') and current_user.userid != userid: return error_response(ErrorCodes.FORBIDDEN, 'Access denied', http_code=403) user = User.query.get(userid) if not user: return error_response(ErrorCodes.NOT_FOUND, 'User not found', http_code=404) return success_response(user_to_dict(user)) @users_bp.route('', methods=['POST']) @jwt_required() def create_user(): """Create a new user.""" if not current_user.hasrole('admin'): return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403) data = request.get_json() if not data: return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body required') # Validate required fields if not data.get('username'): return error_response(ErrorCodes.VALIDATION_ERROR, 'Username is required') if not data.get('email'): return error_response(ErrorCodes.VALIDATION_ERROR, 'Email is required') if not data.get('password'): return error_response(ErrorCodes.VALIDATION_ERROR, 'Password is required') # Check uniqueness if User.query.filter_by(username=data['username']).first(): return error_response(ErrorCodes.CONFLICT, 'Username already exists', http_code=409) if User.query.filter_by(email=data['email']).first(): return error_response(ErrorCodes.CONFLICT, 'Email already exists', http_code=409) user = User( username=data['username'], email=data['email'], passwordhash=generate_password_hash(data['password']), firstname=data.get('firstname'), lastname=data.get('lastname'), isactive=data.get('isactive', True) ) # Assign roles role_ids = data.get('roles', []) if role_ids: roles = Role.query.filter(Role.roleid.in_(role_ids)).all() user.roles = roles db.session.add(user) # Audit log AuditLog.log('created', 'User', entityname=user.username) db.session.commit() return success_response(user_to_dict(user), message='User created', http_code=201) @users_bp.route('/', methods=['PUT']) @jwt_required() def update_user(userid: int): """Update a user.""" if not current_user.hasrole('admin') and current_user.userid != userid: return error_response(ErrorCodes.FORBIDDEN, 'Access denied', http_code=403) user = User.query.get(userid) if not user: return error_response(ErrorCodes.NOT_FOUND, 'User not found', http_code=404) data = request.get_json() if not data: return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body required') changes = {} # Update fields if 'email' in data and data['email'] != user.email: if User.query.filter(User.email == data['email'], User.userid != userid).first(): return error_response(ErrorCodes.CONFLICT, 'Email already in use', http_code=409) changes['email'] = {'old': user.email, 'new': data['email']} user.email = data['email'] if 'firstname' in data: if data['firstname'] != user.firstname: changes['firstname'] = {'old': user.firstname, 'new': data['firstname']} user.firstname = data['firstname'] if 'lastname' in data: if data['lastname'] != user.lastname: changes['lastname'] = {'old': user.lastname, 'new': data['lastname']} user.lastname = data['lastname'] # Admin-only fields if current_user.hasrole('admin'): if 'isactive' in data: if data['isactive'] != user.isactive: changes['isactive'] = {'old': user.isactive, 'new': data['isactive']} user.isactive = data['isactive'] if 'roles' in data: old_roles = [r.rolename for r in user.roles] roles = Role.query.filter(Role.roleid.in_(data['roles'])).all() new_roles = [r.rolename for r in roles] if set(old_roles) != set(new_roles): changes['roles'] = {'old': old_roles, 'new': new_roles} user.roles = roles # Unlock user if data.get('unlock'): user.lockeduntil = None user.failedlogins = 0 changes['unlocked'] = {'old': True, 'new': False} # Password change if 'password' in data and data['password']: user.passwordhash = generate_password_hash(data['password']) changes['password'] = {'old': '***', 'new': '***'} if changes: AuditLog.log('updated', 'User', entityid=user.userid, entityname=user.username, changes=changes) db.session.commit() return success_response(user_to_dict(user), message='User updated') @users_bp.route('/', methods=['DELETE']) @jwt_required() def delete_user(userid: int): """Delete a user.""" if not current_user.hasrole('admin'): return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403) if current_user.userid == userid: return error_response(ErrorCodes.VALIDATION_ERROR, 'Cannot delete your own account') user = User.query.get(userid) if not user: return error_response(ErrorCodes.NOT_FOUND, 'User not found', http_code=404) username = user.username db.session.delete(user) AuditLog.log('deleted', 'User', entityid=userid, entityname=username) db.session.commit() return success_response(None, message='User deleted') # Permissions endpoints @users_bp.route('/permissions', methods=['GET']) @jwt_required() def list_permissions(): """List all permissions grouped by category.""" permissions = Permission.query.order_by(Permission.category, Permission.name).all() # Group by category grouped = {} for p in permissions: if p.category not in grouped: grouped[p.category] = [] grouped[p.category].append({ 'permissionid': p.permissionid, 'name': p.name, 'description': p.description }) return success_response({ 'permissions': [{ 'permissionid': p.permissionid, 'name': p.name, 'description': p.description, 'category': p.category } for p in permissions], 'grouped': grouped }) # Roles endpoints @users_bp.route('/roles', methods=['GET']) @jwt_required() def list_roles(): """List all roles with their permissions.""" roles = Role.query.order_by(Role.rolename).all() return success_response([{ 'roleid': r.roleid, 'rolename': r.rolename, 'description': r.description, 'usercount': r.users.count(), 'permissions': [p.name for p in r.permissions], 'isadmin': r.rolename == 'admin' } for r in roles]) @users_bp.route('/roles', methods=['POST']) @jwt_required() def create_role(): """Create a new role.""" if not current_user.hasrole('admin'): return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403) data = request.get_json() if not data or not data.get('rolename'): return error_response(ErrorCodes.VALIDATION_ERROR, 'Role name is required') if Role.query.filter_by(rolename=data['rolename']).first(): return error_response(ErrorCodes.CONFLICT, 'Role already exists', http_code=409) role = Role( rolename=data['rolename'], description=data.get('description') ) # Assign permissions if 'permissions' in data: perms = Permission.query.filter(Permission.name.in_(data['permissions'])).all() role.permissions = perms db.session.add(role) AuditLog.log('created', 'Role', entityname=role.rolename) db.session.commit() return success_response({ 'roleid': role.roleid, 'rolename': role.rolename, 'description': role.description, 'permissions': [p.name for p in role.permissions] }, message='Role created', http_code=201) @users_bp.route('/roles/', methods=['PUT']) @jwt_required() def update_role(roleid: int): """Update a role.""" if not current_user.hasrole('admin'): return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403) role = Role.query.get(roleid) if not role: return error_response(ErrorCodes.NOT_FOUND, 'Role not found', http_code=404) # Cannot modify admin role permissions if role.rolename == 'admin' and 'permissions' in request.get_json(): return error_response(ErrorCodes.VALIDATION_ERROR, 'Cannot modify admin role permissions') data = request.get_json() changes = {} if 'description' in data: if data['description'] != role.description: changes['description'] = {'old': role.description, 'new': data['description']} role.description = data['description'] # Update permissions if 'permissions' in data and role.rolename != 'admin': old_perms = [p.name for p in role.permissions] perms = Permission.query.filter(Permission.name.in_(data['permissions'])).all() new_perms = [p.name for p in perms] if set(old_perms) != set(new_perms): changes['permissions'] = {'old': old_perms, 'new': new_perms} role.permissions = perms if changes: AuditLog.log('updated', 'Role', entityid=role.roleid, entityname=role.rolename, changes=changes) db.session.commit() return success_response({ 'roleid': role.roleid, 'rolename': role.rolename, 'description': role.description, 'permissions': [p.name for p in role.permissions] }, message='Role updated') @users_bp.route('/roles/', methods=['DELETE']) @jwt_required() def delete_role(roleid: int): """Delete a role.""" if not current_user.hasrole('admin'): return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403) role = Role.query.get(roleid) if not role: return error_response(ErrorCodes.NOT_FOUND, 'Role not found', http_code=404) if role.rolename == 'admin': return error_response(ErrorCodes.VALIDATION_ERROR, 'Cannot delete the admin role') if role.users.count() > 0: return error_response(ErrorCodes.VALIDATION_ERROR, f'Role is assigned to {role.users.count()} user(s)') rolename = role.rolename db.session.delete(role) AuditLog.log('deleted', 'Role', entityid=roleid, entityname=rolename) db.session.commit() return success_response(None, message='Role deleted') def user_to_dict(user: User) -> dict: """Convert user to dict for API response.""" return { 'userid': user.userid, 'username': user.username, 'email': user.email, 'firstname': user.firstname, 'lastname': user.lastname, 'isactive': user.isactive, 'islocked': user.islocked, 'lastlogindate': user.lastlogindate.isoformat() + 'Z' if user.lastlogindate else None, 'failedlogins': user.failedlogins, 'roles': [{'roleid': r.roleid, 'rolename': r.rolename} for r in user.roles], 'createddate': user.createddate.isoformat() + 'Z' if user.createddate else None, 'modifieddate': user.modifieddate.isoformat() + 'Z' if user.modifieddate else None }