Files
shopdb-flask/shopdb/core/api/users.py
cproudlock e18c7c2d87 Add system settings, audit logging, user management, and dark mode fixes
System Settings:
- Add SystemSettings.vue with Zabbix integration, SMTP/email config, SAML SSO settings
- Add Setting model with key-value storage and typed values
- Add settings API with caching

Audit Logging:
- Add AuditLog model tracking user, IP, action, entity changes
- Add comprehensive audit logging to all CRUD operations:
  - Machines, Computers, Equipment, Network devices, VLANs, Subnets
  - Printers, USB devices (including checkout/checkin)
  - Applications, Settings, Users/Roles
- Track old/new values for all field changes
- Mask sensitive values (passwords, tokens) in logs

User Management:
- Add UsersList.vue with full user CRUD
- Add Role management with granular permissions
- Add 41 predefined permissions across 10 categories
- Add users API with roles and permissions endpoints

Reports:
- Add TonerReport.vue for printer supply monitoring

Dark Mode Fixes:
- Fix map position section in PCForm, PrinterForm
- Fix alert-warning in KnowledgeBaseDetail
- All components now use CSS variables for theming

CLI Commands:
- Add flask seed permissions
- Add flask seed settings

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 22:16:56 -05:00

352 lines
12 KiB
Python

"""User management API routes."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required, current_user
from werkzeug.security import generate_password_hash
from shopdb.extensions import db
from shopdb.core.models import User, Role, Permission, AuditLog
from shopdb.utils.responses import success_response, error_response, ErrorCodes
# Blueprint that every route in this module is registered on.
# NOTE(review): the URL prefix is presumably supplied where the blueprint is
# registered with the app, which is not visible in this file -- confirm.
users_bp = Blueprint('users', __name__)
@users_bp.route('', methods=['GET'])
@jwt_required()
def list_users():
    """Return every user ordered by username; only admins may call this."""
    if current_user.hasrole('admin'):
        ordered_accounts = User.query.order_by(User.username).all()
        payload = []
        for account in ordered_accounts:
            payload.append(user_to_dict(account))
        return success_response(payload)

    # Anyone without the admin role is turned away.
    return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403)
@users_bp.route('/<int:userid>', methods=['GET'])
@jwt_required()
def get_user(userid: int):
    """Return one user by id.

    Admins may fetch any user; everyone else may only fetch their own record.
    Responds with 403 when access is denied and 404 when no such user exists.
    """
    may_view = current_user.hasrole('admin') or current_user.userid == userid
    if not may_view:
        return error_response(ErrorCodes.FORBIDDEN, 'Access denied', http_code=403)

    record = User.query.get(userid)
    if record:
        return success_response(user_to_dict(record))
    return error_response(ErrorCodes.NOT_FOUND, 'User not found', http_code=404)
@users_bp.route('', methods=['POST'])
@jwt_required()
def create_user():
    """Create a new user (admin only).

    Expects a JSON object with ``username``, ``email`` and ``password``.
    ``firstname``, ``lastname``, ``isactive`` (defaults to ``True``) and
    ``roles`` (a list of role ids) are optional. Role ids that match no
    existing role are ignored.

    Returns the serialized user with HTTP 201 on success. Error responses:
    403 for non-admin callers, VALIDATION_ERROR for a missing or non-object
    body or a missing required field, and 409 when the username or email is
    already taken.
    """
    if not current_user.hasrole('admin'):
        return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403)

    data = request.get_json()
    if not data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body required')
    # A JSON array or scalar would otherwise crash on data.get() below.
    if not isinstance(data, dict):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body must be a JSON object')

    # Validate required fields
    if not data.get('username'):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Username is required')
    if not data.get('email'):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Email is required')
    if not data.get('password'):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Password is required')

    # Check uniqueness
    if User.query.filter_by(username=data['username']).first():
        return error_response(ErrorCodes.CONFLICT, 'Username already exists', http_code=409)
    if User.query.filter_by(email=data['email']).first():
        return error_response(ErrorCodes.CONFLICT, 'Email already exists', http_code=409)

    user = User(
        username=data['username'],
        email=data['email'],
        # Only the hash is stored; the plain password never reaches the model.
        passwordhash=generate_password_hash(data['password']),
        firstname=data.get('firstname'),
        lastname=data.get('lastname'),
        isactive=data.get('isactive', True),
    )

    # Assign roles
    role_ids = data.get('roles', [])
    if role_ids:
        user.roles = Role.query.filter(Role.roleid.in_(role_ids)).all()

    db.session.add(user)
    # Flush so the primary key is assigned before the audit entry is written;
    # otherwise the "created" entry cannot reference the new user's id.
    db.session.flush()

    # Audit log
    AuditLog.log('created', 'User', entityid=user.userid, entityname=user.username)
    db.session.commit()

    return success_response(user_to_dict(user), message='User created', http_code=201)
@users_bp.route('/<int:userid>', methods=['PUT'])
@jwt_required()
def update_user(userid: int):
    """Update a user and record the changed fields in the audit log.

    Admins may update any user; other callers may only update themselves.
    ``email``, ``firstname``, ``lastname`` and ``password`` can be changed by
    either. ``isactive``, ``roles`` (a list of role ids) and ``unlock`` are
    only honoured for admins and are ignored for everyone else.

    Returns the serialized user on success. Error responses: 403 when the
    caller may not edit this user, 404 when the user does not exist,
    VALIDATION_ERROR for a missing or non-object body or an empty email, and
    409 when the email already belongs to another user.
    """
    is_admin = current_user.hasrole('admin')
    if not is_admin and current_user.userid != userid:
        return error_response(ErrorCodes.FORBIDDEN, 'Access denied', http_code=403)

    user = User.query.get(userid)
    if not user:
        return error_response(ErrorCodes.NOT_FOUND, 'User not found', http_code=404)

    data = request.get_json()
    if not data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body required')
    # A JSON array or scalar would otherwise crash on the key lookups below.
    if not isinstance(data, dict):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body must be a JSON object')

    # Maps field name -> {'old': ..., 'new': ...} for the audit entry.
    changes = {}

    # Email is handled first so that every early error return happens before
    # any attribute of the user has been modified.
    if 'email' in data and data['email'] != user.email:
        # Creation requires an email, so an update must not be able to blank it.
        if not data['email']:
            return error_response(ErrorCodes.VALIDATION_ERROR, 'Email is required')
        if User.query.filter(User.email == data['email'], User.userid != userid).first():
            return error_response(ErrorCodes.CONFLICT, 'Email already in use', http_code=409)
        changes['email'] = {'old': user.email, 'new': data['email']}
        user.email = data['email']

    # Plain profile fields: always assigned when present, logged only on change.
    for field in ('firstname', 'lastname'):
        if field in data:
            previous = getattr(user, field)
            if data[field] != previous:
                changes[field] = {'old': previous, 'new': data[field]}
            setattr(user, field, data[field])

    # Admin-only fields
    if is_admin:
        if 'isactive' in data:
            if data['isactive'] != user.isactive:
                changes['isactive'] = {'old': user.isactive, 'new': data['isactive']}
            user.isactive = data['isactive']

        if 'roles' in data:
            old_roles = [r.rolename for r in user.roles]
            roles = Role.query.filter(Role.roleid.in_(data['roles'])).all()
            new_roles = [r.rolename for r in roles]
            if set(old_roles) != set(new_roles):
                changes['roles'] = {'old': old_roles, 'new': new_roles}
            user.roles = roles

        # Unlock user: clear the lockout timestamp and the failed-login counter.
        if data.get('unlock'):
            user.lockeduntil = None
            user.failedlogins = 0
            # The account moves from "not unlocked" to "unlocked".
            changes['unlocked'] = {'old': False, 'new': True}

    # Password change: the values are masked so no secret reaches the audit log.
    if data.get('password'):
        user.passwordhash = generate_password_hash(data['password'])
        changes['password'] = {'old': '***', 'new': '***'}

    if changes:
        AuditLog.log('updated', 'User', entityid=user.userid, entityname=user.username, changes=changes)

    db.session.commit()
    return success_response(user_to_dict(user), message='User updated')
@users_bp.route('/<int:userid>', methods=['DELETE'])
@jwt_required()
def delete_user(userid: int):
    """Remove a user account and write a 'deleted' audit entry.

    Only admins may delete, and an admin cannot delete their own account.
    Responds with 404 when the user does not exist.
    """
    if not current_user.hasrole('admin'):
        return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403)

    targets_self = current_user.userid == userid
    if targets_self:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Cannot delete your own account')

    doomed = User.query.get(userid)
    if doomed:
        # Keep the name in a local before the row is marked for deletion.
        removed_name = doomed.username
        db.session.delete(doomed)
        AuditLog.log('deleted', 'User', entityid=userid, entityname=removed_name)
        db.session.commit()
        return success_response(None, message='User deleted')

    return error_response(ErrorCodes.NOT_FOUND, 'User not found', http_code=404)
# Permissions endpoints
@users_bp.route('/permissions', methods=['GET'])
@jwt_required()
def list_permissions():
    """Return all permissions, both as a flat list and bucketed by category.

    The response carries ``permissions`` (flat list, each entry including its
    category) and ``grouped`` (category -> list of entries without the
    category key). Rows are ordered by category, then name.
    """
    rows = Permission.query.order_by(Permission.category, Permission.name).all()

    flat = []
    by_category = {}
    for perm in rows:
        # Entry stored under its category (the category is the dict key).
        bucket = by_category.setdefault(perm.category, [])
        bucket.append({
            'permissionid': perm.permissionid,
            'name': perm.name,
            'description': perm.description
        })
        # Entry for the flat list, which carries the category explicitly.
        flat.append({
            'permissionid': perm.permissionid,
            'name': perm.name,
            'description': perm.description,
            'category': perm.category
        })

    return success_response({
        'permissions': flat,
        'grouped': by_category
    })
# Roles endpoints
@users_bp.route('/roles', methods=['GET'])
@jwt_required()
def list_roles():
    """Return every role, ordered by name, with its permission names.

    Each entry also reports how many users hold the role (``usercount``) and
    whether it is the role named 'admin' (``isadmin``).
    """
    ordered_roles = Role.query.order_by(Role.rolename).all()

    summaries = []
    for role in ordered_roles:
        granted = [perm.name for perm in role.permissions]
        summaries.append({
            'roleid': role.roleid,
            'rolename': role.rolename,
            'description': role.description,
            'usercount': role.users.count(),
            'permissions': granted,
            'isadmin': role.rolename == 'admin'
        })

    return success_response(summaries)
@users_bp.route('/roles', methods=['POST'])
@jwt_required()
def create_role():
    """Create a new role (admin only).

    Expects a JSON object with ``rolename``; ``description`` and
    ``permissions`` (a list of permission names) are optional. Permission
    names that match no existing permission are ignored.

    Returns the new role with HTTP 201 on success. Error responses: 403 for
    non-admin callers, VALIDATION_ERROR when the body is missing, is not a
    JSON object or has no role name, and 409 when the role name already
    exists.
    """
    if not current_user.hasrole('admin'):
        return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403)

    data = request.get_json()
    # The isinstance check also covers a missing body (None) and stops a JSON
    # array or scalar from crashing on data.get().
    if not isinstance(data, dict) or not data.get('rolename'):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Role name is required')

    if Role.query.filter_by(rolename=data['rolename']).first():
        return error_response(ErrorCodes.CONFLICT, 'Role already exists', http_code=409)

    role = Role(
        rolename=data['rolename'],
        description=data.get('description'),
    )

    # Assign permissions
    if 'permissions' in data:
        role.permissions = Permission.query.filter(Permission.name.in_(data['permissions'])).all()

    db.session.add(role)
    # Flush so the primary key is assigned before the audit entry is written;
    # otherwise the "created" entry cannot reference the new role's id.
    db.session.flush()

    AuditLog.log('created', 'Role', entityid=role.roleid, entityname=role.rolename)
    db.session.commit()

    return success_response({
        'roleid': role.roleid,
        'rolename': role.rolename,
        'description': role.description,
        'permissions': [p.name for p in role.permissions]
    }, message='Role created', http_code=201)
@users_bp.route('/roles/<int:roleid>', methods=['PUT'])
@jwt_required()
def update_role(roleid: int):
    """Update a role's description and/or permissions (admin only).

    Expects a JSON object that may contain ``description`` and
    ``permissions`` (a list of permission names). The permissions of the role
    named 'admin' cannot be changed; its description can. The role name
    itself is not updated by this endpoint.

    Returns the updated role on success. Error responses: 403 for non-admin
    callers, 404 when the role does not exist, and VALIDATION_ERROR when the
    body is not a JSON object or tries to change the admin role's permissions.
    """
    if not current_user.hasrole('admin'):
        return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403)

    role = Role.query.get(roleid)
    if not role:
        return error_response(ErrorCodes.NOT_FOUND, 'Role not found', http_code=404)

    # Parse the body once and validate it before any key lookup; a null or
    # non-object body would otherwise raise on the membership tests below.
    data = request.get_json()
    if not isinstance(data, dict):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Request body must be a JSON object')

    # Cannot modify admin role permissions
    if role.rolename == 'admin' and 'permissions' in data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Cannot modify admin role permissions')

    # Maps field name -> {'old': ..., 'new': ...} for the audit entry.
    changes = {}

    if 'description' in data:
        if data['description'] != role.description:
            changes['description'] = {'old': role.description, 'new': data['description']}
        role.description = data['description']

    # Update permissions (the admin role has already been rejected above).
    if 'permissions' in data:
        old_perms = [p.name for p in role.permissions]
        perms = Permission.query.filter(Permission.name.in_(data['permissions'])).all()
        new_perms = [p.name for p in perms]
        if set(old_perms) != set(new_perms):
            changes['permissions'] = {'old': old_perms, 'new': new_perms}
        role.permissions = perms

    if changes:
        AuditLog.log('updated', 'Role', entityid=role.roleid, entityname=role.rolename, changes=changes)

    db.session.commit()

    return success_response({
        'roleid': role.roleid,
        'rolename': role.rolename,
        'description': role.description,
        'permissions': [p.name for p in role.permissions]
    }, message='Role updated')
@users_bp.route('/roles/<int:roleid>', methods=['DELETE'])
@jwt_required()
def delete_role(roleid: int):
    """Delete a role (admin only) and write a 'deleted' audit entry.

    The role named 'admin' can never be deleted, and a role that is still
    assigned to at least one user is refused.

    Returns a success response with no data. Error responses: 403 for
    non-admin callers, 404 when the role does not exist, and VALIDATION_ERROR
    for the admin role or a role that is still in use.
    """
    if not current_user.hasrole('admin'):
        return error_response(ErrorCodes.FORBIDDEN, 'Admin access required', http_code=403)

    role = Role.query.get(roleid)
    if not role:
        return error_response(ErrorCodes.NOT_FOUND, 'Role not found', http_code=404)

    if role.rolename == 'admin':
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Cannot delete the admin role')

    # Count once: the same number drives the check and the message, and this
    # avoids issuing a second COUNT query.
    assigned = role.users.count()
    if assigned > 0:
        return error_response(ErrorCodes.VALIDATION_ERROR, f'Role is assigned to {assigned} user(s)')

    # Keep the name in a local before the row is marked for deletion.
    rolename = role.rolename
    db.session.delete(role)
    AuditLog.log('deleted', 'Role', entityid=roleid, entityname=rolename)
    db.session.commit()

    return success_response(None, message='Role deleted')
def user_to_dict(user: User) -> dict:
    """Serialize a user into the plain dict used in API responses.

    Timestamps are rendered as ISO-8601 strings with a literal 'Z' appended,
    or ``None`` when unset. Roles are reduced to their id and name.
    """

    def _stamp(moment):
        # Unset timestamps stay None instead of being formatted.
        if not moment:
            return None
        return moment.isoformat() + 'Z'

    payload = {}
    # Identity and profile fields are copied straight across.
    for attr in ('userid', 'username', 'email', 'firstname', 'lastname', 'isactive', 'islocked'):
        payload[attr] = getattr(user, attr)

    payload['lastlogindate'] = _stamp(user.lastlogindate)
    payload['failedlogins'] = user.failedlogins

    role_entries = []
    for role in user.roles:
        role_entries.append({'roleid': role.roleid, 'rolename': role.rolename})
    payload['roles'] = role_entries

    payload['createddate'] = _stamp(user.createddate)
    payload['modifieddate'] = _stamp(user.modifieddate)
    return payload