Files
shopdb-flask/plugins/printers/api/asset_routes.py
cproudlock 9efdb5f52d Add print badges, pagination, route splitting, JWT auth fixes, and list page alignment
- Fix equipment badge barcode not rendering (loading race condition)
- Fix printer QR code not rendering on initial load (same race condition)
- Add model image to equipment badge via imageurl from Model table
- Fix white-on-white machine number text on badge, tighten barcode spacing
- Add PaginationBar component used across all list pages
- Split monolithic router into per-plugin route modules
- Fix 25 GET API endpoints returning 401 (jwt_required -> optional=True)
- Align list page columns across Equipment, PCs, and Network pages
- Add print views: EquipmentBadge, PrinterQRSingle, PrinterQRBatch, USBLabelBatch
- Add PC Relationships report, migration docs, and CLAUDE.md project guide
- Various plugin model, API, and frontend refinements

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 07:32:44 -05:00

477 lines
16 KiB
Python

"""Printers API routes - new Asset-based architecture."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, Vendor, Model, Communication, CommunicationType
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import Printer, PrinterType
from ..services import ZabbixService
# Flask blueprint on which the printer route handlers below are registered.
printers_asset_bp = Blueprint('printers_asset', __name__)
# =============================================================================
# Printer Types
# =============================================================================
@printers_asset_bp.route('/types', methods=['GET'])
@jwt_required(optional=True)
def list_printer_types():
    """
    Return a paginated list of printer types ordered by type name.

    Query parameters:
    - page, per_page: Pagination (parsed by get_pagination_params)
    - active: Pass 'false' to include inactive types; any other value,
      or omitting the parameter, restricts the list to active types
    - search: Case-insensitive substring match on the type name
    """
    page, per_page = get_pagination_params(request)

    include_inactive = request.args.get('active', 'true').lower() == 'false'
    search_term = request.args.get('search')

    type_query = PrinterType.query
    if not include_inactive:
        # "== True" is intentional: the comparison builds the SQL predicate.
        type_query = type_query.filter(PrinterType.isactive == True)  # noqa: E712
    if search_term:
        type_query = type_query.filter(
            PrinterType.printertype.ilike(f'%{search_term}%')
        )
    type_query = type_query.order_by(PrinterType.printertype)

    rows, total = paginate_query(type_query, page, per_page)
    return paginated_response(
        [row.to_dict() for row in rows],
        page,
        per_page,
        total,
    )
@printers_asset_bp.route('/types/<int:type_id>', methods=['GET'])
@jwt_required(optional=True)
def get_printer_type(type_id: int):
    """Look up one printer type by ID; respond with 404 when it is missing."""
    printer_type = PrinterType.query.get(type_id)
    if printer_type:
        return success_response(printer_type.to_dict())

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Printer type with ID {type_id} not found',
        http_code=404,
    )
@printers_asset_bp.route('/types', methods=['POST'])
@jwt_required()
def create_printer_type():
    """
    Create a new printer type.

    Expects a JSON object body.

    Required fields:
    - printertype: Name of the type; must not already exist

    Optional fields:
    - description, icon

    Returns the created type with HTTP 201, a validation error when
    'printertype' is missing or the body is not a JSON object, or a
    409 conflict when a type with the same name already exists.
    """
    data = request.get_json()
    # A JSON body may legally be a list or scalar; only an object has .get(),
    # so reject anything else here instead of crashing with AttributeError.
    if not isinstance(data, dict) or not data.get('printertype'):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'printertype is required')

    if PrinterType.query.filter_by(printertype=data['printertype']).first():
        return error_response(
            ErrorCodes.CONFLICT,
            f"Printer type '{data['printertype']}' already exists",
            http_code=409
        )

    t = PrinterType(
        printertype=data['printertype'],
        description=data.get('description'),
        icon=data.get('icon')
    )
    db.session.add(t)
    db.session.commit()

    return success_response(t.to_dict(), message='Printer type created', http_code=201)
# =============================================================================
# Printers CRUD
# =============================================================================
@printers_asset_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_printers():
    """
    List all printers with filtering, sorting and pagination.

    Query parameters:
    - page, per_page: Pagination
    - active: Pass 'false' to include inactive assets (default: active only)
    - search: Substring match on asset number, name, serial number,
      hostname, or Windows name
    - type_id: Filter by printer type ID
    - vendor_id: Filter by vendor ID
    - location_id: Filter by location ID
    - businessunit_id: Filter by business unit ID
    - sort: 'hostname' (default), 'assetnumber', or 'name'; any other
      value falls back to 'hostname'
    - dir: 'desc' for descending order; anything else sorts ascending

    Each returned item is the asset's dict with the printer's dict under
    'printer' and, when the printer has an asset, an 'ipaddress' key taken
    from the asset's primary communication (or any communication if none
    is flagged primary; None if there are none).

    Returns a validation error when an ID filter is not an integer.
    """
    page, per_page = get_pagination_params(request)

    # Join Printer with Asset
    query = db.session.query(Printer).join(Asset)

    # Active filter ("== True" is intentional: it builds the SQL predicate)
    if request.args.get('active', 'true').lower() != 'false':
        query = query.filter(Asset.isactive == True)  # noqa: E712

    # Search filter
    if search := request.args.get('search'):
        pattern = f'%{search}%'
        query = query.filter(
            db.or_(
                Asset.assetnumber.ilike(pattern),
                Asset.name.ilike(pattern),
                Asset.serialnumber.ilike(pattern),
                Printer.hostname.ilike(pattern),
                Printer.windowsname.ilike(pattern)
            )
        )

    # Integer ID filters. Validate the raw value so a malformed parameter
    # yields a validation error rather than an unhandled ValueError (500).
    id_filters = (
        ('type_id', Printer.printertypeid),
        ('vendor_id', Printer.vendorid),
        ('location_id', Asset.locationid),
        ('businessunit_id', Asset.businessunitid),
    )
    for arg_name, column in id_filters:
        raw_value = request.args.get(arg_name)
        if not raw_value:
            continue
        try:
            id_value = int(raw_value)
        except ValueError:
            return error_response(
                ErrorCodes.VALIDATION_ERROR,
                f'{arg_name} must be an integer'
            )
        query = query.filter(column == id_value)

    # Sorting (unknown sort keys fall back to hostname)
    sort_columns = {
        'hostname': Printer.hostname,
        'assetnumber': Asset.assetnumber,
        'name': Asset.name,
    }
    col = sort_columns.get(request.args.get('sort', 'hostname'), Printer.hostname)
    sort_dir = request.args.get('dir', 'asc')
    query = query.order_by(col.desc() if sort_dir == 'desc' else col)

    items, total = paginate_query(query, page, per_page)
    # Materialize once: the page is walked twice below.
    printers = list(items)

    # Fetch the communications for the whole page in one query instead of
    # issuing one or two lookups per printer.
    asset_ids = [p.asset.assetid for p in printers if p.asset]
    primary_by_asset = {}
    fallback_by_asset = {}
    if asset_ids:
        comms = Communication.query.filter(
            Communication.assetid.in_(asset_ids)
        ).all()
        for comm in comms:
            bucket = primary_by_asset if comm.isprimary else fallback_by_asset
            # Keep the first record seen per asset in each bucket.
            bucket.setdefault(comm.assetid, comm)

    # Build response with both asset and printer data
    data = []
    for printer in printers:
        asset = printer.asset
        item = asset.to_dict() if asset else {}
        item['printer'] = printer.to_dict()

        # Add primary IP address (prefer the primary record, else any record)
        if asset:
            chosen = primary_by_asset.get(
                asset.assetid, fallback_by_asset.get(asset.assetid)
            )
            item['ipaddress'] = chosen.ipaddress if chosen else None

        data.append(item)

    return paginated_response(data, page, per_page, total)
@printers_asset_bp.route('/<int:printer_id>', methods=['GET'])
@jwt_required(optional=True)
def get_printer(printer_id: int):
    """
    Return one printer with its asset data and communications.

    The payload is the asset's dict (empty if the printer has no asset)
    with the printer's dict under 'printer'. When an asset exists, its
    communication records are added under 'communications'.
    Responds with 404 when the printer does not exist.
    """
    printer = Printer.query.get(printer_id)
    if not printer:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Printer with ID {printer_id} not found',
            http_code=404,
        )

    asset = printer.asset
    payload = asset.to_dict() if asset else {}
    payload['printer'] = printer.to_dict()

    if asset:
        comm_rows = Communication.query.filter_by(assetid=asset.assetid).all()
        payload['communications'] = [row.to_dict() for row in comm_rows]

    return success_response(payload)
@printers_asset_bp.route('/by-asset/<int:asset_id>', methods=['GET'])
@jwt_required(optional=True)
def get_printer_by_asset(asset_id: int):
    """
    Return the printer attached to the given asset ID.

    The payload is the asset's dict (empty if the relationship is unset)
    with the printer's dict under 'printer'. Responds with 404 when no
    printer references the asset.
    """
    printer = Printer.query.filter_by(assetid=asset_id).first()
    if printer:
        asset = printer.asset
        payload = asset.to_dict() if asset else {}
        payload['printer'] = printer.to_dict()
        return success_response(payload)

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Printer for asset {asset_id} not found',
        http_code=404,
    )
@printers_asset_bp.route('', methods=['POST'])
@jwt_required()
def create_printer():
    """
    Create new printer (creates both Asset and Printer records).

    Expects a JSON object body.

    Required fields:
    - assetnumber: Business identifier; must not already exist

    Optional fields:
    - name, serialnumber, statusid, locationid, businessunitid
    - printertypeid, vendorid, modelnumberid, hostname
    - windowsname, sharename, iscsf, installpath, pin
    - iscolor, isduplex, isnetwork
    - mapleft, maptop, notes
    - ipaddress: When given, stored as the asset's primary communication

    Returns the asset dict with the printer dict under 'printer' and
    HTTP 201. Returns a validation error for a missing/invalid body or
    missing assetnumber, 409 for a duplicate assetnumber, and 500 when
    the 'printer' asset type is not present.
    """
    data = request.get_json()

    if not data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')

    # A JSON body may be a list or scalar; only an object supports the
    # .get() lookups below, so reject other shapes explicitly.
    if not isinstance(data, dict):
        return error_response(
            ErrorCodes.VALIDATION_ERROR,
            'Request body must be a JSON object'
        )

    if not data.get('assetnumber'):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'assetnumber is required')

    # Check for duplicate assetnumber
    if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
        return error_response(
            ErrorCodes.CONFLICT,
            f"Asset with number '{data['assetnumber']}' already exists",
            http_code=409
        )

    # Get printer asset type
    printer_type = AssetType.query.filter_by(assettype='printer').first()
    if not printer_type:
        return error_response(
            ErrorCodes.INTERNAL_ERROR,
            'Printer asset type not found. Plugin may not be properly installed.',
            http_code=500
        )

    # Create the core asset
    asset = Asset(
        assetnumber=data['assetnumber'],
        name=data.get('name'),
        serialnumber=data.get('serialnumber'),
        assettypeid=printer_type.assettypeid,
        statusid=data.get('statusid', 1),
        locationid=data.get('locationid'),
        businessunitid=data.get('businessunitid'),
        mapleft=data.get('mapleft'),
        maptop=data.get('maptop'),
        notes=data.get('notes')
    )
    db.session.add(asset)
    db.session.flush()  # Get the assetid

    # Create the printer extension
    printer = Printer(
        assetid=asset.assetid,
        printertypeid=data.get('printertypeid'),
        vendorid=data.get('vendorid'),
        modelnumberid=data.get('modelnumberid'),
        hostname=data.get('hostname'),
        windowsname=data.get('windowsname'),
        sharename=data.get('sharename'),
        iscsf=data.get('iscsf', False),
        installpath=data.get('installpath'),
        pin=data.get('pin'),
        iscolor=data.get('iscolor', False),
        isduplex=data.get('isduplex', False),
        isnetwork=data.get('isnetwork', True)
    )
    db.session.add(printer)

    # Create communication record if IP provided.
    # NOTE: the IP is skipped without error when no 'IP' communication
    # type row exists.
    if data.get('ipaddress'):
        ip_comtype = CommunicationType.query.filter_by(comtype='IP').first()
        if ip_comtype:
            comm = Communication(
                assetid=asset.assetid,
                comtypeid=ip_comtype.comtypeid,
                ipaddress=data['ipaddress'],
                isprimary=True
            )
            db.session.add(comm)

    db.session.commit()

    result = asset.to_dict()
    result['printer'] = printer.to_dict()

    return success_response(result, message='Printer created', http_code=201)
@printers_asset_bp.route('/<int:printer_id>', methods=['PUT'])
@jwt_required()
def update_printer(printer_id: int):
    """
    Update printer (both Asset and Printer records).

    Expects a JSON object body. Only keys present in the body are
    applied; recognised asset keys are assetnumber, name, serialnumber,
    statusid, locationid, businessunitid, mapleft, maptop, notes and
    isactive, and recognised printer keys are printertypeid, vendorid,
    modelnumberid, hostname, windowsname, sharename, iscsf, installpath,
    pin, iscolor, isduplex and isnetwork.

    Returns the updated asset dict with the printer dict under 'printer'.
    Returns 404 when the printer (or its asset) is missing, a validation
    error for a missing/invalid body or an attempt to blank the
    assetnumber, and 409 when the new assetnumber is already in use.
    """
    printer = Printer.query.get(printer_id)

    if not printer:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Printer with ID {printer_id} not found',
            http_code=404
        )

    data = request.get_json()
    if not data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')

    # Only a JSON object supports the key lookups below.
    if not isinstance(data, dict):
        return error_response(
            ErrorCodes.VALIDATION_ERROR,
            'Request body must be a JSON object'
        )

    asset = printer.asset
    # The read handlers treat a printer without an asset as possible;
    # guard here too instead of failing on attribute access.
    if not asset:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Asset for printer {printer_id} not found',
            http_code=404
        )

    # Check for conflicting assetnumber (only when it actually changes)
    if 'assetnumber' in data and data['assetnumber'] != asset.assetnumber:
        # Creation requires an assetnumber, so do not allow blanking it.
        if not data['assetnumber']:
            return error_response(
                ErrorCodes.VALIDATION_ERROR,
                'assetnumber is required'
            )
        if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
            return error_response(
                ErrorCodes.CONFLICT,
                f"Asset with number '{data['assetnumber']}' already exists",
                http_code=409
            )

    # Update asset fields
    asset_fields = ('assetnumber', 'name', 'serialnumber', 'statusid',
                    'locationid', 'businessunitid', 'mapleft', 'maptop',
                    'notes', 'isactive')
    for key in asset_fields:
        if key in data:
            setattr(asset, key, data[key])

    # Update printer fields
    printer_fields = ('printertypeid', 'vendorid', 'modelnumberid', 'hostname',
                      'windowsname', 'sharename', 'iscsf', 'installpath', 'pin',
                      'iscolor', 'isduplex', 'isnetwork')
    for key in printer_fields:
        if key in data:
            setattr(printer, key, data[key])

    db.session.commit()

    result = asset.to_dict()
    result['printer'] = printer.to_dict()

    return success_response(result, message='Printer updated')
@printers_asset_bp.route('/<int:printer_id>', methods=['DELETE'])
@jwt_required()
def delete_printer(printer_id: int):
    """
    Delete (soft delete) printer.

    Marks the printer's asset as inactive; no rows are removed.
    Returns 404 when the printer, or the asset it should be attached to,
    does not exist.
    """
    printer = Printer.query.get(printer_id)

    if not printer:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Printer with ID {printer_id} not found',
            http_code=404
        )

    asset = printer.asset
    # The read handlers treat a printer without an asset as possible;
    # guard here too instead of failing on attribute access.
    if not asset:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Asset for printer {printer_id} not found',
            http_code=404
        )

    # Soft delete the asset
    asset.isactive = False
    db.session.commit()

    return success_response(message='Printer deleted')
# =============================================================================
# Supply Levels (Zabbix Integration)
# =============================================================================
@printers_asset_bp.route('/<int:printer_id>/supplies', methods=['GET'])
@jwt_required(optional=True)
def get_printer_supplies(printer_id: int):
    """
    Fetch a printer's supply levels through ZabbixService.

    The printer's IP address is read from its primary communication
    record, falling back to any communication record for the asset.
    Responds with 404 for an unknown printer, a validation error when no
    IP address is available, and a bad-request error when the Zabbix
    service reports it is not configured.
    """
    printer = Printer.query.get(printer_id)
    if not printer:
        return error_response(ErrorCodes.NOT_FOUND, 'Printer not found', http_code=404)

    # Prefer the record flagged primary; otherwise take any record.
    primary = Communication.query.filter_by(
        assetid=printer.assetid,
        isprimary=True,
    ).first()
    comm = primary or Communication.query.filter_by(assetid=printer.assetid).first()

    if not (comm and comm.ipaddress):
        return error_response(ErrorCodes.VALIDATION_ERROR, 'Printer has no IP address')

    zabbix = ZabbixService()
    if not zabbix.isconfigured:
        return error_response(ErrorCodes.BAD_REQUEST, 'Zabbix not configured')

    ip = comm.ipaddress
    levels = zabbix.getsuppliesbyip(ip)

    payload = {
        'ipaddress': ip,
        'supplies': levels or [],
    }
    return success_response(payload)
# =============================================================================
# Dashboard
# =============================================================================
@printers_asset_bp.route('/dashboard/summary', methods=['GET'])
@jwt_required(optional=True)
def dashboard_summary():
    """
    Return summary counts for the printer dashboard.

    Counts only printers whose asset is active: the overall total, a
    per-printer-type breakdown and a per-vendor breakdown. The 'online',
    'lowsupplies' and 'criticalsupplies' values are placeholders.
    """
    # Number of printers attached to an active asset.
    active_printers = (
        db.session.query(Printer)
        .join(Asset)
        .filter(Asset.isactive == True)  # noqa: E712
        .count()
    )

    # (type name, printer count) rows for active printers.
    type_rows = (
        db.session.query(
            PrinterType.printertype,
            db.func.count(Printer.printerid),
        )
        .join(Printer, Printer.printertypeid == PrinterType.printertypeid)
        .join(Asset, Asset.assetid == Printer.assetid)
        .filter(Asset.isactive == True)  # noqa: E712
        .group_by(PrinterType.printertype)
        .all()
    )

    # (vendor name, printer count) rows for active printers.
    vendor_rows = (
        db.session.query(
            Vendor.vendor,
            db.func.count(Printer.printerid),
        )
        .join(Printer, Printer.vendorid == Vendor.vendorid)
        .join(Asset, Asset.assetid == Printer.assetid)
        .filter(Asset.isactive == True)  # noqa: E712
        .group_by(Vendor.vendor)
        .all()
    )

    type_breakdown = [
        {'type': type_name, 'count': count} for type_name, count in type_rows
    ]
    vendor_breakdown = [
        {'vendor': vendor_name, 'count': count} for vendor_name, count in vendor_rows
    ]

    summary = {
        'total': active_printers,
        'totalprinters': active_printers,  # For dashboard compatibility
        'online': active_printers,  # Placeholder - would need monitoring integration
        'lowsupplies': 0,  # Placeholder - would need Zabbix integration
        'criticalsupplies': 0,  # Placeholder - would need Zabbix integration
        'bytype': type_breakdown,
        'byvendor': vendor_breakdown,
    }
    return success_response(summary)