Add full search parity and optimize map performance

Search: refactor into modular helpers, add IP/hostname/notification/
vendor/model/type search, smart auto-redirects for exact matches,
ServiceNOW prefix detection, filter buttons with counts, share links
with highlight support, and dark mode badge colors.

Map: fix N+1 queries with eager loading (248->28 queries), switch to
canvas-rendered circleMarkers for better performance with 500+ assets.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
cproudlock
2026-02-03 23:07:09 -05:00
parent c3ce69da12
commit c4bfdc2db2
5 changed files with 1018 additions and 172 deletions

View File

@@ -2,6 +2,7 @@
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from sqlalchemy.orm import joinedload, subqueryload
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, AssetStatus, AssetRelationship, RelationshipType
@@ -73,8 +74,8 @@ def create_asset_type():
t = AssetType(
assettype=data['assettype'],
plugin_name=data.get('plugin_name'),
table_name=data.get('table_name'),
pluginname=data.get('pluginname'),
tablename=data.get('tablename'),
description=data.get('description'),
icon=data.get('icon')
)
@@ -411,26 +412,26 @@ def get_asset_relationships(asset_id: int):
# Get outgoing relationships (this asset is source)
outgoing = AssetRelationship.query.filter_by(
source_assetid=asset_id
sourceassetid=asset_id
).filter(AssetRelationship.isactive == True).all()
# Get incoming relationships (this asset is target)
incoming = AssetRelationship.query.filter_by(
target_assetid=asset_id
targetassetid=asset_id
).filter(AssetRelationship.isactive == True).all()
outgoing_data = []
for rel in outgoing:
r = rel.to_dict()
r['target_asset'] = rel.target_asset.to_dict() if rel.target_asset else None
r['relationship_type_name'] = rel.relationship_type.relationshiptype if rel.relationship_type else None
r['targetasset'] = rel.targetasset.to_dict() if rel.targetasset else None
r['relationshiptypename'] = rel.relationshiptype.relationshiptype if rel.relationshiptype else None
outgoing_data.append(r)
incoming_data = []
for rel in incoming:
r = rel.to_dict()
r['source_asset'] = rel.source_asset.to_dict() if rel.source_asset else None
r['relationship_type_name'] = rel.relationship_type.relationshiptype if rel.relationship_type else None
r['sourceasset'] = rel.sourceasset.to_dict() if rel.sourceasset else None
r['relationshiptypename'] = rel.relationshiptype.relationshiptype if rel.relationshiptype else None
incoming_data.append(r)
return success_response({
@@ -449,13 +450,13 @@ def create_asset_relationship():
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Validate required fields
required = ['source_assetid', 'target_assetid', 'relationshiptypeid']
required = ['sourceassetid', 'targetassetid', 'relationshiptypeid']
for field in required:
if not data.get(field):
return error_response(ErrorCodes.VALIDATION_ERROR, f'{field} is required')
source_id = data['source_assetid']
target_id = data['target_assetid']
source_id = data['sourceassetid']
target_id = data['targetassetid']
type_id = data['relationshiptypeid']
# Validate assets exist
@@ -468,8 +469,8 @@ def create_asset_relationship():
# Check for duplicate relationship
existing = AssetRelationship.query.filter_by(
source_assetid=source_id,
target_assetid=target_id,
sourceassetid=source_id,
targetassetid=target_id,
relationshiptypeid=type_id
).first()
@@ -481,8 +482,8 @@ def create_asset_relationship():
)
rel = AssetRelationship(
source_assetid=source_id,
target_assetid=target_id,
sourceassetid=source_id,
targetassetid=target_id,
relationshiptypeid=type_id,
notes=data.get('notes')
)
@@ -536,9 +537,85 @@ def get_assets_map():
- locationid: Filter by location ID
- search: Search by assetnumber, name, or serialnumber
"""
from shopdb.core.models import Location, BusinessUnit, MachineType, Machine
from shopdb.core.models import Location, BusinessUnit, MachineType, Machine, Communication
query = Asset.query.filter(
# Eager-load all relationships to avoid N+1 queries.
# Core relationships via joinedload, extension tables via subqueryload
# with their nested relationships (vendor, model, type) also eager-loaded.
eager_options = [
joinedload(Asset.assettype),
joinedload(Asset.status),
joinedload(Asset.location),
joinedload(Asset.businessunit),
]
# Eager-load plugin extension tables AND their relationships
try:
from plugins.equipment.models import Equipment
eager_options.append(
subqueryload(Asset.equipment)
.joinedload(Equipment.equipmenttype)
)
eager_options.append(
subqueryload(Asset.equipment)
.joinedload(Equipment.vendor)
)
eager_options.append(
subqueryload(Asset.equipment)
.joinedload(Equipment.model)
)
eager_options.append(
subqueryload(Asset.equipment)
.joinedload(Equipment.controllervendor)
)
eager_options.append(
subqueryload(Asset.equipment)
.joinedload(Equipment.controllermodel)
)
except (ImportError, AttributeError):
pass
try:
from plugins.computers.models import Computer
eager_options.append(
subqueryload(Asset.computer)
.joinedload(Computer.computertype)
)
eager_options.append(
subqueryload(Asset.computer)
.joinedload(Computer.operatingsystem)
)
except (ImportError, AttributeError):
pass
try:
from plugins.network.models import NetworkDevice
eager_options.append(
subqueryload(Asset.network_device)
.joinedload(NetworkDevice.networkdevicetype)
)
eager_options.append(
subqueryload(Asset.network_device)
.joinedload(NetworkDevice.vendor)
)
except (ImportError, AttributeError):
pass
try:
from plugins.printers.models import Printer
eager_options.append(
subqueryload(Asset.printer)
.joinedload(Printer.printertype)
)
eager_options.append(
subqueryload(Asset.printer)
.joinedload(Printer.vendor)
)
eager_options.append(
subqueryload(Asset.printer)
.joinedload(Printer.model)
)
except (ImportError, AttributeError):
pass
query = Asset.query.options(*eager_options).filter(
Asset.isactive == True,
Asset.mapleft.isnot(None),
Asset.maptop.isnot(None)
@@ -555,7 +632,6 @@ def get_assets_map():
subtype_id = int(subtype_id)
asset_type_lower = selected_assettype.lower() if selected_assettype else ''
if asset_type_lower == 'equipment':
# Filter by equipment type
try:
from plugins.equipment.models import Equipment
query = query.join(Equipment, Equipment.assetid == Asset.assetid).filter(
@@ -564,7 +640,6 @@ def get_assets_map():
except ImportError:
pass
elif asset_type_lower == 'computer':
# Filter by computer type
try:
from plugins.computers.models import Computer
query = query.join(Computer, Computer.assetid == Asset.assetid).filter(
@@ -573,7 +648,6 @@ def get_assets_map():
except ImportError:
pass
elif asset_type_lower == 'network device':
# Filter by network device type
try:
from plugins.network.models import NetworkDevice
query = query.join(NetworkDevice, NetworkDevice.assetid == Asset.assetid).filter(
@@ -582,7 +656,6 @@ def get_assets_map():
except ImportError:
pass
elif asset_type_lower == 'printer':
# Filter by printer type
try:
from plugins.printers.models import Printer
query = query.join(Printer, Printer.assetid == Asset.assetid).filter(
@@ -615,7 +688,29 @@ def get_assets_map():
assets = query.all()
# Build response with type-specific data
# Batch-load primary IPs in a single query instead of N+1 per asset.
# Prefer isprimary=True IP, fall back to any IP (comtypeid=1).
asset_ids = [a.assetid for a in assets]
primary_ip_map = {}
if asset_ids:
ip_rows = db.session.query(
Communication.assetid,
Communication.ipaddress,
Communication.isprimary
).filter(
Communication.assetid.in_(asset_ids),
Communication.comtypeid == 1,
Communication.isactive == True
).order_by(
Communication.isprimary.desc()
).all()
for row in ip_rows:
# First match wins (isprimary=True sorted first)
if row.assetid not in primary_ip_map:
primary_ip_map[row.assetid] = row.ipaddress
# Build response - all relationship data is already loaded, no extra queries
data = []
for asset in assets:
item = {
@@ -635,36 +730,32 @@ def get_assets_map():
'locationid': asset.locationid,
'businessunit': asset.businessunit.businessunit if asset.businessunit else None,
'businessunitid': asset.businessunitid,
'primaryip': asset.primary_ip,
'primaryip': primary_ip_map.get(asset.assetid),
}
# Add type-specific data
# Extension data is already eager-loaded via lazy='joined' backrefs
type_data = asset._get_extension_data()
if type_data:
item['typedata'] = type_data
data.append(item)
# Get available asset types for filters
# Get filter options - these are small reference tables, no N+1 concern
asset_types = AssetType.query.filter(AssetType.isactive == True).all()
types_data = [{'assettypeid': t.assettypeid, 'assettype': t.assettype, 'icon': t.icon} for t in asset_types]
# Get status options
statuses = AssetStatus.query.filter(AssetStatus.isactive == True).all()
status_data = [{'statusid': s.statusid, 'status': s.status, 'color': s.color} for s in statuses]
# Get business units
business_units = BusinessUnit.query.filter(BusinessUnit.isactive == True).all()
bu_data = [{'businessunitid': bu.businessunitid, 'businessunit': bu.businessunit} for bu in business_units]
# Get locations
locations = Location.query.filter(Location.isactive == True).all()
loc_data = [{'locationid': loc.locationid, 'locationname': loc.locationname} for loc in locations]
# Get subtypes based on asset type categories (keys match database asset type values)
# Get subtypes for filter dropdowns
subtypes = {}
# Equipment types from equipment plugin
try:
from plugins.equipment.models import EquipmentType
equipment_types = EquipmentType.query.filter(EquipmentType.isactive == True).order_by(EquipmentType.equipmenttype).all()
@@ -672,7 +763,6 @@ def get_assets_map():
except ImportError:
subtypes['Equipment'] = []
# Computer types from computers plugin
try:
from plugins.computers.models import ComputerType
computer_types = ComputerType.query.filter(ComputerType.isactive == True).order_by(ComputerType.computertype).all()
@@ -680,7 +770,6 @@ def get_assets_map():
except ImportError:
subtypes['Computer'] = []
# Network device types
try:
from plugins.network.models import NetworkDeviceType
net_types = NetworkDeviceType.query.filter(NetworkDeviceType.isactive == True).order_by(NetworkDeviceType.networkdevicetype).all()
@@ -688,7 +777,6 @@ def get_assets_map():
except ImportError:
subtypes['Network Device'] = []
# Printer types
try:
from plugins.printers.models import PrinterType
printer_types = PrinterType.query.filter(PrinterType.isactive == True).order_by(PrinterType.printertype).all()

View File

@@ -1,56 +1,91 @@
"""Global search API endpoint."""
"""Global search API endpoint with full search parity."""
import re
import ipaddress
import logging
from datetime import datetime
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from sqlalchemy.orm import joinedload
from shopdb.extensions import db
from shopdb.core.models import (
Application, KnowledgeBase,
Asset, AssetType
Asset, AssetType, Communication, Vendor, Model
)
from shopdb.utils.responses import success_response
logger = logging.getLogger(__name__)
search_bp = Blueprint('search', __name__)
# ServiceNOW URL template
SERVICENOW_URL = (
'https://geit.service-now.com/now/nav/ui/search/'
'0f8b85d0c7922010099a308dc7c2606a/params/search-term/{ticket}/'
'global-search-data-config-id/c861cea2c7022010099a308dc7c26041/'
'back-button-label/IT4IT%20Homepage/search-context/now%2Fnav%2Fui'
)
@search_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def global_search():
"""
Global search across multiple entity types.
Returns combined results from:
- Machines (equipment and PCs)
- Applications
- Knowledge Base articles
- Printers (if available)
def _classify_query(query):
"""Analyze the query string to determine its nature."""
return {
'is_ip': bool(re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', query)),
'is_sso': bool(re.match(r'^\d{9}$', query)),
'is_servicenow': bool(re.match(r'^(GEINC|GECHG|GERIT|GESCT)\d+', query, re.IGNORECASE)),
'servicenow_prefix': re.match(r'^(GEINC|GECHG|GERIT|GESCT)', query, re.IGNORECASE).group(1) if re.match(r'^(GEINC|GECHG|GERIT|GESCT)', query, re.IGNORECASE) else None,
'is_fqdn': bool(re.match(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+$', query)),
}
Results are sorted by relevance score.
"""
query = request.args.get('q', '').strip()
if not query or len(query) < 2:
return success_response({
'results': [],
'query': query,
'message': 'Search query must be at least 2 characters'
})
def _get_asset_result(asset, query, relevance=None):
"""Build a search result dict from an Asset object."""
asset_type_name = asset.assettype.assettype if asset.assettype else 'asset'
if len(query) > 200:
return success_response({
'results': [],
'query': query[:200],
'message': 'Search query too long'
})
plugin_id = asset.assetid
if asset_type_name == 'equipment' and hasattr(asset, 'equipment') and asset.equipment:
plugin_id = asset.equipment.equipmentid
elif asset_type_name == 'computer' and hasattr(asset, 'computer') and asset.computer:
plugin_id = asset.computer.computerid
elif asset_type_name == 'network_device' and hasattr(asset, 'network_device') and asset.network_device:
plugin_id = asset.network_device.networkdeviceid
elif asset_type_name == 'printer' and hasattr(asset, 'printer') and asset.printer:
plugin_id = asset.printer.printerid
url_map = {
'equipment': f"/machines/{plugin_id}",
'computer': f"/pcs/{plugin_id}",
'network_device': f"/network/{plugin_id}",
'printer': f"/printers/{plugin_id}",
}
url = url_map.get(asset_type_name, f"/assets/{asset.assetid}")
display_name = asset.display_name
subtitle = None
if asset.name and asset.assetnumber != asset.name:
subtitle = asset.assetnumber
location_name = asset.location.locationname if asset.location else None
if relevance is None:
relevance = 15
return {
'type': asset_type_name,
'id': plugin_id,
'title': display_name,
'subtitle': subtitle,
'location': location_name,
'url': url,
'relevance': relevance
}
def _search_applications(query, search_term):
"""Search Applications by name and description."""
results = []
search_term = f'%{query}%'
# NOTE: Legacy Machine search is disabled - all data is now in the Asset table
# The Asset search below handles equipment, computers, network devices, and printers
# with proper plugin-specific IDs for correct routing
# Search Applications
try:
apps = Application.query.filter(
Application.isactive == True,
@@ -76,10 +111,13 @@ def global_search():
'relevance': relevance
})
except Exception as e:
import logging
logging.error(f"Application search failed: {e}")
logger.error(f"Application search failed: {e}")
return results
# Search Knowledge Base
def _search_knowledgebase(query, search_term):
"""Search Knowledge Base by description and keywords."""
results = []
try:
kb_articles = KnowledgeBase.query.filter(
KnowledgeBase.isactive == True,
@@ -90,7 +128,6 @@ def global_search():
).limit(20).all()
for kb in kb_articles:
# Weight by clicks and keyword match
relevance = 10 + (kb.clicks or 0) * 0.1
if kb.keywords and query.lower() in kb.keywords.lower():
relevance += 15
@@ -105,13 +142,13 @@ def global_search():
'relevance': relevance
})
except Exception as e:
import logging
logging.error(f"KnowledgeBase search failed: {e}")
logger.error(f"KnowledgeBase search failed: {e}")
return results
# NOTE: Legacy Printer search removed - printers are now in the unified Asset table
# The Asset search below handles printers with correct plugin-specific IDs
# Search Employees (separate database)
def _search_employees(query, search_term):
"""Search Employees in separate wjf_employees database."""
results = []
try:
import pymysql
emp_conn = pymysql.connect(
@@ -140,7 +177,6 @@ def global_search():
full_name = f"{emp['First_Name'].strip()} {emp['Last_Name'].strip()}"
sso_str = str(emp['SSO'])
# Calculate relevance
relevance = 20
if query == sso_str:
relevance = 100
@@ -158,12 +194,18 @@ def global_search():
'relevance': relevance
})
except Exception as e:
import logging
logging.error(f"Employee search failed: {e}")
logger.error(f"Employee search failed: {e}")
return results
# Search unified Assets table
def _search_assets(query, search_term):
"""Search unified Assets table by number, name, serial, notes."""
results = []
try:
assets = Asset.query.join(AssetType).filter(
assets = Asset.query.join(AssetType).options(
joinedload(Asset.assettype),
joinedload(Asset.location),
).filter(
Asset.isactive == True,
db.or_(
Asset.assetnumber.ilike(search_term),
@@ -171,10 +213,9 @@ def global_search():
Asset.serialnumber.ilike(search_term),
Asset.notes.ilike(search_term)
)
).limit(10).all()
).limit(15).all()
for asset in assets:
# Calculate relevance
relevance = 15
if asset.assetnumber and query.lower() == asset.assetnumber.lower():
relevance = 100
@@ -185,48 +226,467 @@ def global_search():
elif asset.name and query.lower() in asset.name.lower():
relevance = 50
# Determine URL and type based on asset type
asset_type_name = asset.assettype.assettype if asset.assettype else 'asset'
results.append(_get_asset_result(asset, query, relevance))
except Exception as e:
logger.error(f"Asset search failed: {e}")
return results
# Get the plugin-specific ID for proper routing
plugin_id = asset.assetid # fallback
if asset_type_name == 'equipment' and hasattr(asset, 'equipment') and asset.equipment:
plugin_id = asset.equipment.equipmentid
elif asset_type_name == 'computer' and hasattr(asset, 'computer') and asset.computer:
plugin_id = asset.computer.computerid
elif asset_type_name == 'network_device' and hasattr(asset, 'network_device') and asset.network_device:
plugin_id = asset.network_device.networkdeviceid
elif asset_type_name == 'printer' and hasattr(asset, 'printer') and asset.printer:
plugin_id = asset.printer.printerid
url_map = {
'equipment': f"/machines/{plugin_id}",
'computer': f"/pcs/{plugin_id}",
'network_device': f"/network/{plugin_id}",
'printer': f"/printers/{plugin_id}",
}
url = url_map.get(asset_type_name, f"/assets/{asset.assetid}")
def _search_by_ip(query, search_term):
"""Search Communications table for IP address matches."""
results = []
try:
comms = Communication.query.filter(
Communication.ipaddress.ilike(search_term)
).options(
joinedload(Communication.asset).joinedload(Asset.assettype),
joinedload(Communication.asset).joinedload(Asset.location),
).limit(10).all()
display_name = asset.display_name
subtitle = None
if asset.name and asset.assetnumber != asset.name:
subtitle = asset.assetnumber
seen_assets = set()
for comm in comms:
asset = comm.asset
if not asset or not asset.isactive or asset.assetid in seen_assets:
continue
seen_assets.add(asset.assetid)
# Get location name
location_name = asset.location.locationname if asset.location else None
relevance = 80 if query == comm.ipaddress else 40
result = _get_asset_result(asset, query, relevance)
result['subtitle'] = comm.ipaddress
results.append(result)
except Exception as e:
logger.error(f"IP search failed: {e}")
return results
def _search_subnets(query):
"""Find which subnet an IP address belongs to."""
results = []
try:
from plugins.network.models import Subnet
ip_obj = ipaddress.ip_address(query)
subnets = Subnet.query.filter(Subnet.isactive == True).all()
for subnet in subnets:
try:
network = ipaddress.ip_network(subnet.cidr, strict=False)
if ip_obj in network:
results.append({
'type': 'subnet',
'id': subnet.subnetid,
'title': f'{subnet.name} ({subnet.cidr})',
'subtitle': subnet.description or subnet.subnettype,
'url': f'/network',
'relevance': 70
})
except ValueError:
continue
except ImportError:
pass
except Exception as e:
logger.error(f"Subnet search failed: {e}")
return results
def _search_hostnames(query, search_term):
"""Search hostname fields across Computer, Printer, NetworkDevice."""
results = []
# Search Computers
try:
from plugins.computers.models import Computer
computers = Computer.query.filter(
Computer.hostname.ilike(search_term)
).options(
joinedload(Computer.asset).joinedload(Asset.assettype),
joinedload(Computer.asset).joinedload(Asset.location),
).limit(10).all()
for comp in computers:
if comp.asset and comp.asset.isactive:
relevance = 85 if query.lower() == (comp.hostname or '').lower() else 40
result = _get_asset_result(comp.asset, query, relevance)
result['subtitle'] = comp.hostname
results.append(result)
except ImportError:
pass
except Exception as e:
logger.error(f"Computer hostname search failed: {e}")
# Search Printers
try:
from plugins.printers.models import Printer
printers = Printer.query.filter(
db.or_(
Printer.hostname.ilike(search_term),
Printer.sharename.ilike(search_term),
Printer.windowsname.ilike(search_term),
)
).options(
joinedload(Printer.asset).joinedload(Asset.assettype),
joinedload(Printer.asset).joinedload(Asset.location),
).limit(10).all()
for printer in printers:
if printer.asset and printer.asset.isactive:
match_field = printer.hostname or printer.sharename or ''
relevance = 85 if query.lower() == match_field.lower() else 40
result = _get_asset_result(printer.asset, query, relevance)
result['subtitle'] = printer.hostname or printer.sharename
results.append(result)
except ImportError:
pass
except Exception as e:
logger.error(f"Printer hostname search failed: {e}")
# Search Network Devices
try:
from plugins.network.models import NetworkDevice
devices = NetworkDevice.query.filter(
NetworkDevice.hostname.ilike(search_term)
).options(
joinedload(NetworkDevice.asset).joinedload(Asset.assettype),
joinedload(NetworkDevice.asset).joinedload(Asset.location),
).limit(10).all()
for device in devices:
if device.asset and device.asset.isactive:
relevance = 85 if query.lower() == (device.hostname or '').lower() else 40
result = _get_asset_result(device.asset, query, relevance)
result['subtitle'] = device.hostname
results.append(result)
except ImportError:
pass
except Exception as e:
logger.error(f"Network device hostname search failed: {e}")
return results
def _search_notifications(query, search_term):
"""Search notifications with time-weighted relevance."""
results = []
try:
from plugins.notifications.models import Notification
notifications = Notification.query.options(
joinedload(Notification.notificationtype)
).filter(
db.or_(
Notification.notification.ilike(search_term),
Notification.ticketnumber.ilike(search_term)
)
).order_by(Notification.starttime.desc()).limit(15).all()
now = datetime.utcnow()
for notif in notifications:
base_relevance = 20
if notif.ticketnumber and query.lower() == notif.ticketnumber.lower():
base_relevance = 85
# Time-weighted relevance
if notif.is_current:
base_relevance *= 3
elif notif.starttime and notif.starttime > now:
base_relevance *= 2
elif notif.endtime and (now - notif.endtime).days < 7:
base_relevance = int(base_relevance * 1.5)
results.append({
'type': asset_type_name,
'id': plugin_id,
'title': display_name,
'subtitle': subtitle,
'location': location_name,
'url': url,
'relevance': relevance
'type': 'notification',
'id': notif.notificationid,
'title': notif.title,
'subtitle': notif.notificationtype.typename if notif.notificationtype else None,
'url': f'/notifications',
'relevance': min(int(base_relevance), 100),
'ticketnumber': notif.ticketnumber,
'iscurrent': notif.is_current
})
except ImportError:
pass
except Exception as e:
import logging
logging.error(f"Asset search failed: {e}")
logger.error(f"Notification search failed: {e}")
return results
def _search_vendor_model_type(query, search_term):
"""Search assets by vendor name, model name, or equipment/device type name."""
results = []
# Equipment: vendor, model, equipmenttype
try:
from plugins.equipment.models import Equipment, EquipmentType
equipment_assets = db.session.query(Asset).join(
Equipment, Equipment.assetid == Asset.assetid
).outerjoin(
Vendor, Equipment.vendorid == Vendor.vendorid
).outerjoin(
Model, Equipment.modelnumberid == Model.modelnumberid
).outerjoin(
EquipmentType, Equipment.equipmenttypeid == EquipmentType.equipmenttypeid
).options(
joinedload(Asset.assettype),
joinedload(Asset.location),
).filter(
Asset.isactive == True,
db.or_(
Vendor.vendor.ilike(search_term),
Model.modelnumber.ilike(search_term),
EquipmentType.equipmenttype.ilike(search_term)
)
).limit(10).all()
for asset in equipment_assets:
results.append(_get_asset_result(asset, query, 30))
except ImportError:
pass
except Exception as e:
logger.error(f"Equipment vendor/model/type search failed: {e}")
# Printers: vendor, model, printertype
try:
from plugins.printers.models import Printer, PrinterType
printer_assets = db.session.query(Asset).join(
Printer, Printer.assetid == Asset.assetid
).outerjoin(
Vendor, Printer.vendorid == Vendor.vendorid
).outerjoin(
Model, Printer.modelnumberid == Model.modelnumberid
).outerjoin(
PrinterType, Printer.printertypeid == PrinterType.printertypeid
).options(
joinedload(Asset.assettype),
joinedload(Asset.location),
).filter(
Asset.isactive == True,
db.or_(
Vendor.vendor.ilike(search_term),
Model.modelnumber.ilike(search_term),
PrinterType.printertype.ilike(search_term)
)
).limit(10).all()
for asset in printer_assets:
results.append(_get_asset_result(asset, query, 30))
except ImportError:
pass
except Exception as e:
logger.error(f"Printer vendor/model/type search failed: {e}")
# Network Devices: vendor, networkdevicetype
try:
from plugins.network.models import NetworkDevice, NetworkDeviceType
netdev_assets = db.session.query(Asset).join(
NetworkDevice, NetworkDevice.assetid == Asset.assetid
).outerjoin(
Vendor, NetworkDevice.vendorid == Vendor.vendorid
).outerjoin(
NetworkDeviceType, NetworkDevice.networkdevicetypeid == NetworkDeviceType.networkdevicetypeid
).options(
joinedload(Asset.assettype),
joinedload(Asset.location),
).filter(
Asset.isactive == True,
db.or_(
Vendor.vendor.ilike(search_term),
NetworkDeviceType.networkdevicetype.ilike(search_term)
)
).limit(10).all()
for asset in netdev_assets:
results.append(_get_asset_result(asset, query, 30))
except ImportError:
pass
except Exception as e:
logger.error(f"Network device vendor/type search failed: {e}")
return results
def _check_smart_redirect(query, classification):
"""Check if query exactly matches a single entity for smart redirect."""
# Exact SSO match
if classification['is_sso']:
try:
import pymysql
emp_conn = pymysql.connect(
host='localhost',
user='root',
password='rootpassword',
database='wjf_employees',
cursorclass=pymysql.cursors.DictCursor
)
with emp_conn.cursor() as cur:
cur.execute(
'SELECT SSO, First_Name, Last_Name FROM employees WHERE SSO = %s LIMIT 1',
(query,)
)
emp = cur.fetchone()
emp_conn.close()
if emp:
name = f"{emp['First_Name'].strip()} {emp['Last_Name'].strip()}"
return {
'type': 'employee',
'url': f"/employees/{emp['SSO']}",
'label': name
}
except Exception:
pass
# Exact asset number match
try:
asset = Asset.query.options(
joinedload(Asset.assettype),
).filter(
Asset.assetnumber == query,
Asset.isactive == True
).first()
if asset:
result = _get_asset_result(asset, query)
return {
'type': result['type'],
'url': result['url'],
'label': asset.display_name
}
except Exception:
pass
# Exact printer CSF/share name
try:
from plugins.printers.models import Printer
printer = Printer.query.options(
joinedload(Printer.asset).joinedload(Asset.assettype),
).filter(
db.or_(
Printer.sharename == query,
Printer.windowsname == query
)
).first()
if printer and printer.asset and printer.asset.isactive:
return {
'type': 'printer',
'url': f"/printers/{printer.printerid}",
'label': printer.sharename or printer.asset.display_name
}
except ImportError:
pass
except Exception:
pass
# Exact hostname match (FQDN or bare hostname)
hostname_plugins = []
try:
from plugins.computers.models import Computer
hostname_plugins.append(('computer', Computer, 'computerid', '/pcs'))
except ImportError:
pass
try:
from plugins.printers.models import Printer
hostname_plugins.append(('printer', Printer, 'printerid', '/printers'))
except ImportError:
pass
try:
from plugins.network.models import NetworkDevice
hostname_plugins.append(('network_device', NetworkDevice, 'networkdeviceid', '/network'))
except ImportError:
pass
for type_name, PluginModel, id_field, url_prefix in hostname_plugins:
try:
device = PluginModel.query.options(
joinedload(PluginModel.asset)
).filter(
PluginModel.hostname == query
).first()
if device and device.asset and device.asset.isactive:
return {
'type': type_name,
'url': f"{url_prefix}/{getattr(device, id_field)}",
'label': device.hostname
}
except Exception:
pass
# Exact IP match
if classification['is_ip']:
try:
comm = Communication.query.options(
joinedload(Communication.asset).joinedload(Asset.assettype)
).filter(
Communication.ipaddress == query
).first()
if comm and comm.asset and comm.asset.isactive:
result = _get_asset_result(comm.asset, query)
return {
'type': result['type'],
'url': result['url'],
'label': f"{comm.asset.display_name} ({comm.ipaddress})"
}
except Exception:
pass
return None
@search_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def global_search():
"""
Global search across multiple entity types.
Returns combined results from assets, applications, knowledge base,
employees, notifications, IP addresses, hostnames, and vendor/model/type.
Supports smart redirects and ServiceNOW ticket detection.
"""
query = request.args.get('q', '').strip()
if not query or len(query) < 2:
return success_response({
'results': [],
'query': query,
'message': 'Search query must be at least 2 characters'
})
if len(query) > 200:
return success_response({
'results': [],
'query': query[:200],
'message': 'Search query too long'
})
classification = _classify_query(query)
# ServiceNOW prefix detection - return redirect immediately
if classification['is_servicenow']:
from urllib.parse import quote
servicenow_url = SERVICENOW_URL.format(ticket=quote(query))
return success_response({
'results': [],
'query': query,
'total': 0,
'counts': {},
'redirect': {
'type': 'servicenow',
'url': servicenow_url,
'label': f'Open {query} in ServiceNOW'
}
})
results = []
search_term = f'%{query}%'
# Run all search domains
results.extend(_search_applications(query, search_term))
results.extend(_search_knowledgebase(query, search_term))
results.extend(_search_employees(query, search_term))
results.extend(_search_assets(query, search_term))
results.extend(_search_notifications(query, search_term))
results.extend(_search_hostnames(query, search_term))
results.extend(_search_vendor_model_type(query, search_term))
# IP-specific searches
if classification['is_ip']:
results.extend(_search_by_ip(query, search_term))
results.extend(_search_subnets(query))
# Sort by relevance (highest first)
results.sort(key=lambda x: x['relevance'], reverse=True)
@@ -240,11 +700,28 @@ def global_search():
seen_ids[key] = True
unique_results.append(r)
# Limit total results
unique_results = unique_results[:30]
# Compute type counts before truncation
type_counts = {}
for r in unique_results:
t = r['type']
type_counts[t] = type_counts.get(t, 0) + 1
return success_response({
total_all = len(unique_results)
# Limit total results
unique_results = unique_results[:50]
# Check for smart redirect
response_data = {
'results': unique_results,
'query': query,
'total': len(unique_results)
})
'total': len(unique_results),
'total_all': total_all,
'counts': type_counts,
}
redirect = _check_smart_redirect(query, classification)
if redirect:
response_data['redirect'] = redirect
return success_response(response_data)