Files
shopdb-flask/shopdb/core/api/assets.py
cproudlock c3ce69da12 Migrate frontend to plugin-based asset architecture
- Add equipmentApi and computersApi to replace legacy machinesApi
- Add controller vendor/model fields to Equipment model and forms
- Fix map marker navigation to use plugin-specific IDs (equipmentid,
  computerid, printerid, networkdeviceid) instead of assetid
- Fix search to use unified Asset table with correct plugin IDs
- Remove legacy printer search that used non-existent field names
- Enable optional JWT auth for detail endpoints (public read access)
- Clean up USB plugin models (remove unused checkout model)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 16:07:41 -05:00

739 lines
25 KiB
Python

"""Assets API endpoints - unified asset queries."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, AssetStatus, AssetRelationship, RelationshipType
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
# Blueprint that every endpoint in this module is registered on; route paths
# below are relative to the URL prefix chosen where the blueprint is registered
# (registration is not shown in this module).
assets_bp = Blueprint('assets', __name__)
# =============================================================================
# Asset Types
# =============================================================================
@assets_bp.route('/types', methods=['GET'])
@jwt_required()
def list_asset_types():
    """
    Return a paginated list of asset types ordered by type name.

    Query parameters:
    - active: pass 'false' (any letter case) to include inactive types;
      omitted or any other value restricts the result to active types
    """
    page, per_page = get_pagination_params(request)

    only_active = request.args.get('active', 'true').lower() != 'false'
    type_query = AssetType.query
    if only_active:
        type_query = type_query.filter(AssetType.isactive == True)
    type_query = type_query.order_by(AssetType.assettype)

    rows, total = paginate_query(type_query, page, per_page)
    serialized = [row.to_dict() for row in rows]
    return paginated_response(serialized, page, per_page, total)
@assets_bp.route('/types/<int:type_id>', methods=['GET'])
@jwt_required()
def get_asset_type(type_id: int):
    """Return the asset type with the given ID, or a 404 error response."""
    asset_type = AssetType.query.get(type_id)
    if asset_type:
        return success_response(asset_type.to_dict())

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Asset type with ID {type_id} not found',
        http_code=404
    )
@assets_bp.route('/types', methods=['POST'])
@jwt_required()
def create_asset_type():
    """
    Create an asset type from the JSON request body.

    'assettype' is mandatory and must not match an existing type name;
    'plugin_name', 'table_name', 'description' and 'icon' are optional.
    Responds with 201 and the new record, a validation error when the name
    is missing, or 409 when the name is already taken.
    """
    payload = request.get_json()
    type_name = payload.get('assettype') if payload else None
    if not type_name:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'assettype is required')

    already_there = AssetType.query.filter_by(assettype=type_name).first()
    if already_there:
        return error_response(
            ErrorCodes.CONFLICT,
            f"Asset type '{type_name}' already exists",
            http_code=409
        )

    optional_fields = ('plugin_name', 'table_name', 'description', 'icon')
    new_type = AssetType(
        assettype=type_name,
        **{field: payload.get(field) for field in optional_fields}
    )
    db.session.add(new_type)
    db.session.commit()

    return success_response(
        new_type.to_dict(),
        message='Asset type created',
        http_code=201
    )
# =============================================================================
# Asset Statuses
# =============================================================================
@assets_bp.route('/statuses', methods=['GET'])
@jwt_required()
def list_asset_statuses():
    """
    Return a paginated list of asset statuses ordered by status name.

    Query parameters:
    - active: pass 'false' (any letter case) to include inactive statuses;
      omitted or any other value restricts the result to active statuses
    """
    page, per_page = get_pagination_params(request)

    only_active = request.args.get('active', 'true').lower() != 'false'
    status_query = AssetStatus.query
    if only_active:
        status_query = status_query.filter(AssetStatus.isactive == True)
    status_query = status_query.order_by(AssetStatus.status)

    rows, total = paginate_query(status_query, page, per_page)
    serialized = [row.to_dict() for row in rows]
    return paginated_response(serialized, page, per_page, total)
@assets_bp.route('/statuses/<int:status_id>', methods=['GET'])
@jwt_required()
def get_asset_status(status_id: int):
    """Return the asset status with the given ID, or a 404 error response."""
    asset_status = AssetStatus.query.get(status_id)
    if asset_status:
        return success_response(asset_status.to_dict())

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Asset status with ID {status_id} not found',
        http_code=404
    )
@assets_bp.route('/statuses', methods=['POST'])
@jwt_required()
def create_asset_status():
    """
    Create an asset status from the JSON request body.

    'status' is mandatory and must not match an existing status name;
    'description' and 'color' are optional. Responds with 201 and the new
    record, a validation error when the name is missing, or 409 when the
    name is already taken.
    """
    payload = request.get_json()
    status_name = payload.get('status') if payload else None
    if not status_name:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'status is required')

    already_there = AssetStatus.query.filter_by(status=status_name).first()
    if already_there:
        return error_response(
            ErrorCodes.CONFLICT,
            f"Asset status '{status_name}' already exists",
            http_code=409
        )

    new_status = AssetStatus(
        status=status_name,
        description=payload.get('description'),
        color=payload.get('color')
    )
    db.session.add(new_status)
    db.session.commit()

    return success_response(
        new_status.to_dict(),
        message='Asset status created',
        http_code=201
    )
# =============================================================================
# Assets
# =============================================================================
@assets_bp.route('', methods=['GET'])
@jwt_required()
def list_assets():
    """
    List all assets with filtering, sorting and pagination.

    Query parameters:
    - page / per_page: Pagination, parsed by get_pagination_params
      (originally documented as page default 1, per_page default 20, max 100)
    - active: Pass 'false' to include inactive assets (default: true)
    - search: Case-insensitive substring match on assetnumber, name or
      serialnumber
    - type: Filter by asset type name (e.g., 'equipment', 'computer')
    - type_id: Filter by asset type ID
    - status_id: Filter by status ID
    - location_id: Filter by location ID
    - businessunit_id: Filter by business unit ID
    - sort: One of assetnumber, name, createddate, modifieddate
      (default and fallback: assetnumber)
    - dir: 'desc' for descending order; anything else sorts ascending
    - include_type_data: Include category-specific extension data (default: false)

    Returns a paginated response of asset dictionaries, or a validation
    error response when one of the *_id filters is not an integer.
    """
    page, per_page = get_pagination_params(request)
    args = request.args

    # Parse the numeric filters before building the query so that a malformed
    # value produces a validation error instead of an unhandled ValueError
    # (which would surface as an HTTP 500).
    id_columns = {
        'type_id': Asset.assettypeid,
        'status_id': Asset.statusid,
        'location_id': Asset.locationid,
        'businessunit_id': Asset.businessunitid,
    }
    id_values = {}
    for param in id_columns:
        raw = args.get(param)
        if not raw:
            # Missing or empty parameter: the filter is simply not applied.
            continue
        try:
            id_values[param] = int(raw)
        except ValueError:
            return error_response(
                ErrorCodes.VALIDATION_ERROR,
                f'{param} must be an integer'
            )

    query = Asset.query

    # Active filter
    if args.get('active', 'true').lower() != 'false':
        query = query.filter(Asset.isactive == True)

    # Search filter
    if search := args.get('search'):
        pattern = f'%{search}%'
        query = query.filter(
            db.or_(
                Asset.assetnumber.ilike(pattern),
                Asset.name.ilike(pattern),
                Asset.serialnumber.ilike(pattern)
            )
        )

    # Type filter by name
    if type_name := args.get('type'):
        query = query.join(AssetType).filter(AssetType.assettype == type_name)

    # Numeric ID filters (type, status, location, business unit)
    for param, value in id_values.items():
        query = query.filter(id_columns[param] == value)

    # Sorting: only whitelisted columns may be used; unknown keys fall back
    # to assetnumber ascending.
    sort_by = args.get('sort', 'assetnumber')
    sort_dir = args.get('dir', 'asc')
    sort_columns = {
        'assetnumber': Asset.assetnumber,
        'name': Asset.name,
        'createddate': Asset.createddate,
        'modifieddate': Asset.modifieddate,
    }
    if sort_by in sort_columns:
        col = sort_columns[sort_by]
        query = query.order_by(col.desc() if sort_dir == 'desc' else col)
    else:
        query = query.order_by(Asset.assetnumber)

    items, total = paginate_query(query, page, per_page)

    # Include type data if requested
    include_type_data = args.get('include_type_data', 'false').lower() == 'true'
    data = [a.to_dict(include_type_data=include_type_data) for a in items]

    return paginated_response(data, page, per_page, total)
@assets_bp.route('/<int:asset_id>', methods=['GET'])
@jwt_required()
def get_asset(asset_id: int):
    """
    Return one asset by ID, or a 404 error response when it does not exist.

    Query parameters:
    - include_type_data: Category-specific extension data is included unless
      this is 'false' (default: true)
    """
    found = Asset.query.get(asset_id)
    if found is None or not found:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Asset with ID {asset_id} not found',
            http_code=404
        )

    flag = request.args.get('include_type_data', 'true')
    with_type_data = flag.lower() != 'false'
    return success_response(found.to_dict(include_type_data=with_type_data))
@assets_bp.route('', methods=['POST'])
@jwt_required()
def create_asset():
    """
    Create an asset from the JSON request body.

    'assetnumber' and 'assettypeid' are mandatory. The asset number must be
    unused and the asset type must exist. 'statusid' falls back to 1 when
    absent; the remaining fields (name, serialnumber, locationid,
    businessunitid, mapleft, maptop, notes) are optional.
    Responds with 201 and the new asset, a validation error, or 409 on a
    duplicate asset number.
    """
    payload = request.get_json()
    if not payload:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')

    # Mandatory fields
    asset_number = payload.get('assetnumber')
    if not asset_number:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'assetnumber is required')
    type_id = payload.get('assettypeid')
    if not type_id:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'assettypeid is required')

    # The asset number has to be unique
    clash = Asset.query.filter_by(assetnumber=asset_number).first()
    if clash:
        return error_response(
            ErrorCodes.CONFLICT,
            f"Asset with number '{asset_number}' already exists",
            http_code=409
        )

    # The referenced asset type has to exist
    if not AssetType.query.get(type_id):
        return error_response(
            ErrorCodes.VALIDATION_ERROR,
            f"Asset type with ID {type_id} not found"
        )

    passthrough = (
        'name', 'serialnumber', 'locationid', 'businessunitid',
        'mapleft', 'maptop', 'notes',
    )
    new_asset = Asset(
        assetnumber=asset_number,
        assettypeid=type_id,
        statusid=payload.get('statusid', 1),
        **{field: payload.get(field) for field in passthrough}
    )
    db.session.add(new_asset)
    db.session.commit()

    return success_response(
        new_asset.to_dict(),
        message='Asset created',
        http_code=201
    )
@assets_bp.route('/<int:asset_id>', methods=['PUT'])
@jwt_required()
def update_asset(asset_id: int):
    """
    Update an asset from the JSON request body.

    Only the whitelisted fields (assetnumber, name, serialnumber,
    assettypeid, statusid, locationid, businessunitid, mapleft, maptop,
    notes, isactive) are copied onto the asset; other keys are ignored.

    Validation mirrors create_asset: a supplied assetnumber must be
    non-empty and unique, and a changed assettypeid must reference an
    existing asset type.

    Responds with the updated asset, 404 when the asset does not exist,
    a validation error for bad input, or 409 on an asset number conflict.
    """
    asset = Asset.query.get(asset_id)
    if not asset:
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Asset with ID {asset_id} not found',
            http_code=404
        )

    data = request.get_json()
    if not data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
    if not isinstance(data, dict):
        # A JSON array or scalar would otherwise be probed with `in` and
        # index lookups below and either crash or be silently ignored.
        return error_response(
            ErrorCodes.VALIDATION_ERROR,
            'Request body must be a JSON object'
        )

    if 'assetnumber' in data:
        # create_asset refuses an empty asset number; keep updates consistent.
        if not data['assetnumber']:
            return error_response(
                ErrorCodes.VALIDATION_ERROR,
                'assetnumber cannot be empty'
            )
        # Check for conflicting assetnumber
        if data['assetnumber'] != asset.assetnumber:
            if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
                return error_response(
                    ErrorCodes.CONFLICT,
                    f"Asset with number '{data['assetnumber']}' already exists",
                    http_code=409
                )

    # Validate the asset type only when it actually changes, as create_asset
    # does for new assets.
    if 'assettypeid' in data and data['assettypeid'] != asset.assettypeid:
        new_type_id = data['assettypeid']
        if not new_type_id or not AssetType.query.get(new_type_id):
            return error_response(
                ErrorCodes.VALIDATION_ERROR,
                f"Asset type with ID {new_type_id} not found"
            )

    # Update allowed fields
    allowed_fields = (
        'assetnumber', 'name', 'serialnumber', 'assettypeid', 'statusid',
        'locationid', 'businessunitid', 'mapleft', 'maptop', 'notes', 'isactive'
    )
    for key in allowed_fields:
        if key in data:
            setattr(asset, key, data[key])

    db.session.commit()

    return success_response(asset.to_dict(), message='Asset updated')
@assets_bp.route('/<int:asset_id>', methods=['DELETE'])
@jwt_required()
def delete_asset(asset_id: int):
    """
    Soft-delete an asset.

    The row is kept; only its isactive flag is cleared. Responds with 404
    when no asset has the given ID.
    """
    target = Asset.query.get(asset_id)
    if target:
        target.isactive = False
        db.session.commit()
        return success_response(message='Asset deleted')

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Asset with ID {asset_id} not found',
        http_code=404
    )
@assets_bp.route('/lookup/<assetnumber>', methods=['GET'])
@jwt_required()
def lookup_asset_by_number(assetnumber: str):
    """
    Find an active asset by its asset number.

    Handy when only the machine/asset number is known and the asset ID is
    needed. Inactive assets are not matched. The response always includes
    the type-specific extension data; a 404 is returned when nothing matches.
    """
    match = Asset.query.filter_by(
        assetnumber=assetnumber,
        isactive=True
    ).first()

    if match:
        return success_response(match.to_dict(include_type_data=True))

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Asset with number {assetnumber} not found',
        http_code=404
    )
# =============================================================================
# Asset Relationships
# =============================================================================
@assets_bp.route('/<int:asset_id>/relationships', methods=['GET'])
@jwt_required(optional=True)
def get_asset_relationships(asset_id: int):
    """
    Return the active relationships of an asset.

    The payload has two lists: 'outgoing' (the asset is the source; each
    entry carries 'target_asset') and 'incoming' (the asset is the target;
    each entry carries 'source_asset'). Every entry also gets
    'relationship_type_name'. Responds with 404 when the asset is unknown.
    """
    if not Asset.query.get(asset_id):
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Asset with ID {asset_id} not found',
            http_code=404
        )

    def _describe(rel, other_key, other_asset):
        # Serialize one relationship plus the asset on its far side.
        entry = rel.to_dict()
        entry[other_key] = other_asset.to_dict() if other_asset else None
        rel_type = rel.relationship_type
        entry['relationship_type_name'] = (
            rel_type.relationshiptype if rel_type else None
        )
        return entry

    active_only = AssetRelationship.isactive == True

    # Relationships where this asset is the source
    as_source = AssetRelationship.query.filter_by(
        source_assetid=asset_id
    ).filter(active_only).all()

    # Relationships where this asset is the target
    as_target = AssetRelationship.query.filter_by(
        target_assetid=asset_id
    ).filter(active_only).all()

    return success_response({
        'outgoing': [
            _describe(rel, 'target_asset', rel.target_asset) for rel in as_source
        ],
        'incoming': [
            _describe(rel, 'source_asset', rel.source_asset) for rel in as_target
        ]
    })
@assets_bp.route('/relationships', methods=['POST'])
@jwt_required()
def create_asset_relationship():
    """
    Create a relationship between two assets.

    The JSON body must provide 'source_assetid', 'target_assetid' and
    'relationshiptypeid'; 'notes' is optional. Both assets and the
    relationship type must exist (404 otherwise).

    An identical *active* relationship yields 409. An identical relationship
    that was soft-deleted earlier (isactive cleared by
    delete_asset_relationship) is reactivated instead of being reported as a
    duplicate, so a deleted relationship can be created again.
    """
    data = request.get_json()
    if not data:
        return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')

    # Validate required fields
    required = ['source_assetid', 'target_assetid', 'relationshiptypeid']
    for field in required:
        if not data.get(field):
            return error_response(ErrorCodes.VALIDATION_ERROR, f'{field} is required')

    source_id = data['source_assetid']
    target_id = data['target_assetid']
    type_id = data['relationshiptypeid']

    # Validate assets exist
    if not Asset.query.get(source_id):
        return error_response(ErrorCodes.NOT_FOUND, f'Source asset {source_id} not found', http_code=404)
    if not Asset.query.get(target_id):
        return error_response(ErrorCodes.NOT_FOUND, f'Target asset {target_id} not found', http_code=404)
    if not RelationshipType.query.get(type_id):
        return error_response(ErrorCodes.NOT_FOUND, f'Relationship type {type_id} not found', http_code=404)

    # Check for duplicate relationship. Deletion is a soft delete, so matching
    # rows may be inactive; only an active match is a real conflict.
    matches = AssetRelationship.query.filter_by(
        source_assetid=source_id,
        target_assetid=target_id,
        relationshiptypeid=type_id
    ).all()

    if any(match.isactive for match in matches):
        return error_response(
            ErrorCodes.CONFLICT,
            'This relationship already exists',
            http_code=409
        )

    if matches:
        # Revive the soft-deleted row rather than inserting a second row for
        # the same (source, target, type) triple.
        rel = matches[0]
        rel.isactive = True
        rel.notes = data.get('notes')
    else:
        rel = AssetRelationship(
            source_assetid=source_id,
            target_assetid=target_id,
            relationshiptypeid=type_id,
            notes=data.get('notes')
        )
        db.session.add(rel)

    db.session.commit()

    return success_response(rel.to_dict(), message='Relationship created', http_code=201)
@assets_bp.route('/relationships/<int:rel_id>', methods=['DELETE'])
@jwt_required()
def delete_asset_relationship(rel_id: int):
    """
    Soft-delete an asset relationship.

    The row is kept; only its isactive flag is cleared. Responds with 404
    when no relationship has the given ID.
    """
    relationship = AssetRelationship.query.get(rel_id)
    if relationship:
        relationship.isactive = False
        db.session.commit()
        return success_response(message='Relationship deleted')

    return error_response(
        ErrorCodes.NOT_FOUND,
        f'Relationship with ID {rel_id} not found',
        http_code=404
    )
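# =============================================================================
# Asset Communications
# (the communications endpoint, get_asset_communications, is defined after the
# map endpoint below)
# =============================================================================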
# =============================================================================
# Unified Asset Map
# =============================================================================
@assets_bp.route('/map', methods=['GET'])
@jwt_required(optional=True)
def get_assets_map():
    """
    Get all assets with map positions for unified floor map display.

    Returns every active asset that has both mapleft and maptop set, together
    with its type-specific extension data and the option lists the map UI
    needs for its filters.

    Query parameters:
    - assettype: Filter by asset type name; compared exactly against
      AssetType.assettype
    - subtype: Filter by subtype ID. Only applied when assettype is, ignoring
      case, 'equipment' (equipment type), 'computer' (computer type),
      'network device' (network device type) or 'printer' (printer type),
      and the matching plugin can be imported; otherwise it is ignored
    - businessunitid: Filter by business unit ID
    - statusid: Filter by status ID
    - locationid: Filter by location ID
    - search: Case-insensitive substring match on assetnumber, name, or
      serialnumber

    Returns a success response with 'assets', 'total' and 'filters'
    (assettypes, statuses, businessunits, locations, subtypes), or a
    validation error response when a numeric parameter is not an integer.
    """
    from shopdb.core.models import Location, BusinessUnit

    args = request.args

    # Parse numeric parameters first so malformed input produces a validation
    # error rather than an unhandled ValueError (HTTP 500).
    numeric = {}
    for param in ('subtype', 'businessunitid', 'statusid', 'locationid'):
        raw = args.get(param)
        if not raw:
            continue
        try:
            numeric[param] = int(raw)
        except ValueError:
            return error_response(
                ErrorCodes.VALIDATION_ERROR,
                f'{param} must be an integer'
            )

    query = Asset.query.filter(
        Asset.isactive == True,
        Asset.mapleft.isnot(None),
        Asset.maptop.isnot(None)
    )

    selected_assettype = args.get('assettype')

    # Filter by asset type name
    if selected_assettype:
        query = query.join(AssetType).filter(AssetType.assettype == selected_assettype)

    # Filter by subtype (depends on asset type) - case-insensitive matching.
    # Plugins are optional: when one cannot be imported the subtype filter is
    # deliberately skipped instead of failing the whole request.
    if 'subtype' in numeric:
        subtype_id = numeric['subtype']
        asset_type_lower = selected_assettype.lower() if selected_assettype else ''

        if asset_type_lower == 'equipment':
            # Filter by equipment type
            try:
                from plugins.equipment.models import Equipment
                query = query.join(Equipment, Equipment.assetid == Asset.assetid).filter(
                    Equipment.equipmenttypeid == subtype_id
                )
            except ImportError:
                pass

        elif asset_type_lower == 'computer':
            # Filter by computer type
            try:
                from plugins.computers.models import Computer
                query = query.join(Computer, Computer.assetid == Asset.assetid).filter(
                    Computer.computertypeid == subtype_id
                )
            except ImportError:
                pass

        elif asset_type_lower == 'network device':
            # Filter by network device type
            try:
                from plugins.network.models import NetworkDevice
                query = query.join(NetworkDevice, NetworkDevice.assetid == Asset.assetid).filter(
                    NetworkDevice.networkdevicetypeid == subtype_id
                )
            except ImportError:
                pass

        elif asset_type_lower == 'printer':
            # Filter by printer type
            try:
                from plugins.printers.models import Printer
                query = query.join(Printer, Printer.assetid == Asset.assetid).filter(
                    Printer.printertypeid == subtype_id
                )
            except ImportError:
                pass

    # Filter by business unit
    if 'businessunitid' in numeric:
        query = query.filter(Asset.businessunitid == numeric['businessunitid'])

    # Filter by status
    if 'statusid' in numeric:
        query = query.filter(Asset.statusid == numeric['statusid'])

    # Filter by location
    if 'locationid' in numeric:
        query = query.filter(Asset.locationid == numeric['locationid'])

    # Search filter
    if search := args.get('search'):
        pattern = f'%{search}%'
        query = query.filter(
            db.or_(
                Asset.assetnumber.ilike(pattern),
                Asset.name.ilike(pattern),
                Asset.serialnumber.ilike(pattern)
            )
        )

    assets = query.all()

    # Build response with type-specific data
    data = []
    for asset in assets:
        item = {
            'assetid': asset.assetid,
            'assetnumber': asset.assetnumber,
            'name': asset.name,
            'displayname': asset.display_name,
            'serialnumber': asset.serialnumber,
            'mapleft': asset.mapleft,
            'maptop': asset.maptop,
            'assettype': asset.assettype.assettype if asset.assettype else None,
            'assettypeid': asset.assettypeid,
            'status': asset.status.status if asset.status else None,
            'statusid': asset.statusid,
            'statuscolor': asset.status.color if asset.status else None,
            'location': asset.location.locationname if asset.location else None,
            'locationid': asset.locationid,
            'businessunit': asset.businessunit.businessunit if asset.businessunit else None,
            'businessunitid': asset.businessunitid,
            'primaryip': asset.primary_ip,
        }

        # Add type-specific data (the key is omitted when there is none)
        type_data = asset._get_extension_data()
        if type_data:
            item['typedata'] = type_data

        data.append(item)

    # Get available asset types for filters
    asset_types = AssetType.query.filter(AssetType.isactive == True).all()
    types_data = [
        {'assettypeid': t.assettypeid, 'assettype': t.assettype, 'icon': t.icon}
        for t in asset_types
    ]

    # Get status options
    statuses = AssetStatus.query.filter(AssetStatus.isactive == True).all()
    status_data = [
        {'statusid': s.statusid, 'status': s.status, 'color': s.color}
        for s in statuses
    ]

    # Get business units
    business_units = BusinessUnit.query.filter(BusinessUnit.isactive == True).all()
    bu_data = [
        {'businessunitid': bu.businessunitid, 'businessunit': bu.businessunit}
        for bu in business_units
    ]

    # Get locations
    locations = Location.query.filter(Location.isactive == True).all()
    loc_data = [
        {'locationid': loc.locationid, 'locationname': loc.locationname}
        for loc in locations
    ]

    # Get subtypes based on asset type categories (keys match database asset
    # type values). A plugin that cannot be imported contributes an empty list.
    subtypes = {}

    # Equipment types from equipment plugin
    try:
        from plugins.equipment.models import EquipmentType
        equipment_types = EquipmentType.query.filter(
            EquipmentType.isactive == True
        ).order_by(EquipmentType.equipmenttype).all()
        subtypes['Equipment'] = [
            {'id': et.equipmenttypeid, 'name': et.equipmenttype}
            for et in equipment_types
        ]
    except ImportError:
        subtypes['Equipment'] = []

    # Computer types from computers plugin
    try:
        from plugins.computers.models import ComputerType
        computer_types = ComputerType.query.filter(
            ComputerType.isactive == True
        ).order_by(ComputerType.computertype).all()
        subtypes['Computer'] = [
            {'id': ct.computertypeid, 'name': ct.computertype}
            for ct in computer_types
        ]
    except ImportError:
        subtypes['Computer'] = []

    # Network device types
    try:
        from plugins.network.models import NetworkDeviceType
        net_types = NetworkDeviceType.query.filter(
            NetworkDeviceType.isactive == True
        ).order_by(NetworkDeviceType.networkdevicetype).all()
        subtypes['Network Device'] = [
            {'id': nt.networkdevicetypeid, 'name': nt.networkdevicetype}
            for nt in net_types
        ]
    except ImportError:
        subtypes['Network Device'] = []

    # Printer types
    try:
        from plugins.printers.models import PrinterType
        printer_types = PrinterType.query.filter(
            PrinterType.isactive == True
        ).order_by(PrinterType.printertype).all()
        subtypes['Printer'] = [
            {'id': pt.printertypeid, 'name': pt.printertype}
            for pt in printer_types
        ]
    except ImportError:
        subtypes['Printer'] = []

    return success_response({
        'assets': data,
        'total': len(data),
        'filters': {
            'assettypes': types_data,
            'statuses': status_data,
            'businessunits': bu_data,
            'locations': loc_data,
            'subtypes': subtypes
        }
    })
@assets_bp.route('/<int:asset_id>/communications', methods=['GET'])
@jwt_required()
def get_asset_communications(asset_id: int):
    """
    Return the active communications recorded for an asset.

    Each entry is the communication's dictionary form plus 'comtype_name'
    (None when the communication has no comtype). Responds with 404 when the
    asset does not exist.
    """
    from shopdb.core.models import Communication

    if not Asset.query.get(asset_id):
        return error_response(
            ErrorCodes.NOT_FOUND,
            f'Asset with ID {asset_id} not found',
            http_code=404
        )

    def _with_type_name(comm):
        # Attach the readable communication type to the serialized record.
        entry = comm.to_dict()
        entry['comtype_name'] = comm.comtype.comtype if comm.comtype else None
        return entry

    active_comms = Communication.query.filter_by(
        assetid=asset_id,
        isactive=True
    ).all()

    return success_response([_with_type_name(comm) for comm in active_comms])