Add USB, Notifications, Network plugins and reusable EmployeeSearch component

New Plugins:
- USB plugin: Device checkout/checkin with employee lookup, checkout history
- Notifications plugin: Announcements with types, scheduling, shopfloor display
- Network plugin: Network device management with subnets and VLANs
- Equipment and Computers plugins: Asset type separation

Frontend:
- EmployeeSearch component: Reusable employee lookup with autocomplete
- USB views: List, detail, checkout/checkin modals
- Notifications views: List, form with recognition mode
- Network views: Device list, detail, form
- Calendar view with FullCalendar integration
- Shopfloor and TV dashboard views
- Reports index page
- Map editor for asset positioning
- Light/dark mode fixes for map tooltips

Backend:
- Employee search API with external lookup service
- Collector API for PowerShell data collection
- Reports API endpoints
- Slides API for TV dashboard
- Fixed AppVersion model (removed BaseModel inheritance)
- Added checkout_name column to usbcheckouts table

Styling:
- Unified detail page styles
- Improved pagination (page numbers instead of prev/next)
- Dark/light mode theme improvements

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
cproudlock
2026-01-21 16:37:49 -05:00
parent 02d83335ee
commit 9c220a4194
110 changed files with 17693 additions and 600 deletions

View File

@@ -0,0 +1,5 @@
"""Computers plugin for ShopDB."""
from .plugin import ComputersPlugin
__all__ = ['ComputersPlugin']

View File

@@ -0,0 +1,5 @@
"""Computers plugin API."""
from .routes import computers_bp
__all__ = ['computers_bp']

View File

@@ -0,0 +1,628 @@
"""Computers plugin API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, OperatingSystem, Application, AppVersion
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import Computer, ComputerType, ComputerInstalledApp
computers_bp = Blueprint('computers', __name__)
# =============================================================================
# Computer Types
# =============================================================================
@computers_bp.route('/types', methods=['GET'])
@jwt_required()
def list_computer_types():
"""List all computer types."""
page, per_page = get_pagination_params(request)
query = ComputerType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(ComputerType.isactive == True)
if search := request.args.get('search'):
query = query.filter(ComputerType.computertype.ilike(f'%{search}%'))
query = query.order_by(ComputerType.computertype)
items, total = paginate_query(query, page, per_page)
data = [t.to_dict() for t in items]
return paginated_response(data, page, per_page, total)
@computers_bp.route('/types/<int:type_id>', methods=['GET'])
@jwt_required()
def get_computer_type(type_id: int):
"""Get a single computer type."""
t = ComputerType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer type with ID {type_id} not found',
http_code=404
)
return success_response(t.to_dict())
@computers_bp.route('/types', methods=['POST'])
@jwt_required()
def create_computer_type():
"""Create a new computer type."""
data = request.get_json()
if not data or not data.get('computertype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'computertype is required')
if ComputerType.query.filter_by(computertype=data['computertype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Computer type '{data['computertype']}' already exists",
http_code=409
)
t = ComputerType(
computertype=data['computertype'],
description=data.get('description'),
icon=data.get('icon')
)
db.session.add(t)
db.session.commit()
return success_response(t.to_dict(), message='Computer type created', http_code=201)
@computers_bp.route('/types/<int:type_id>', methods=['PUT'])
@jwt_required()
def update_computer_type(type_id: int):
"""Update a computer type."""
t = ComputerType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer type with ID {type_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'computertype' in data and data['computertype'] != t.computertype:
if ComputerType.query.filter_by(computertype=data['computertype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Computer type '{data['computertype']}' already exists",
http_code=409
)
for key in ['computertype', 'description', 'icon', 'isactive']:
if key in data:
setattr(t, key, data[key])
db.session.commit()
return success_response(t.to_dict(), message='Computer type updated')
# =============================================================================
# Computers CRUD
# =============================================================================
@computers_bp.route('', methods=['GET'])
@jwt_required()
def list_computers():
"""
List all computers with filtering and pagination.
Query parameters:
- page, per_page: Pagination
- active: Filter by active status
- search: Search by asset number, name, or hostname
- type_id: Filter by computer type ID
- os_id: Filter by operating system ID
- location_id: Filter by location ID
- businessunit_id: Filter by business unit ID
- shopfloor: Filter by shopfloor flag (true/false)
"""
page, per_page = get_pagination_params(request)
# Join Computer with Asset
query = db.session.query(Computer).join(Asset)
# Active filter
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Asset.isactive == True)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
Asset.assetnumber.ilike(f'%{search}%'),
Asset.name.ilike(f'%{search}%'),
Asset.serialnumber.ilike(f'%{search}%'),
Computer.hostname.ilike(f'%{search}%')
)
)
# Computer type filter
if type_id := request.args.get('type_id'):
query = query.filter(Computer.computertypeid == int(type_id))
# OS filter
if os_id := request.args.get('os_id'):
query = query.filter(Computer.osid == int(os_id))
# Location filter
if location_id := request.args.get('location_id'):
query = query.filter(Asset.locationid == int(location_id))
# Business unit filter
if bu_id := request.args.get('businessunit_id'):
query = query.filter(Asset.businessunitid == int(bu_id))
# Shopfloor filter
if shopfloor := request.args.get('shopfloor'):
query = query.filter(Computer.isshopfloor == (shopfloor.lower() == 'true'))
# Sorting
sort_by = request.args.get('sort', 'hostname')
sort_dir = request.args.get('dir', 'asc')
if sort_by == 'hostname':
col = Computer.hostname
elif sort_by == 'assetnumber':
col = Asset.assetnumber
elif sort_by == 'name':
col = Asset.name
elif sort_by == 'lastreporteddate':
col = Computer.lastreporteddate
else:
col = Computer.hostname
query = query.order_by(col.desc() if sort_dir == 'desc' else col)
items, total = paginate_query(query, page, per_page)
# Build response with both asset and computer data
data = []
for comp in items:
item = comp.asset.to_dict() if comp.asset else {}
item['computer'] = comp.to_dict()
data.append(item)
return paginated_response(data, page, per_page, total)
@computers_bp.route('/<int:computer_id>', methods=['GET'])
@jwt_required()
def get_computer(computer_id: int):
"""Get a single computer with full details."""
comp = Computer.query.get(computer_id)
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with ID {computer_id} not found',
http_code=404
)
result = comp.asset.to_dict() if comp.asset else {}
result['computer'] = comp.to_dict()
return success_response(result)
@computers_bp.route('/by-asset/<int:asset_id>', methods=['GET'])
@jwt_required()
def get_computer_by_asset(asset_id: int):
"""Get computer data by asset ID."""
comp = Computer.query.filter_by(assetid=asset_id).first()
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer for asset {asset_id} not found',
http_code=404
)
result = comp.asset.to_dict() if comp.asset else {}
result['computer'] = comp.to_dict()
return success_response(result)
@computers_bp.route('/by-hostname/<hostname>', methods=['GET'])
@jwt_required()
def get_computer_by_hostname(hostname: str):
"""Get computer by hostname."""
comp = Computer.query.filter_by(hostname=hostname).first()
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with hostname {hostname} not found',
http_code=404
)
result = comp.asset.to_dict() if comp.asset else {}
result['computer'] = comp.to_dict()
return success_response(result)
@computers_bp.route('', methods=['POST'])
@jwt_required()
def create_computer():
"""
Create new computer (creates both Asset and Computer records).
Required fields:
- assetnumber: Business identifier
Optional fields:
- name, serialnumber, statusid, locationid, businessunitid
- computertypeid, hostname, osid
- isvnc, iswinrm, isshopfloor
- mapleft, maptop, notes
"""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('assetnumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'assetnumber is required')
# Check for duplicate assetnumber
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Check for duplicate hostname
if data.get('hostname'):
if Computer.query.filter_by(hostname=data['hostname']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Computer with hostname '{data['hostname']}' already exists",
http_code=409
)
# Get computer asset type
computer_type = AssetType.query.filter_by(assettype='computer').first()
if not computer_type:
return error_response(
ErrorCodes.INTERNAL_ERROR,
'Computer asset type not found. Plugin may not be properly installed.',
http_code=500
)
# Create the core asset
asset = Asset(
assetnumber=data['assetnumber'],
name=data.get('name'),
serialnumber=data.get('serialnumber'),
assettypeid=computer_type.assettypeid,
statusid=data.get('statusid', 1),
locationid=data.get('locationid'),
businessunitid=data.get('businessunitid'),
mapleft=data.get('mapleft'),
maptop=data.get('maptop'),
notes=data.get('notes')
)
db.session.add(asset)
db.session.flush() # Get the assetid
# Create the computer extension
comp = Computer(
assetid=asset.assetid,
computertypeid=data.get('computertypeid'),
hostname=data.get('hostname'),
osid=data.get('osid'),
loggedinuser=data.get('loggedinuser'),
lastreporteddate=data.get('lastreporteddate'),
lastboottime=data.get('lastboottime'),
isvnc=data.get('isvnc', False),
iswinrm=data.get('iswinrm', False),
isshopfloor=data.get('isshopfloor', False)
)
db.session.add(comp)
db.session.commit()
result = asset.to_dict()
result['computer'] = comp.to_dict()
return success_response(result, message='Computer created', http_code=201)
@computers_bp.route('/<int:computer_id>', methods=['PUT'])
@jwt_required()
def update_computer(computer_id: int):
"""Update computer (both Asset and Computer records)."""
comp = Computer.query.get(computer_id)
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with ID {computer_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
asset = comp.asset
# Check for conflicting assetnumber
if 'assetnumber' in data and data['assetnumber'] != asset.assetnumber:
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Check for conflicting hostname
if 'hostname' in data and data['hostname'] != comp.hostname:
existing = Computer.query.filter_by(hostname=data['hostname']).first()
if existing and existing.computerid != computer_id:
return error_response(
ErrorCodes.CONFLICT,
f"Computer with hostname '{data['hostname']}' already exists",
http_code=409
)
# Update asset fields
asset_fields = ['assetnumber', 'name', 'serialnumber', 'statusid',
'locationid', 'businessunitid', 'mapleft', 'maptop', 'notes', 'isactive']
for key in asset_fields:
if key in data:
setattr(asset, key, data[key])
# Update computer fields
computer_fields = ['computertypeid', 'hostname', 'osid', 'loggedinuser',
'lastreporteddate', 'lastboottime', 'isvnc', 'iswinrm', 'isshopfloor']
for key in computer_fields:
if key in data:
setattr(comp, key, data[key])
db.session.commit()
result = asset.to_dict()
result['computer'] = comp.to_dict()
return success_response(result, message='Computer updated')
@computers_bp.route('/<int:computer_id>', methods=['DELETE'])
@jwt_required()
def delete_computer(computer_id: int):
"""Delete (soft delete) computer."""
comp = Computer.query.get(computer_id)
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with ID {computer_id} not found',
http_code=404
)
# Soft delete the asset
comp.asset.isactive = False
db.session.commit()
return success_response(message='Computer deleted')
# =============================================================================
# Installed Applications
# =============================================================================
@computers_bp.route('/<int:computer_id>/apps', methods=['GET'])
@jwt_required()
def get_installed_apps(computer_id: int):
"""Get all installed applications for a computer."""
comp = Computer.query.get(computer_id)
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with ID {computer_id} not found',
http_code=404
)
apps = ComputerInstalledApp.query.filter_by(
computerid=computer_id,
isactive=True
).all()
data = [app.to_dict() for app in apps]
return success_response(data)
@computers_bp.route('/<int:computer_id>/apps', methods=['POST'])
@jwt_required()
def add_installed_app(computer_id: int):
"""Add an installed application to a computer."""
comp = Computer.query.get(computer_id)
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with ID {computer_id} not found',
http_code=404
)
data = request.get_json()
if not data or not data.get('appid'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'appid is required')
appid = data['appid']
# Validate app exists
if not Application.query.get(appid):
return error_response(ErrorCodes.NOT_FOUND, f'Application {appid} not found', http_code=404)
# Check for duplicate
existing = ComputerInstalledApp.query.filter_by(
computerid=computer_id,
appid=appid
).first()
if existing:
if existing.isactive:
return error_response(
ErrorCodes.CONFLICT,
'This application is already installed on this computer',
http_code=409
)
else:
# Reactivate
existing.isactive = True
existing.appversionid = data.get('appversionid')
db.session.commit()
return success_response(existing.to_dict(), message='Application reinstalled')
# Create new installation record
installed = ComputerInstalledApp(
computerid=computer_id,
appid=appid,
appversionid=data.get('appversionid')
)
db.session.add(installed)
db.session.commit()
return success_response(installed.to_dict(), message='Application installed', http_code=201)
@computers_bp.route('/<int:computer_id>/apps/<int:app_id>', methods=['DELETE'])
@jwt_required()
def remove_installed_app(computer_id: int, app_id: int):
"""Remove an installed application from a computer."""
installed = ComputerInstalledApp.query.filter_by(
computerid=computer_id,
appid=app_id,
isactive=True
).first()
if not installed:
return error_response(
ErrorCodes.NOT_FOUND,
'Installation record not found',
http_code=404
)
installed.isactive = False
db.session.commit()
return success_response(message='Application uninstalled')
# =============================================================================
# Status Reporting
# =============================================================================
@computers_bp.route('/<int:computer_id>/report', methods=['POST'])
@jwt_required(optional=True)
def report_status(computer_id: int):
"""
Report computer status (for agent-based reporting).
This endpoint can be called periodically by a client agent
to update status information.
"""
comp = Computer.query.get(computer_id)
if not comp:
return error_response(
ErrorCodes.NOT_FOUND,
f'Computer with ID {computer_id} not found',
http_code=404
)
data = request.get_json() or {}
# Update status fields
from datetime import datetime
comp.lastreporteddate = datetime.utcnow()
if 'loggedinuser' in data:
comp.loggedinuser = data['loggedinuser']
if 'lastboottime' in data:
comp.lastboottime = data['lastboottime']
db.session.commit()
return success_response(message='Status reported')
# =============================================================================
# Dashboard
# =============================================================================
@computers_bp.route('/dashboard/summary', methods=['GET'])
@jwt_required()
def dashboard_summary():
"""Get computer dashboard summary data."""
# Total active computers
total = db.session.query(Computer).join(Asset).filter(
Asset.isactive == True
).count()
# Count by computer type
by_type = db.session.query(
ComputerType.computertype,
db.func.count(Computer.computerid)
).join(Computer, Computer.computertypeid == ComputerType.computertypeid
).join(Asset, Asset.assetid == Computer.assetid
).filter(Asset.isactive == True
).group_by(ComputerType.computertype
).all()
# Count by OS
by_os = db.session.query(
OperatingSystem.osname,
db.func.count(Computer.computerid)
).join(Computer, Computer.osid == OperatingSystem.osid
).join(Asset, Asset.assetid == Computer.assetid
).filter(Asset.isactive == True
).group_by(OperatingSystem.osname
).all()
# Count shopfloor vs non-shopfloor
shopfloor_count = db.session.query(Computer).join(Asset).filter(
Asset.isactive == True,
Computer.isshopfloor == True
).count()
return success_response({
'total': total,
'by_type': [{'type': t, 'count': c} for t, c in by_type],
'by_os': [{'os': o, 'count': c} for o, c in by_os],
'shopfloor': shopfloor_count,
'non_shopfloor': total - shopfloor_count
})

View File

@@ -0,0 +1,23 @@
{
"name": "computers",
"version": "1.0.0",
"description": "Computer management plugin for PCs, servers, and workstations with software tracking",
"author": "ShopDB Team",
"dependencies": [],
"core_version": ">=1.0.0",
"api_prefix": "/api/computers",
"provides": {
"asset_type": "computer",
"features": [
"computer_tracking",
"software_inventory",
"remote_access",
"os_management"
]
},
"settings": {
"enable_winrm": true,
"enable_vnc": true,
"auto_report_interval_hours": 24
}
}

View File

@@ -0,0 +1,9 @@
"""Computers plugin models."""
from .computer import Computer, ComputerType, ComputerInstalledApp
__all__ = [
'Computer',
'ComputerType',
'ComputerInstalledApp',
]

View File

@@ -0,0 +1,184 @@
"""Computer plugin models."""
from shopdb.extensions import db
from shopdb.core.models.base import BaseModel
class ComputerType(BaseModel):
"""
Computer type classification.
Examples: Shopfloor PC, Engineer Workstation, CMM PC, Server, etc.
"""
__tablename__ = 'computertypes'
computertypeid = db.Column(db.Integer, primary_key=True)
computertype = db.Column(db.String(100), unique=True, nullable=False)
description = db.Column(db.Text)
icon = db.Column(db.String(50), comment='Icon name for UI')
def __repr__(self):
return f"<ComputerType {self.computertype}>"
class Computer(BaseModel):
"""
Computer-specific extension data.
Links to core Asset table via assetid.
Stores computer-specific fields like hostname, OS, logged in user, etc.
"""
__tablename__ = 'computers'
computerid = db.Column(db.Integer, primary_key=True)
# Link to core asset
assetid = db.Column(
db.Integer,
db.ForeignKey('assets.assetid', ondelete='CASCADE'),
unique=True,
nullable=False,
index=True
)
# Computer classification
computertypeid = db.Column(
db.Integer,
db.ForeignKey('computertypes.computertypeid'),
nullable=True
)
# Network identity
hostname = db.Column(
db.String(100),
index=True,
comment='Network hostname'
)
# Operating system
osid = db.Column(
db.Integer,
db.ForeignKey('operatingsystems.osid'),
nullable=True
)
# Status tracking
loggedinuser = db.Column(db.String(100), nullable=True)
lastreporteddate = db.Column(db.DateTime, nullable=True)
lastboottime = db.Column(db.DateTime, nullable=True)
# Remote access features
isvnc = db.Column(
db.Boolean,
default=False,
comment='VNC remote access enabled'
)
iswinrm = db.Column(
db.Boolean,
default=False,
comment='WinRM enabled'
)
# Classification flags
isshopfloor = db.Column(
db.Boolean,
default=False,
comment='Shopfloor PC (vs office PC)'
)
# Relationships
asset = db.relationship(
'Asset',
backref=db.backref('computer', uselist=False, lazy='joined')
)
computertype = db.relationship('ComputerType', backref='computers')
operatingsystem = db.relationship('OperatingSystem', backref='computers')
# Installed applications (one-to-many)
installedapps = db.relationship(
'ComputerInstalledApp',
back_populates='computer',
cascade='all, delete-orphan',
lazy='dynamic'
)
__table_args__ = (
db.Index('idx_computer_type', 'computertypeid'),
db.Index('idx_computer_hostname', 'hostname'),
db.Index('idx_computer_os', 'osid'),
)
def __repr__(self):
return f"<Computer {self.hostname or self.assetid}>"
def to_dict(self):
"""Convert to dictionary with related names."""
result = super().to_dict()
# Add related object names
if self.computertype:
result['computertype_name'] = self.computertype.computertype
if self.operatingsystem:
result['os_name'] = self.operatingsystem.osname
return result
class ComputerInstalledApp(db.Model):
"""
Junction table for applications installed on computers.
Tracks which applications are installed on which computers,
including version information.
"""
__tablename__ = 'computerinstalledapps'
id = db.Column(db.Integer, primary_key=True)
computerid = db.Column(
db.Integer,
db.ForeignKey('computers.computerid', ondelete='CASCADE'),
nullable=False
)
appid = db.Column(
db.Integer,
db.ForeignKey('applications.appid'),
nullable=False
)
appversionid = db.Column(
db.Integer,
db.ForeignKey('appversions.appversionid'),
nullable=True
)
isactive = db.Column(db.Boolean, default=True, nullable=False)
installeddate = db.Column(db.DateTime, default=db.func.now())
# Relationships
computer = db.relationship('Computer', back_populates='installedapps')
application = db.relationship('Application')
appversion = db.relationship('AppVersion')
__table_args__ = (
db.UniqueConstraint('computerid', 'appid', name='uq_computer_app'),
db.Index('idx_compapp_computer', 'computerid'),
db.Index('idx_compapp_app', 'appid'),
)
def to_dict(self):
"""Convert to dictionary."""
return {
'id': self.id,
'computerid': self.computerid,
'appid': self.appid,
'appversionid': self.appversionid,
'isactive': self.isactive,
'installeddate': self.installeddate.isoformat() + 'Z' if self.installeddate else None,
'application': {
'appid': self.application.appid,
'appname': self.application.appname,
'appdescription': self.application.appdescription,
} if self.application else None,
'version': self.appversion.version if self.appversion else None
}
def __repr__(self):
return f"<ComputerInstalledApp computer={self.computerid} app={self.appid}>"

209
plugins/computers/plugin.py Normal file
View File

@@ -0,0 +1,209 @@
"""Computers plugin main class."""
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Type
from flask import Flask, Blueprint
import click
from shopdb.plugins.base import BasePlugin, PluginMeta
from shopdb.extensions import db
from shopdb.core.models import AssetType, AssetStatus
from .models import Computer, ComputerType, ComputerInstalledApp
from .api import computers_bp
logger = logging.getLogger(__name__)
class ComputersPlugin(BasePlugin):
"""
Computers plugin - manages PC, server, and workstation assets.
Computers include shopfloor PCs, engineer workstations, servers, etc.
Uses the new Asset architecture with Computer extension table.
"""
def __init__(self):
self._manifest = self._load_manifest()
def _load_manifest(self) -> Dict:
"""Load plugin manifest from JSON file."""
manifestpath = Path(__file__).parent / 'manifest.json'
if manifestpath.exists():
with open(manifestpath, 'r') as f:
return json.load(f)
return {}
@property
def meta(self) -> PluginMeta:
"""Return plugin metadata."""
return PluginMeta(
name=self._manifest.get('name', 'computers'),
version=self._manifest.get('version', '1.0.0'),
description=self._manifest.get(
'description',
'Computer management for PCs, servers, and workstations'
),
author=self._manifest.get('author', 'ShopDB Team'),
dependencies=self._manifest.get('dependencies', []),
core_version=self._manifest.get('core_version', '>=1.0.0'),
api_prefix=self._manifest.get('api_prefix', '/api/computers'),
)
def get_blueprint(self) -> Optional[Blueprint]:
"""Return Flask Blueprint with API routes."""
return computers_bp
def get_models(self) -> List[Type]:
"""Return list of SQLAlchemy model classes."""
return [Computer, ComputerType, ComputerInstalledApp]
def init_app(self, app: Flask, db_instance) -> None:
"""Initialize plugin with Flask app."""
logger.info(f"Computers plugin initialized (v{self.meta.version})")
def on_install(self, app: Flask) -> None:
"""Called when plugin is installed."""
with app.app_context():
self._ensure_asset_type()
self._ensure_computer_types()
logger.info("Computers plugin installed")
def _ensure_asset_type(self) -> None:
"""Ensure computer asset type exists."""
existing = AssetType.query.filter_by(assettype='computer').first()
if not existing:
at = AssetType(
assettype='computer',
plugin_name='computers',
table_name='computers',
description='PCs, servers, and workstations',
icon='desktop'
)
db.session.add(at)
logger.debug("Created asset type: computer")
db.session.commit()
def _ensure_computer_types(self) -> None:
"""Ensure basic computer types exist."""
computer_types = [
('Shopfloor PC', 'PC located on the shop floor for machine operation', 'desktop'),
('Engineer Workstation', 'Engineering workstation for CAD/CAM work', 'laptop'),
('CMM PC', 'PC dedicated to CMM operation', 'desktop'),
('Server', 'Server system', 'server'),
('Kiosk', 'Kiosk or info display PC', 'tv'),
('Laptop', 'Laptop computer', 'laptop'),
('Virtual Machine', 'Virtual machine', 'cloud'),
('Other', 'Other computer type', 'desktop'),
]
for name, description, icon in computer_types:
existing = ComputerType.query.filter_by(computertype=name).first()
if not existing:
ct = ComputerType(
computertype=name,
description=description,
icon=icon
)
db.session.add(ct)
logger.debug(f"Created computer type: {name}")
db.session.commit()
def on_uninstall(self, app: Flask) -> None:
"""Called when plugin is uninstalled."""
logger.info("Computers plugin uninstalled")
def get_cli_commands(self) -> List:
"""Return CLI commands for this plugin."""
@click.group('computers')
def computerscli():
"""Computers plugin commands."""
pass
@computerscli.command('list-types')
def list_types():
"""List all computer types."""
from flask import current_app
with current_app.app_context():
types = ComputerType.query.filter_by(isactive=True).all()
if not types:
click.echo('No computer types found.')
return
click.echo('Computer Types:')
for t in types:
click.echo(f" [{t.computertypeid}] {t.computertype}")
@computerscli.command('stats')
def stats():
"""Show computer statistics."""
from flask import current_app
from shopdb.core.models import Asset
with current_app.app_context():
total = db.session.query(Computer).join(Asset).filter(
Asset.isactive == True
).count()
click.echo(f"Total active computers: {total}")
# Shopfloor count
shopfloor = db.session.query(Computer).join(Asset).filter(
Asset.isactive == True,
Computer.isshopfloor == True
).count()
click.echo(f" Shopfloor PCs: {shopfloor}")
click.echo(f" Other: {total - shopfloor}")
@computerscli.command('find')
@click.argument('hostname')
def find_by_hostname(hostname):
"""Find a computer by hostname."""
from flask import current_app
with current_app.app_context():
comp = Computer.query.filter(
Computer.hostname.ilike(f'%{hostname}%')
).first()
if not comp:
click.echo(f'No computer found matching hostname: {hostname}')
return
click.echo(f'Found: {comp.hostname}')
click.echo(f' Asset: {comp.asset.assetnumber}')
click.echo(f' Type: {comp.computertype.computertype if comp.computertype else "N/A"}')
click.echo(f' OS: {comp.operatingsystem.osname if comp.operatingsystem else "N/A"}')
click.echo(f' Logged in: {comp.loggedinuser or "N/A"}')
return [computerscli]
def get_dashboard_widgets(self) -> List[Dict]:
"""Return dashboard widget definitions."""
return [
{
'name': 'Computer Status',
'component': 'ComputerStatusWidget',
'endpoint': '/api/computers/dashboard/summary',
'size': 'medium',
'position': 6,
},
]
def get_navigation_items(self) -> List[Dict]:
"""Return navigation menu items."""
return [
{
'name': 'Computers',
'icon': 'desktop',
'route': '/computers',
'position': 15,
},
]

View File

@@ -0,0 +1,5 @@
"""Equipment plugin for ShopDB."""
from .plugin import EquipmentPlugin
__all__ = ['EquipmentPlugin']

View File

@@ -0,0 +1,5 @@
"""Equipment plugin API."""
from .routes import equipment_bp
__all__ = ['equipment_bp']

View File

@@ -0,0 +1,429 @@
"""Equipment plugin API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, Vendor, Model
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import Equipment, EquipmentType
equipment_bp = Blueprint('equipment', __name__)
# =============================================================================
# Equipment Types
# =============================================================================
@equipment_bp.route('/types', methods=['GET'])
@jwt_required()
def list_equipment_types():
"""List all equipment types."""
page, per_page = get_pagination_params(request)
query = EquipmentType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(EquipmentType.isactive == True)
if search := request.args.get('search'):
query = query.filter(EquipmentType.equipmenttype.ilike(f'%{search}%'))
query = query.order_by(EquipmentType.equipmenttype)
items, total = paginate_query(query, page, per_page)
data = [t.to_dict() for t in items]
return paginated_response(data, page, per_page, total)
@equipment_bp.route('/types/<int:type_id>', methods=['GET'])
@jwt_required()
def get_equipment_type(type_id: int):
"""Get a single equipment type."""
t = EquipmentType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Equipment type with ID {type_id} not found',
http_code=404
)
return success_response(t.to_dict())
@equipment_bp.route('/types', methods=['POST'])
@jwt_required()
def create_equipment_type():
"""Create a new equipment type."""
data = request.get_json()
if not data or not data.get('equipmenttype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'equipmenttype is required')
if EquipmentType.query.filter_by(equipmenttype=data['equipmenttype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Equipment type '{data['equipmenttype']}' already exists",
http_code=409
)
t = EquipmentType(
equipmenttype=data['equipmenttype'],
description=data.get('description'),
icon=data.get('icon')
)
db.session.add(t)
db.session.commit()
return success_response(t.to_dict(), message='Equipment type created', http_code=201)
@equipment_bp.route('/types/<int:type_id>', methods=['PUT'])
@jwt_required()
def update_equipment_type(type_id: int):
"""Update an equipment type."""
t = EquipmentType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Equipment type with ID {type_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'equipmenttype' in data and data['equipmenttype'] != t.equipmenttype:
if EquipmentType.query.filter_by(equipmenttype=data['equipmenttype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Equipment type '{data['equipmenttype']}' already exists",
http_code=409
)
for key in ['equipmenttype', 'description', 'icon', 'isactive']:
if key in data:
setattr(t, key, data[key])
db.session.commit()
return success_response(t.to_dict(), message='Equipment type updated')
# =============================================================================
# Equipment CRUD
# =============================================================================
@equipment_bp.route('', methods=['GET'])
@jwt_required()
def list_equipment():
"""
List all equipment with filtering and pagination.
Query parameters:
- page, per_page: Pagination
- active: Filter by active status
- search: Search by asset number or name
- type_id: Filter by equipment type ID
- vendor_id: Filter by vendor ID
- location_id: Filter by location ID
- businessunit_id: Filter by business unit ID
"""
page, per_page = get_pagination_params(request)
# Join Equipment with Asset
query = db.session.query(Equipment).join(Asset)
# Active filter
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Asset.isactive == True)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
Asset.assetnumber.ilike(f'%{search}%'),
Asset.name.ilike(f'%{search}%'),
Asset.serialnumber.ilike(f'%{search}%')
)
)
# Equipment type filter
if type_id := request.args.get('type_id'):
query = query.filter(Equipment.equipmenttypeid == int(type_id))
# Vendor filter
if vendor_id := request.args.get('vendor_id'):
query = query.filter(Equipment.vendorid == int(vendor_id))
# Location filter
if location_id := request.args.get('location_id'):
query = query.filter(Asset.locationid == int(location_id))
# Business unit filter
if bu_id := request.args.get('businessunit_id'):
query = query.filter(Asset.businessunitid == int(bu_id))
# Sorting
sort_by = request.args.get('sort', 'assetnumber')
sort_dir = request.args.get('dir', 'asc')
if sort_by == 'assetnumber':
col = Asset.assetnumber
elif sort_by == 'name':
col = Asset.name
else:
col = Asset.assetnumber
query = query.order_by(col.desc() if sort_dir == 'desc' else col)
items, total = paginate_query(query, page, per_page)
# Build response with both asset and equipment data
data = []
for equip in items:
item = equip.asset.to_dict() if equip.asset else {}
item['equipment'] = equip.to_dict()
data.append(item)
return paginated_response(data, page, per_page, total)
@equipment_bp.route('/<int:equipment_id>', methods=['GET'])
@jwt_required()
def get_equipment(equipment_id: int):
"""Get a single equipment item with full details."""
equip = Equipment.query.get(equipment_id)
if not equip:
return error_response(
ErrorCodes.NOT_FOUND,
f'Equipment with ID {equipment_id} not found',
http_code=404
)
result = equip.asset.to_dict() if equip.asset else {}
result['equipment'] = equip.to_dict()
return success_response(result)
@equipment_bp.route('/by-asset/<int:asset_id>', methods=['GET'])
@jwt_required()
def get_equipment_by_asset(asset_id: int):
"""Get equipment data by asset ID."""
equip = Equipment.query.filter_by(assetid=asset_id).first()
if not equip:
return error_response(
ErrorCodes.NOT_FOUND,
f'Equipment for asset {asset_id} not found',
http_code=404
)
result = equip.asset.to_dict() if equip.asset else {}
result['equipment'] = equip.to_dict()
return success_response(result)
@equipment_bp.route('', methods=['POST'])
@jwt_required()
def create_equipment():
"""
Create new equipment (creates both Asset and Equipment records).
Required fields:
- assetnumber: Business identifier
Optional fields:
- name, serialnumber, statusid, locationid, businessunitid
- equipmenttypeid, vendorid, modelnumberid
- requiresmanualconfig, islocationonly
- mapleft, maptop, notes
"""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('assetnumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'assetnumber is required')
# Check for duplicate assetnumber
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Get equipment asset type
equipment_type = AssetType.query.filter_by(assettype='equipment').first()
if not equipment_type:
return error_response(
ErrorCodes.INTERNAL_ERROR,
'Equipment asset type not found. Plugin may not be properly installed.',
http_code=500
)
# Create the core asset
asset = Asset(
assetnumber=data['assetnumber'],
name=data.get('name'),
serialnumber=data.get('serialnumber'),
assettypeid=equipment_type.assettypeid,
statusid=data.get('statusid', 1),
locationid=data.get('locationid'),
businessunitid=data.get('businessunitid'),
mapleft=data.get('mapleft'),
maptop=data.get('maptop'),
notes=data.get('notes')
)
db.session.add(asset)
db.session.flush() # Get the assetid
# Create the equipment extension
equip = Equipment(
assetid=asset.assetid,
equipmenttypeid=data.get('equipmenttypeid'),
vendorid=data.get('vendorid'),
modelnumberid=data.get('modelnumberid'),
requiresmanualconfig=data.get('requiresmanualconfig', False),
islocationonly=data.get('islocationonly', False),
lastmaintenancedate=data.get('lastmaintenancedate'),
nextmaintenancedate=data.get('nextmaintenancedate'),
maintenanceintervaldays=data.get('maintenanceintervaldays')
)
db.session.add(equip)
db.session.commit()
result = asset.to_dict()
result['equipment'] = equip.to_dict()
return success_response(result, message='Equipment created', http_code=201)
@equipment_bp.route('/<int:equipment_id>', methods=['PUT'])
@jwt_required()
def update_equipment(equipment_id: int):
"""Update equipment (both Asset and Equipment records)."""
equip = Equipment.query.get(equipment_id)
if not equip:
return error_response(
ErrorCodes.NOT_FOUND,
f'Equipment with ID {equipment_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
asset = equip.asset
# Check for conflicting assetnumber
if 'assetnumber' in data and data['assetnumber'] != asset.assetnumber:
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Update asset fields
asset_fields = ['assetnumber', 'name', 'serialnumber', 'statusid',
'locationid', 'businessunitid', 'mapleft', 'maptop', 'notes', 'isactive']
for key in asset_fields:
if key in data:
setattr(asset, key, data[key])
# Update equipment fields
equipment_fields = ['equipmenttypeid', 'vendorid', 'modelnumberid',
'requiresmanualconfig', 'islocationonly',
'lastmaintenancedate', 'nextmaintenancedate', 'maintenanceintervaldays']
for key in equipment_fields:
if key in data:
setattr(equip, key, data[key])
db.session.commit()
result = asset.to_dict()
result['equipment'] = equip.to_dict()
return success_response(result, message='Equipment updated')
@equipment_bp.route('/<int:equipment_id>', methods=['DELETE'])
@jwt_required()
def delete_equipment(equipment_id: int):
"""Delete (soft delete) equipment."""
equip = Equipment.query.get(equipment_id)
if not equip:
return error_response(
ErrorCodes.NOT_FOUND,
f'Equipment with ID {equipment_id} not found',
http_code=404
)
# Soft delete the asset (equipment extension will stay linked)
equip.asset.isactive = False
db.session.commit()
return success_response(message='Equipment deleted')
# =============================================================================
# Dashboard
# =============================================================================
@equipment_bp.route('/dashboard/summary', methods=['GET'])
@jwt_required()
def dashboard_summary():
"""Get equipment dashboard summary data."""
# Total active equipment count
total = db.session.query(Equipment).join(Asset).filter(
Asset.isactive == True
).count()
# Count by equipment type
by_type = db.session.query(
EquipmentType.equipmenttype,
db.func.count(Equipment.equipmentid)
).join(Equipment, Equipment.equipmenttypeid == EquipmentType.equipmenttypeid
).join(Asset, Asset.assetid == Equipment.assetid
).filter(Asset.isactive == True
).group_by(EquipmentType.equipmenttype
).all()
# Count by status
from shopdb.core.models import AssetStatus
by_status = db.session.query(
AssetStatus.status,
db.func.count(Equipment.equipmentid)
).join(Asset, Asset.assetid == Equipment.assetid
).join(AssetStatus, AssetStatus.statusid == Asset.statusid
).filter(Asset.isactive == True
).group_by(AssetStatus.status
).all()
return success_response({
'total': total,
'by_type': [{'type': t, 'count': c} for t, c in by_type],
'by_status': [{'status': s, 'count': c} for s, c in by_status]
})

View File

@@ -0,0 +1,22 @@
{
"name": "equipment",
"version": "1.0.0",
"description": "Equipment management plugin for CNCs, CMMs, lathes, grinders, and other manufacturing equipment",
"author": "ShopDB Team",
"dependencies": [],
"core_version": ">=1.0.0",
"api_prefix": "/api/equipment",
"provides": {
"asset_type": "equipment",
"features": [
"equipment_tracking",
"maintenance_scheduling",
"vendor_management",
"model_catalog"
]
},
"settings": {
"enable_maintenance_alerts": true,
"maintenance_alert_days": 30
}
}

View File

@@ -0,0 +1,8 @@
"""Equipment plugin models."""
from .equipment import Equipment, EquipmentType
__all__ = [
'Equipment',
'EquipmentType',
]

View File

@@ -0,0 +1,109 @@
"""Equipment plugin models."""
from shopdb.extensions import db
from shopdb.core.models.base import BaseModel
class EquipmentType(BaseModel):
"""
Equipment type classification.
Examples: CNC, CMM, Lathe, Grinder, EDM, Part Marker, etc.
"""
__tablename__ = 'equipmenttypes'
equipmenttypeid = db.Column(db.Integer, primary_key=True)
equipmenttype = db.Column(db.String(100), unique=True, nullable=False)
description = db.Column(db.Text)
icon = db.Column(db.String(50), comment='Icon name for UI')
def __repr__(self):
return f"<EquipmentType {self.equipmenttype}>"
class Equipment(BaseModel):
"""
Equipment-specific extension data.
Links to core Asset table via assetid.
Stores equipment-specific fields like type, model, vendor, etc.
"""
__tablename__ = 'equipment'
equipmentid = db.Column(db.Integer, primary_key=True)
# Link to core asset
assetid = db.Column(
db.Integer,
db.ForeignKey('assets.assetid', ondelete='CASCADE'),
unique=True,
nullable=False,
index=True
)
# Equipment classification
equipmenttypeid = db.Column(
db.Integer,
db.ForeignKey('equipmenttypes.equipmenttypeid'),
nullable=True
)
# Vendor and model
vendorid = db.Column(
db.Integer,
db.ForeignKey('vendors.vendorid'),
nullable=True
)
modelnumberid = db.Column(
db.Integer,
db.ForeignKey('models.modelnumberid'),
nullable=True
)
# Equipment-specific fields
requiresmanualconfig = db.Column(
db.Boolean,
default=False,
comment='Multi-PC machine needs manual configuration'
)
islocationonly = db.Column(
db.Boolean,
default=False,
comment='Virtual location marker (not actual equipment)'
)
# Maintenance tracking
lastmaintenancedate = db.Column(db.DateTime, nullable=True)
nextmaintenancedate = db.Column(db.DateTime, nullable=True)
maintenanceintervaldays = db.Column(db.Integer, nullable=True)
# Relationships
asset = db.relationship(
'Asset',
backref=db.backref('equipment', uselist=False, lazy='joined')
)
equipmenttype = db.relationship('EquipmentType', backref='equipment')
vendor = db.relationship('Vendor', backref='equipment_items')
model = db.relationship('Model', backref='equipment_items')
__table_args__ = (
db.Index('idx_equipment_type', 'equipmenttypeid'),
db.Index('idx_equipment_vendor', 'vendorid'),
)
def __repr__(self):
return f"<Equipment {self.assetid}>"
def to_dict(self):
"""Convert to dictionary with related names."""
result = super().to_dict()
# Add related object names
if self.equipmenttype:
result['equipmenttype_name'] = self.equipmenttype.equipmenttype
if self.vendor:
result['vendor_name'] = self.vendor.vendor
if self.model:
result['model_name'] = self.model.modelnumber
return result

220
plugins/equipment/plugin.py Normal file
View File

@@ -0,0 +1,220 @@
"""Equipment plugin main class."""
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Type
from flask import Flask, Blueprint
import click
from shopdb.plugins.base import BasePlugin, PluginMeta
from shopdb.extensions import db
from shopdb.core.models import AssetType, AssetStatus
from .models import Equipment, EquipmentType
from .api import equipment_bp
logger = logging.getLogger(__name__)
class EquipmentPlugin(BasePlugin):
"""
Equipment plugin - manages manufacturing equipment assets.
Equipment includes CNCs, CMMs, lathes, grinders, EDMs, part markers, etc.
Uses the new Asset architecture with Equipment extension table.
"""
def __init__(self):
self._manifest = self._load_manifest()
def _load_manifest(self) -> Dict:
"""Load plugin manifest from JSON file."""
manifestpath = Path(__file__).parent / 'manifest.json'
if manifestpath.exists():
with open(manifestpath, 'r') as f:
return json.load(f)
return {}
@property
def meta(self) -> PluginMeta:
"""Return plugin metadata."""
return PluginMeta(
name=self._manifest.get('name', 'equipment'),
version=self._manifest.get('version', '1.0.0'),
description=self._manifest.get(
'description',
'Equipment management for manufacturing assets'
),
author=self._manifest.get('author', 'ShopDB Team'),
dependencies=self._manifest.get('dependencies', []),
core_version=self._manifest.get('core_version', '>=1.0.0'),
api_prefix=self._manifest.get('api_prefix', '/api/equipment'),
)
def get_blueprint(self) -> Optional[Blueprint]:
"""Return Flask Blueprint with API routes."""
return equipment_bp
def get_models(self) -> List[Type]:
"""Return list of SQLAlchemy model classes."""
return [Equipment, EquipmentType]
def init_app(self, app: Flask, db_instance) -> None:
"""Initialize plugin with Flask app."""
logger.info(f"Equipment plugin initialized (v{self.meta.version})")
def on_install(self, app: Flask) -> None:
"""Called when plugin is installed."""
with app.app_context():
self._ensure_asset_type()
self._ensure_asset_statuses()
self._ensure_equipment_types()
logger.info("Equipment plugin installed")
def _ensure_asset_type(self) -> None:
"""Ensure equipment asset type exists."""
existing = AssetType.query.filter_by(assettype='equipment').first()
if not existing:
at = AssetType(
assettype='equipment',
plugin_name='equipment',
table_name='equipment',
description='Manufacturing equipment (CNCs, CMMs, lathes, etc.)',
icon='cog'
)
db.session.add(at)
logger.debug("Created asset type: equipment")
db.session.commit()
def _ensure_asset_statuses(self) -> None:
"""Ensure standard asset statuses exist."""
statuses = [
('In Use', 'Asset is currently in use', '#28a745'),
('Spare', 'Spare/backup asset', '#17a2b8'),
('Retired', 'Asset has been retired', '#6c757d'),
('Maintenance', 'Asset is under maintenance', '#ffc107'),
('Decommissioned', 'Asset has been decommissioned', '#dc3545'),
]
for name, description, color in statuses:
existing = AssetStatus.query.filter_by(status=name).first()
if not existing:
s = AssetStatus(
status=name,
description=description,
color=color
)
db.session.add(s)
logger.debug(f"Created asset status: {name}")
db.session.commit()
def _ensure_equipment_types(self) -> None:
"""Ensure basic equipment types exist."""
equipment_types = [
('CNC', 'Computer Numerical Control machine', 'cnc'),
('CMM', 'Coordinate Measuring Machine', 'cmm'),
('Lathe', 'Lathe machine', 'lathe'),
('Grinder', 'Grinding machine', 'grinder'),
('EDM', 'Electrical Discharge Machine', 'edm'),
('Part Marker', 'Part marking/engraving equipment', 'marker'),
('Mill', 'Milling machine', 'mill'),
('Press', 'Press machine', 'press'),
('Robot', 'Industrial robot', 'robot'),
('Other', 'Other equipment type', 'cog'),
]
for name, description, icon in equipment_types:
existing = EquipmentType.query.filter_by(equipmenttype=name).first()
if not existing:
et = EquipmentType(
equipmenttype=name,
description=description,
icon=icon
)
db.session.add(et)
logger.debug(f"Created equipment type: {name}")
db.session.commit()
def on_uninstall(self, app: Flask) -> None:
"""Called when plugin is uninstalled."""
logger.info("Equipment plugin uninstalled")
def get_cli_commands(self) -> List:
"""Return CLI commands for this plugin."""
@click.group('equipment')
def equipmentcli():
"""Equipment plugin commands."""
pass
@equipmentcli.command('list-types')
def list_types():
"""List all equipment types."""
from flask import current_app
with current_app.app_context():
types = EquipmentType.query.filter_by(isactive=True).all()
if not types:
click.echo('No equipment types found.')
return
click.echo('Equipment Types:')
for t in types:
click.echo(f" [{t.equipmenttypeid}] {t.equipmenttype}")
@equipmentcli.command('stats')
def stats():
"""Show equipment statistics."""
from flask import current_app
from shopdb.core.models import Asset
with current_app.app_context():
total = db.session.query(Equipment).join(Asset).filter(
Asset.isactive == True
).count()
click.echo(f"Total active equipment: {total}")
# By type
by_type = db.session.query(
EquipmentType.equipmenttype,
db.func.count(Equipment.equipmentid)
).join(Equipment, Equipment.equipmenttypeid == EquipmentType.equipmenttypeid
).join(Asset, Asset.assetid == Equipment.assetid
).filter(Asset.isactive == True
).group_by(EquipmentType.equipmenttype
).all()
if by_type:
click.echo("\nBy Type:")
for t, c in by_type:
click.echo(f" {t}: {c}")
return [equipmentcli]
def get_dashboard_widgets(self) -> List[Dict]:
"""Return dashboard widget definitions."""
return [
{
'name': 'Equipment Status',
'component': 'EquipmentStatusWidget',
'endpoint': '/api/equipment/dashboard/summary',
'size': 'medium',
'position': 5,
},
]
def get_navigation_items(self) -> List[Dict]:
"""Return navigation menu items."""
return [
{
'name': 'Equipment',
'icon': 'cog',
'route': '/equipment',
'position': 10,
},
]

View File

@@ -0,0 +1,5 @@
"""Network plugin for ShopDB."""
from .plugin import NetworkPlugin
__all__ = ['NetworkPlugin']

View File

@@ -0,0 +1,5 @@
"""Network plugin API."""
from .routes import network_bp
__all__ = ['network_bp']

View File

@@ -0,0 +1,817 @@
"""Network plugin API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, Vendor
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import NetworkDevice, NetworkDeviceType, Subnet, VLAN
network_bp = Blueprint('network', __name__)
# =============================================================================
# Network Device Types
# =============================================================================
@network_bp.route('/types', methods=['GET'])
@jwt_required()
def list_network_device_types():
"""List all network device types."""
page, per_page = get_pagination_params(request)
query = NetworkDeviceType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(NetworkDeviceType.isactive == True)
if search := request.args.get('search'):
query = query.filter(NetworkDeviceType.networkdevicetype.ilike(f'%{search}%'))
query = query.order_by(NetworkDeviceType.networkdevicetype)
items, total = paginate_query(query, page, per_page)
data = [t.to_dict() for t in items]
return paginated_response(data, page, per_page, total)
@network_bp.route('/types/<int:type_id>', methods=['GET'])
@jwt_required()
def get_network_device_type(type_id: int):
"""Get a single network device type."""
t = NetworkDeviceType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device type with ID {type_id} not found',
http_code=404
)
return success_response(t.to_dict())
@network_bp.route('/types', methods=['POST'])
@jwt_required()
def create_network_device_type():
"""Create a new network device type."""
data = request.get_json()
if not data or not data.get('networkdevicetype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'networkdevicetype is required')
if NetworkDeviceType.query.filter_by(networkdevicetype=data['networkdevicetype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Network device type '{data['networkdevicetype']}' already exists",
http_code=409
)
t = NetworkDeviceType(
networkdevicetype=data['networkdevicetype'],
description=data.get('description'),
icon=data.get('icon')
)
db.session.add(t)
db.session.commit()
return success_response(t.to_dict(), message='Network device type created', http_code=201)
@network_bp.route('/types/<int:type_id>', methods=['PUT'])
@jwt_required()
def update_network_device_type(type_id: int):
"""Update a network device type."""
t = NetworkDeviceType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device type with ID {type_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'networkdevicetype' in data and data['networkdevicetype'] != t.networkdevicetype:
if NetworkDeviceType.query.filter_by(networkdevicetype=data['networkdevicetype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Network device type '{data['networkdevicetype']}' already exists",
http_code=409
)
for key in ['networkdevicetype', 'description', 'icon', 'isactive']:
if key in data:
setattr(t, key, data[key])
db.session.commit()
return success_response(t.to_dict(), message='Network device type updated')
# =============================================================================
# Network Devices CRUD
# =============================================================================
@network_bp.route('', methods=['GET'])
@jwt_required()
def list_network_devices():
"""
List all network devices with filtering and pagination.
Query parameters:
- page, per_page: Pagination
- active: Filter by active status
- search: Search by asset number, name, or hostname
- type_id: Filter by network device type ID
- vendor_id: Filter by vendor ID
- location_id: Filter by location ID
- businessunit_id: Filter by business unit ID
- poe: Filter by PoE capability (true/false)
- managed: Filter by managed status (true/false)
"""
page, per_page = get_pagination_params(request)
# Join NetworkDevice with Asset
query = db.session.query(NetworkDevice).join(Asset)
# Active filter
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Asset.isactive == True)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
Asset.assetnumber.ilike(f'%{search}%'),
Asset.name.ilike(f'%{search}%'),
Asset.serialnumber.ilike(f'%{search}%'),
NetworkDevice.hostname.ilike(f'%{search}%')
)
)
# Type filter
if type_id := request.args.get('type_id'):
query = query.filter(NetworkDevice.networkdevicetypeid == int(type_id))
# Vendor filter
if vendor_id := request.args.get('vendor_id'):
query = query.filter(NetworkDevice.vendorid == int(vendor_id))
# Location filter
if location_id := request.args.get('location_id'):
query = query.filter(Asset.locationid == int(location_id))
# Business unit filter
if bu_id := request.args.get('businessunit_id'):
query = query.filter(Asset.businessunitid == int(bu_id))
# PoE filter
if poe := request.args.get('poe'):
query = query.filter(NetworkDevice.ispoe == (poe.lower() == 'true'))
# Managed filter
if managed := request.args.get('managed'):
query = query.filter(NetworkDevice.ismanaged == (managed.lower() == 'true'))
# Sorting
sort_by = request.args.get('sort', 'hostname')
sort_dir = request.args.get('dir', 'asc')
if sort_by == 'hostname':
col = NetworkDevice.hostname
elif sort_by == 'assetnumber':
col = Asset.assetnumber
elif sort_by == 'name':
col = Asset.name
else:
col = NetworkDevice.hostname
query = query.order_by(col.desc() if sort_dir == 'desc' else col)
items, total = paginate_query(query, page, per_page)
# Build response with both asset and network device data
data = []
for netdev in items:
item = netdev.asset.to_dict() if netdev.asset else {}
item['network_device'] = netdev.to_dict()
data.append(item)
return paginated_response(data, page, per_page, total)
@network_bp.route('/<int:device_id>', methods=['GET'])
@jwt_required()
def get_network_device(device_id: int):
"""Get a single network device with full details."""
netdev = NetworkDevice.query.get(device_id)
if not netdev:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device with ID {device_id} not found',
http_code=404
)
result = netdev.asset.to_dict() if netdev.asset else {}
result['network_device'] = netdev.to_dict()
return success_response(result)
@network_bp.route('/by-asset/<int:asset_id>', methods=['GET'])
@jwt_required()
def get_network_device_by_asset(asset_id: int):
"""Get network device data by asset ID."""
netdev = NetworkDevice.query.filter_by(assetid=asset_id).first()
if not netdev:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device for asset {asset_id} not found',
http_code=404
)
result = netdev.asset.to_dict() if netdev.asset else {}
result['network_device'] = netdev.to_dict()
return success_response(result)
@network_bp.route('/by-hostname/<hostname>', methods=['GET'])
@jwt_required()
def get_network_device_by_hostname(hostname: str):
"""Get network device by hostname."""
netdev = NetworkDevice.query.filter_by(hostname=hostname).first()
if not netdev:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device with hostname {hostname} not found',
http_code=404
)
result = netdev.asset.to_dict() if netdev.asset else {}
result['network_device'] = netdev.to_dict()
return success_response(result)
@network_bp.route('', methods=['POST'])
@jwt_required()
def create_network_device():
"""
Create new network device (creates both Asset and NetworkDevice records).
Required fields:
- assetnumber: Business identifier
Optional fields:
- name, serialnumber, statusid, locationid, businessunitid
- networkdevicetypeid, vendorid, hostname
- firmwareversion, portcount, ispoe, ismanaged, rackunit
- mapleft, maptop, notes
"""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('assetnumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'assetnumber is required')
# Check for duplicate assetnumber
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Check for duplicate hostname
if data.get('hostname'):
if NetworkDevice.query.filter_by(hostname=data['hostname']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Network device with hostname '{data['hostname']}' already exists",
http_code=409
)
# Get network device asset type
network_type = AssetType.query.filter_by(assettype='network_device').first()
if not network_type:
return error_response(
ErrorCodes.INTERNAL_ERROR,
'Network device asset type not found. Plugin may not be properly installed.',
http_code=500
)
# Create the core asset
asset = Asset(
assetnumber=data['assetnumber'],
name=data.get('name'),
serialnumber=data.get('serialnumber'),
assettypeid=network_type.assettypeid,
statusid=data.get('statusid', 1),
locationid=data.get('locationid'),
businessunitid=data.get('businessunitid'),
mapleft=data.get('mapleft'),
maptop=data.get('maptop'),
notes=data.get('notes')
)
db.session.add(asset)
db.session.flush() # Get the assetid
# Create the network device extension
netdev = NetworkDevice(
assetid=asset.assetid,
networkdevicetypeid=data.get('networkdevicetypeid'),
vendorid=data.get('vendorid'),
hostname=data.get('hostname'),
firmwareversion=data.get('firmwareversion'),
portcount=data.get('portcount'),
ispoe=data.get('ispoe', False),
ismanaged=data.get('ismanaged', False),
rackunit=data.get('rackunit')
)
db.session.add(netdev)
db.session.commit()
result = asset.to_dict()
result['network_device'] = netdev.to_dict()
return success_response(result, message='Network device created', http_code=201)
@network_bp.route('/<int:device_id>', methods=['PUT'])
@jwt_required()
def update_network_device(device_id: int):
"""Update network device (both Asset and NetworkDevice records)."""
netdev = NetworkDevice.query.get(device_id)
if not netdev:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device with ID {device_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
asset = netdev.asset
# Check for conflicting assetnumber
if 'assetnumber' in data and data['assetnumber'] != asset.assetnumber:
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Check for conflicting hostname
if 'hostname' in data and data['hostname'] != netdev.hostname:
existing = NetworkDevice.query.filter_by(hostname=data['hostname']).first()
if existing and existing.networkdeviceid != device_id:
return error_response(
ErrorCodes.CONFLICT,
f"Network device with hostname '{data['hostname']}' already exists",
http_code=409
)
# Update asset fields
asset_fields = ['assetnumber', 'name', 'serialnumber', 'statusid',
'locationid', 'businessunitid', 'mapleft', 'maptop', 'notes', 'isactive']
for key in asset_fields:
if key in data:
setattr(asset, key, data[key])
# Update network device fields
netdev_fields = ['networkdevicetypeid', 'vendorid', 'hostname',
'firmwareversion', 'portcount', 'ispoe', 'ismanaged', 'rackunit']
for key in netdev_fields:
if key in data:
setattr(netdev, key, data[key])
db.session.commit()
result = asset.to_dict()
result['network_device'] = netdev.to_dict()
return success_response(result, message='Network device updated')
@network_bp.route('/<int:device_id>', methods=['DELETE'])
@jwt_required()
def delete_network_device(device_id: int):
"""Delete (soft delete) network device."""
netdev = NetworkDevice.query.get(device_id)
if not netdev:
return error_response(
ErrorCodes.NOT_FOUND,
f'Network device with ID {device_id} not found',
http_code=404
)
# Soft delete the asset
netdev.asset.isactive = False
db.session.commit()
return success_response(message='Network device deleted')
# =============================================================================
# Dashboard
# =============================================================================
@network_bp.route('/dashboard/summary', methods=['GET'])
@jwt_required()
def dashboard_summary():
"""Get network device dashboard summary data."""
# Total active network devices
total = db.session.query(NetworkDevice).join(Asset).filter(
Asset.isactive == True
).count()
# Count by device type
by_type = db.session.query(
NetworkDeviceType.networkdevicetype,
db.func.count(NetworkDevice.networkdeviceid)
).join(NetworkDevice, NetworkDevice.networkdevicetypeid == NetworkDeviceType.networkdevicetypeid
).join(Asset, Asset.assetid == NetworkDevice.assetid
).filter(Asset.isactive == True
).group_by(NetworkDeviceType.networkdevicetype
).all()
# Count by vendor
by_vendor = db.session.query(
Vendor.vendor,
db.func.count(NetworkDevice.networkdeviceid)
).join(NetworkDevice, NetworkDevice.vendorid == Vendor.vendorid
).join(Asset, Asset.assetid == NetworkDevice.assetid
).filter(Asset.isactive == True
).group_by(Vendor.vendor
).all()
# Count PoE vs non-PoE
poe_count = db.session.query(NetworkDevice).join(Asset).filter(
Asset.isactive == True,
NetworkDevice.ispoe == True
).count()
return success_response({
'total': total,
'by_type': [{'type': t, 'count': c} for t, c in by_type],
'by_vendor': [{'vendor': v, 'count': c} for v, c in by_vendor],
'poe': poe_count,
'non_poe': total - poe_count
})
# =============================================================================
# VLANs
# =============================================================================
@network_bp.route('/vlans', methods=['GET'])
@jwt_required()
def list_vlans():
"""List all VLANs with filtering and pagination."""
page, per_page = get_pagination_params(request)
query = VLAN.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(VLAN.isactive == True)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
VLAN.name.ilike(f'%{search}%'),
VLAN.description.ilike(f'%{search}%'),
db.cast(VLAN.vlannumber, db.String).ilike(f'%{search}%')
)
)
# Type filter
if vlan_type := request.args.get('type'):
query = query.filter(VLAN.vlantype == vlan_type)
query = query.order_by(VLAN.vlannumber)
items, total = paginate_query(query, page, per_page)
data = [v.to_dict() for v in items]
return paginated_response(data, page, per_page, total)
@network_bp.route('/vlans/<int:vlan_id>', methods=['GET'])
@jwt_required()
def get_vlan(vlan_id: int):
"""Get a single VLAN with its subnets."""
vlan = VLAN.query.get(vlan_id)
if not vlan:
return error_response(
ErrorCodes.NOT_FOUND,
f'VLAN with ID {vlan_id} not found',
http_code=404
)
result = vlan.to_dict()
# Include associated subnets
result['subnets'] = [s.to_dict() for s in vlan.subnets.filter_by(isactive=True).all()]
return success_response(result)
@network_bp.route('/vlans', methods=['POST'])
@jwt_required()
def create_vlan():
"""Create a new VLAN."""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('vlannumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'vlannumber is required')
if not data.get('name'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'name is required')
# Check for duplicate VLAN number
if VLAN.query.filter_by(vlannumber=data['vlannumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"VLAN {data['vlannumber']} already exists",
http_code=409
)
vlan = VLAN(
vlannumber=data['vlannumber'],
name=data['name'],
description=data.get('description'),
vlantype=data.get('vlantype')
)
db.session.add(vlan)
db.session.commit()
return success_response(vlan.to_dict(), message='VLAN created', http_code=201)
@network_bp.route('/vlans/<int:vlan_id>', methods=['PUT'])
@jwt_required()
def update_vlan(vlan_id: int):
"""Update a VLAN."""
vlan = VLAN.query.get(vlan_id)
if not vlan:
return error_response(
ErrorCodes.NOT_FOUND,
f'VLAN with ID {vlan_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Check for conflicting VLAN number
if 'vlannumber' in data and data['vlannumber'] != vlan.vlannumber:
if VLAN.query.filter_by(vlannumber=data['vlannumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"VLAN {data['vlannumber']} already exists",
http_code=409
)
for key in ['vlannumber', 'name', 'description', 'vlantype', 'isactive']:
if key in data:
setattr(vlan, key, data[key])
db.session.commit()
return success_response(vlan.to_dict(), message='VLAN updated')
@network_bp.route('/vlans/<int:vlan_id>', methods=['DELETE'])
@jwt_required()
def delete_vlan(vlan_id: int):
"""Delete (soft delete) a VLAN."""
vlan = VLAN.query.get(vlan_id)
if not vlan:
return error_response(
ErrorCodes.NOT_FOUND,
f'VLAN with ID {vlan_id} not found',
http_code=404
)
# Check if VLAN has associated subnets
if vlan.subnets.filter_by(isactive=True).count() > 0:
return error_response(
ErrorCodes.VALIDATION_ERROR,
'Cannot delete VLAN with associated subnets',
http_code=400
)
vlan.isactive = False
db.session.commit()
return success_response(message='VLAN deleted')
# =============================================================================
# Subnets
# =============================================================================
@network_bp.route('/subnets', methods=['GET'])
@jwt_required()
def list_subnets():
"""List all subnets with filtering and pagination."""
page, per_page = get_pagination_params(request)
query = Subnet.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Subnet.isactive == True)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
Subnet.cidr.ilike(f'%{search}%'),
Subnet.name.ilike(f'%{search}%'),
Subnet.description.ilike(f'%{search}%')
)
)
# VLAN filter
if vlan_id := request.args.get('vlanid'):
query = query.filter(Subnet.vlanid == int(vlan_id))
# Location filter
if location_id := request.args.get('locationid'):
query = query.filter(Subnet.locationid == int(location_id))
# Type filter
if subnet_type := request.args.get('type'):
query = query.filter(Subnet.subnettype == subnet_type)
query = query.order_by(Subnet.cidr)
items, total = paginate_query(query, page, per_page)
data = [s.to_dict() for s in items]
return paginated_response(data, page, per_page, total)
@network_bp.route('/subnets/<int:subnet_id>', methods=['GET'])
@jwt_required()
def get_subnet(subnet_id: int):
"""Get a single subnet."""
subnet = Subnet.query.get(subnet_id)
if not subnet:
return error_response(
ErrorCodes.NOT_FOUND,
f'Subnet with ID {subnet_id} not found',
http_code=404
)
return success_response(subnet.to_dict())
@network_bp.route('/subnets', methods=['POST'])
@jwt_required()
def create_subnet():
"""Create a new subnet."""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('cidr'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'cidr is required')
if not data.get('name'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'name is required')
# Validate CIDR format (basic check)
cidr = data['cidr']
if '/' not in cidr:
return error_response(ErrorCodes.VALIDATION_ERROR, 'cidr must be in CIDR notation (e.g., 10.1.1.0/24)')
# Check for duplicate CIDR
if Subnet.query.filter_by(cidr=cidr).first():
return error_response(
ErrorCodes.CONFLICT,
f"Subnet {cidr} already exists",
http_code=409
)
# Validate VLAN if provided
if data.get('vlanid'):
if not VLAN.query.get(data['vlanid']):
return error_response(
ErrorCodes.VALIDATION_ERROR,
f"VLAN with ID {data['vlanid']} not found"
)
subnet = Subnet(
cidr=cidr,
name=data['name'],
description=data.get('description'),
gatewayip=data.get('gatewayip'),
subnetmask=data.get('subnetmask'),
networkaddress=data.get('networkaddress'),
broadcastaddress=data.get('broadcastaddress'),
vlanid=data.get('vlanid'),
subnettype=data.get('subnettype'),
locationid=data.get('locationid'),
dhcpenabled=data.get('dhcpenabled', True),
dhcprangestart=data.get('dhcprangestart'),
dhcprangeend=data.get('dhcprangeend'),
dns1=data.get('dns1'),
dns2=data.get('dns2')
)
db.session.add(subnet)
db.session.commit()
return success_response(subnet.to_dict(), message='Subnet created', http_code=201)
@network_bp.route('/subnets/<int:subnet_id>', methods=['PUT'])
@jwt_required()
def update_subnet(subnet_id: int):
"""Update a subnet."""
subnet = Subnet.query.get(subnet_id)
if not subnet:
return error_response(
ErrorCodes.NOT_FOUND,
f'Subnet with ID {subnet_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Check for conflicting CIDR
if 'cidr' in data and data['cidr'] != subnet.cidr:
if Subnet.query.filter_by(cidr=data['cidr']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Subnet {data['cidr']} already exists",
http_code=409
)
allowed_fields = ['cidr', 'name', 'description', 'gatewayip', 'subnetmask',
'networkaddress', 'broadcastaddress', 'vlanid', 'subnettype',
'locationid', 'dhcpenabled', 'dhcprangestart', 'dhcprangeend',
'dns1', 'dns2', 'isactive']
for key in allowed_fields:
if key in data:
setattr(subnet, key, data[key])
db.session.commit()
return success_response(subnet.to_dict(), message='Subnet updated')
@network_bp.route('/subnets/<int:subnet_id>', methods=['DELETE'])
@jwt_required()
def delete_subnet(subnet_id: int):
"""Delete (soft delete) a subnet."""
subnet = Subnet.query.get(subnet_id)
if not subnet:
return error_response(
ErrorCodes.NOT_FOUND,
f'Subnet with ID {subnet_id} not found',
http_code=404
)
subnet.isactive = False
db.session.commit()
return success_response(message='Subnet deleted')

View File

@@ -0,0 +1,22 @@
{
"name": "network",
"version": "1.0.0",
"description": "Network device management plugin for switches, APs, cameras, and IDFs",
"author": "ShopDB Team",
"dependencies": [],
"core_version": ">=1.0.0",
"api_prefix": "/api/network",
"provides": {
"asset_type": "network_device",
"features": [
"network_device_tracking",
"port_management",
"firmware_tracking",
"poe_monitoring"
]
},
"settings": {
"enable_snmp_polling": false,
"snmp_community": "public"
}
}

View File

@@ -0,0 +1,11 @@
"""Network plugin models."""
from .network_device import NetworkDevice, NetworkDeviceType
from .subnet import Subnet, VLAN
__all__ = [
'NetworkDevice',
'NetworkDeviceType',
'Subnet',
'VLAN',
]

View File

@@ -0,0 +1,121 @@
"""Network device plugin models."""
from shopdb.extensions import db
from shopdb.core.models.base import BaseModel
class NetworkDeviceType(BaseModel):
"""
Network device type classification.
Examples: Switch, Router, Access Point, Camera, IDF, Firewall, etc.
"""
__tablename__ = 'networkdevicetypes'
networkdevicetypeid = db.Column(db.Integer, primary_key=True)
networkdevicetype = db.Column(db.String(100), unique=True, nullable=False)
description = db.Column(db.Text)
icon = db.Column(db.String(50), comment='Icon name for UI')
def __repr__(self):
return f"<NetworkDeviceType {self.networkdevicetype}>"
class NetworkDevice(BaseModel):
"""
Network device-specific extension data.
Links to core Asset table via assetid.
Stores network device-specific fields like hostname, firmware, ports, etc.
"""
__tablename__ = 'networkdevices'
networkdeviceid = db.Column(db.Integer, primary_key=True)
# Link to core asset
assetid = db.Column(
db.Integer,
db.ForeignKey('assets.assetid', ondelete='CASCADE'),
unique=True,
nullable=False,
index=True
)
# Network device classification
networkdevicetypeid = db.Column(
db.Integer,
db.ForeignKey('networkdevicetypes.networkdevicetypeid'),
nullable=True
)
# Vendor
vendorid = db.Column(
db.Integer,
db.ForeignKey('vendors.vendorid'),
nullable=True
)
# Network identity
hostname = db.Column(
db.String(100),
index=True,
comment='Network hostname'
)
# Firmware/software version
firmwareversion = db.Column(db.String(100), nullable=True)
# Physical characteristics
portcount = db.Column(
db.Integer,
nullable=True,
comment='Number of ports (for switches)'
)
# Features
ispoe = db.Column(
db.Boolean,
default=False,
comment='Power over Ethernet capable'
)
ismanaged = db.Column(
db.Boolean,
default=False,
comment='Managed device (SNMP, web interface, etc.)'
)
# For IDF/closet locations
rackunit = db.Column(
db.String(20),
nullable=True,
comment='Rack unit position (e.g., U1, U5)'
)
# Relationships
asset = db.relationship(
'Asset',
backref=db.backref('network_device', uselist=False, lazy='joined')
)
networkdevicetype = db.relationship('NetworkDeviceType', backref='networkdevices')
vendor = db.relationship('Vendor', backref='network_devices')
__table_args__ = (
db.Index('idx_netdev_type', 'networkdevicetypeid'),
db.Index('idx_netdev_hostname', 'hostname'),
db.Index('idx_netdev_vendor', 'vendorid'),
)
def __repr__(self):
return f"<NetworkDevice {self.hostname or self.assetid}>"
def to_dict(self):
"""Convert to dictionary with related names."""
result = super().to_dict()
# Add related object names
if self.networkdevicetype:
result['networkdevicetype_name'] = self.networkdevicetype.networkdevicetype
if self.vendor:
result['vendor_name'] = self.vendor.vendor
return result

View File

@@ -0,0 +1,146 @@
"""Subnet and VLAN models for network plugin."""
from shopdb.extensions import db
from shopdb.core.models.base import BaseModel
class VLAN(BaseModel):
"""
VLAN definition.
Represents a virtual LAN for network segmentation.
"""
__tablename__ = 'vlans'
vlanid = db.Column(db.Integer, primary_key=True)
vlannumber = db.Column(db.Integer, unique=True, nullable=False, comment='VLAN ID number')
name = db.Column(db.String(100), nullable=False, comment='VLAN name')
description = db.Column(db.Text, nullable=True)
# Optional classification
vlantype = db.Column(
db.String(50),
nullable=True,
comment='Type: data, voice, management, guest, etc.'
)
# Relationships
subnets = db.relationship('Subnet', backref='vlan', lazy='dynamic')
__table_args__ = (
db.Index('idx_vlan_number', 'vlannumber'),
)
def __repr__(self):
return f"<VLAN {self.vlannumber} - {self.name}>"
def to_dict(self):
"""Convert to dictionary."""
result = super().to_dict()
result['subnetcount'] = self.subnets.count() if self.subnets else 0
return result
class Subnet(BaseModel):
"""
Subnet/IP network definition.
Represents an IP subnet with optional VLAN association.
"""
__tablename__ = 'subnets'
subnetid = db.Column(db.Integer, primary_key=True)
# Network definition
cidr = db.Column(
db.String(18),
unique=True,
nullable=False,
comment='CIDR notation (e.g., 10.1.1.0/24)'
)
name = db.Column(db.String(100), nullable=False, comment='Subnet name')
description = db.Column(db.Text, nullable=True)
# Network details
gatewayip = db.Column(
db.String(15),
nullable=True,
comment='Default gateway IP address'
)
subnetmask = db.Column(
db.String(15),
nullable=True,
comment='Subnet mask (e.g., 255.255.255.0)'
)
networkaddress = db.Column(
db.String(15),
nullable=True,
comment='Network address (e.g., 10.1.1.0)'
)
broadcastaddress = db.Column(
db.String(15),
nullable=True,
comment='Broadcast address (e.g., 10.1.1.255)'
)
# VLAN association
vlanid = db.Column(
db.Integer,
db.ForeignKey('vlans.vlanid'),
nullable=True
)
# Classification
subnettype = db.Column(
db.String(50),
nullable=True,
comment='Type: production, development, management, dmz, etc.'
)
# Location association
locationid = db.Column(
db.Integer,
db.ForeignKey('locations.locationid'),
nullable=True
)
# DHCP settings
dhcpenabled = db.Column(db.Boolean, default=True, comment='DHCP enabled for this subnet')
dhcprangestart = db.Column(db.String(15), nullable=True, comment='DHCP range start IP')
dhcprangeend = db.Column(db.String(15), nullable=True, comment='DHCP range end IP')
# DNS settings
dns1 = db.Column(db.String(15), nullable=True, comment='Primary DNS server')
dns2 = db.Column(db.String(15), nullable=True, comment='Secondary DNS server')
# Relationships
location = db.relationship('Location', backref='subnets')
__table_args__ = (
db.Index('idx_subnet_cidr', 'cidr'),
db.Index('idx_subnet_vlan', 'vlanid'),
db.Index('idx_subnet_location', 'locationid'),
)
def __repr__(self):
return f"<Subnet {self.cidr} - {self.name}>"
@property
def vlan_number(self):
"""Get the VLAN number."""
return self.vlan.vlannumber if self.vlan else None
def to_dict(self):
"""Convert to dictionary with related data."""
result = super().to_dict()
# Add VLAN info
if self.vlan:
result['vlannumber'] = self.vlan.vlannumber
result['vlanname'] = self.vlan.name
# Add location info
if self.location:
result['locationname'] = self.location.locationname
return result

217
plugins/network/plugin.py Normal file
View File

@@ -0,0 +1,217 @@
"""Network plugin main class."""
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Type
from flask import Flask, Blueprint
import click
from shopdb.plugins.base import BasePlugin, PluginMeta
from shopdb.extensions import db
from shopdb.core.models import AssetType
from .models import NetworkDevice, NetworkDeviceType, Subnet, VLAN
from .api import network_bp
logger = logging.getLogger(__name__)
class NetworkPlugin(BasePlugin):
"""
Network plugin - manages network device assets.
Network devices include switches, routers, access points, cameras, IDFs, etc.
Uses the new Asset architecture with NetworkDevice extension table.
"""
def __init__(self):
self._manifest = self._load_manifest()
def _load_manifest(self) -> Dict:
"""Load plugin manifest from JSON file."""
manifestpath = Path(__file__).parent / 'manifest.json'
if manifestpath.exists():
with open(manifestpath, 'r') as f:
return json.load(f)
return {}
@property
def meta(self) -> PluginMeta:
"""Return plugin metadata."""
return PluginMeta(
name=self._manifest.get('name', 'network'),
version=self._manifest.get('version', '1.0.0'),
description=self._manifest.get(
'description',
'Network device management for switches, APs, and cameras'
),
author=self._manifest.get('author', 'ShopDB Team'),
dependencies=self._manifest.get('dependencies', []),
core_version=self._manifest.get('core_version', '>=1.0.0'),
api_prefix=self._manifest.get('api_prefix', '/api/network'),
)
def get_blueprint(self) -> Optional[Blueprint]:
"""Return Flask Blueprint with API routes."""
return network_bp
def get_models(self) -> List[Type]:
"""Return list of SQLAlchemy model classes."""
return [NetworkDevice, NetworkDeviceType, Subnet, VLAN]
def init_app(self, app: Flask, db_instance) -> None:
"""Initialize plugin with Flask app."""
logger.info(f"Network plugin initialized (v{self.meta.version})")
def on_install(self, app: Flask) -> None:
"""Called when plugin is installed."""
with app.app_context():
self._ensure_asset_type()
self._ensure_network_device_types()
logger.info("Network plugin installed")
def _ensure_asset_type(self) -> None:
"""Ensure network_device asset type exists."""
existing = AssetType.query.filter_by(assettype='network_device').first()
if not existing:
at = AssetType(
assettype='network_device',
plugin_name='network',
table_name='networkdevices',
description='Network infrastructure devices (switches, APs, cameras, etc.)',
icon='network-wired'
)
db.session.add(at)
logger.debug("Created asset type: network_device")
db.session.commit()
def _ensure_network_device_types(self) -> None:
"""Ensure basic network device types exist."""
device_types = [
('Switch', 'Network switch', 'network-wired'),
('Router', 'Network router', 'router'),
('Access Point', 'Wireless access point', 'wifi'),
('Firewall', 'Network firewall', 'shield'),
('Camera', 'IP camera', 'video'),
('IDF', 'Intermediate Distribution Frame/closet', 'box'),
('MDF', 'Main Distribution Frame', 'building'),
('Patch Panel', 'Patch panel', 'th'),
('UPS', 'Uninterruptible power supply', 'battery'),
('Other', 'Other network device', 'network-wired'),
]
for name, description, icon in device_types:
existing = NetworkDeviceType.query.filter_by(networkdevicetype=name).first()
if not existing:
ndt = NetworkDeviceType(
networkdevicetype=name,
description=description,
icon=icon
)
db.session.add(ndt)
logger.debug(f"Created network device type: {name}")
db.session.commit()
def on_uninstall(self, app: Flask) -> None:
"""Called when plugin is uninstalled."""
logger.info("Network plugin uninstalled")
def get_cli_commands(self) -> List:
"""Return CLI commands for this plugin."""
@click.group('network')
def networkcli():
"""Network plugin commands."""
pass
@networkcli.command('list-types')
def list_types():
"""List all network device types."""
from flask import current_app
with current_app.app_context():
types = NetworkDeviceType.query.filter_by(isactive=True).all()
if not types:
click.echo('No network device types found.')
return
click.echo('Network Device Types:')
for t in types:
click.echo(f" [{t.networkdevicetypeid}] {t.networkdevicetype}")
@networkcli.command('stats')
def stats():
"""Show network device statistics."""
from flask import current_app
from shopdb.core.models import Asset
with current_app.app_context():
total = db.session.query(NetworkDevice).join(Asset).filter(
Asset.isactive == True
).count()
click.echo(f"Total active network devices: {total}")
# By type
by_type = db.session.query(
NetworkDeviceType.networkdevicetype,
db.func.count(NetworkDevice.networkdeviceid)
).join(NetworkDevice, NetworkDevice.networkdevicetypeid == NetworkDeviceType.networkdevicetypeid
).join(Asset, Asset.assetid == NetworkDevice.assetid
).filter(Asset.isactive == True
).group_by(NetworkDeviceType.networkdevicetype
).all()
if by_type:
click.echo("\nBy Type:")
for t, c in by_type:
click.echo(f" {t}: {c}")
@networkcli.command('find')
@click.argument('hostname')
def find_by_hostname(hostname):
"""Find a network device by hostname."""
from flask import current_app
with current_app.app_context():
netdev = NetworkDevice.query.filter(
NetworkDevice.hostname.ilike(f'%{hostname}%')
).first()
if not netdev:
click.echo(f'No network device found matching hostname: {hostname}')
return
click.echo(f'Found: {netdev.hostname}')
click.echo(f' Asset: {netdev.asset.assetnumber}')
click.echo(f' Type: {netdev.networkdevicetype.networkdevicetype if netdev.networkdevicetype else "N/A"}')
click.echo(f' Firmware: {netdev.firmwareversion or "N/A"}')
click.echo(f' PoE: {"Yes" if netdev.ispoe else "No"}')
return [networkcli]
def get_dashboard_widgets(self) -> List[Dict]:
"""Return dashboard widget definitions."""
return [
{
'name': 'Network Status',
'component': 'NetworkStatusWidget',
'endpoint': '/api/network/dashboard/summary',
'size': 'medium',
'position': 7,
},
]
def get_navigation_items(self) -> List[Dict]:
"""Return navigation menu items."""
return [
{
'name': 'Network',
'icon': 'network-wired',
'route': '/network',
'position': 18,
},
]

View File

@@ -0,0 +1,5 @@
"""Notifications plugin package."""
from .plugin import NotificationsPlugin
__all__ = ['NotificationsPlugin']

View File

@@ -0,0 +1,5 @@
"""Notifications plugin API."""
from .routes import notifications_bp
__all__ = ['notifications_bp']

View File

@@ -0,0 +1,617 @@
"""Notifications plugin API endpoints - adapted to existing schema."""
from datetime import datetime
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import Notification, NotificationType
notifications_bp = Blueprint('notifications', __name__)
# =============================================================================
# Notification Types
# =============================================================================
@notifications_bp.route('/types', methods=['GET'])
def list_notification_types():
"""List all notification types."""
page, per_page = get_pagination_params(request)
query = NotificationType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(NotificationType.isactive == True)
query = query.order_by(NotificationType.typename)
items, total = paginate_query(query, page, per_page)
data = [t.to_dict() for t in items]
return paginated_response(data, page, per_page, total)
@notifications_bp.route('/types', methods=['POST'])
@jwt_required()
def create_notification_type():
"""Create a new notification type."""
data = request.get_json()
if not data or not data.get('typename'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'typename is required')
if NotificationType.query.filter_by(typename=data['typename']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Notification type '{data['typename']}' already exists",
http_code=409
)
t = NotificationType(
typename=data['typename'],
typedescription=data.get('typedescription') or data.get('description'),
typecolor=data.get('typecolor') or data.get('color', '#17a2b8')
)
db.session.add(t)
db.session.commit()
return success_response(t.to_dict(), message='Notification type created', http_code=201)
# =============================================================================
# Notifications CRUD
# =============================================================================
@notifications_bp.route('', methods=['GET'])
def list_notifications():
"""
List all notifications with filtering and pagination.
Query parameters:
- page, per_page: Pagination
- active: Filter by active status (default: true)
- type_id: Filter by notification type ID
- current: Filter to currently active notifications only
- search: Search in notification text
"""
page, per_page = get_pagination_params(request)
query = Notification.query
# Active filter
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Notification.isactive == True)
# Type filter
if type_id := request.args.get('type_id'):
query = query.filter(Notification.notificationtypeid == int(type_id))
# Current filter (active based on dates)
if request.args.get('current', 'false').lower() == 'true':
now = datetime.utcnow()
query = query.filter(
Notification.starttime <= now,
db.or_(
Notification.endtime.is_(None),
Notification.endtime >= now
)
)
# Search filter
if search := request.args.get('search'):
query = query.filter(
Notification.notification.ilike(f'%{search}%')
)
# Sorting by start time (newest first)
query = query.order_by(Notification.starttime.desc())
items, total = paginate_query(query, page, per_page)
data = [n.to_dict() for n in items]
return paginated_response(data, page, per_page, total)
@notifications_bp.route('/<int:notification_id>', methods=['GET'])
def get_notification(notification_id: int):
"""Get a single notification."""
n = Notification.query.get(notification_id)
if not n:
return error_response(
ErrorCodes.NOT_FOUND,
f'Notification with ID {notification_id} not found',
http_code=404
)
return success_response(n.to_dict())
@notifications_bp.route('', methods=['POST'])
@jwt_required()
def create_notification():
"""Create a new notification."""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Validate required fields
notification_text = data.get('notification') or data.get('message')
if not notification_text:
return error_response(ErrorCodes.VALIDATION_ERROR, 'notification/message is required')
# Parse dates
starttime = datetime.utcnow()
if data.get('starttime') or data.get('startdate'):
try:
date_str = data.get('starttime') or data.get('startdate')
starttime = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except ValueError:
return error_response(ErrorCodes.VALIDATION_ERROR, 'Invalid starttime format')
endtime = None
if data.get('endtime') or data.get('enddate'):
try:
date_str = data.get('endtime') or data.get('enddate')
endtime = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except ValueError:
return error_response(ErrorCodes.VALIDATION_ERROR, 'Invalid endtime format')
n = Notification(
notification=notification_text,
notificationtypeid=data.get('notificationtypeid'),
businessunitid=data.get('businessunitid'),
appid=data.get('appid'),
starttime=starttime,
endtime=endtime,
ticketnumber=data.get('ticketnumber'),
link=data.get('link') or data.get('linkurl'),
isactive=True,
isshopfloor=data.get('isshopfloor', False),
employeesso=data.get('employeesso'),
employeename=data.get('employeename')
)
db.session.add(n)
db.session.commit()
return success_response(n.to_dict(), message='Notification created', http_code=201)
@notifications_bp.route('/<int:notification_id>', methods=['PUT'])
@jwt_required()
def update_notification(notification_id: int):
"""Update a notification."""
n = Notification.query.get(notification_id)
if not n:
return error_response(
ErrorCodes.NOT_FOUND,
f'Notification with ID {notification_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Update text content
if 'notification' in data or 'message' in data:
n.notification = data.get('notification') or data.get('message')
# Update simple fields
if 'notificationtypeid' in data:
n.notificationtypeid = data['notificationtypeid']
if 'businessunitid' in data:
n.businessunitid = data['businessunitid']
if 'appid' in data:
n.appid = data['appid']
if 'ticketnumber' in data:
n.ticketnumber = data['ticketnumber']
if 'link' in data or 'linkurl' in data:
n.link = data.get('link') or data.get('linkurl')
if 'isactive' in data:
n.isactive = data['isactive']
if 'isshopfloor' in data:
n.isshopfloor = data['isshopfloor']
if 'employeesso' in data:
n.employeesso = data['employeesso']
if 'employeename' in data:
n.employeename = data['employeename']
# Parse and update dates
if 'starttime' in data or 'startdate' in data:
date_str = data.get('starttime') or data.get('startdate')
if date_str:
try:
n.starttime = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except ValueError:
return error_response(ErrorCodes.VALIDATION_ERROR, 'Invalid starttime format')
else:
n.starttime = datetime.utcnow()
if 'endtime' in data or 'enddate' in data:
date_str = data.get('endtime') or data.get('enddate')
if date_str:
try:
n.endtime = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except ValueError:
return error_response(ErrorCodes.VALIDATION_ERROR, 'Invalid endtime format')
else:
n.endtime = None
db.session.commit()
return success_response(n.to_dict(), message='Notification updated')
@notifications_bp.route('/<int:notification_id>', methods=['DELETE'])
@jwt_required()
def delete_notification(notification_id: int):
"""Delete (soft delete) a notification."""
n = Notification.query.get(notification_id)
if not n:
return error_response(
ErrorCodes.NOT_FOUND,
f'Notification with ID {notification_id} not found',
http_code=404
)
n.isactive = False
db.session.commit()
return success_response(message='Notification deleted')
# =============================================================================
# Special Endpoints
# =============================================================================
@notifications_bp.route('/active', methods=['GET'])
def get_active_notifications():
"""
Get currently active notifications for display.
"""
now = datetime.utcnow()
notifications = Notification.query.filter(
Notification.isactive == True,
db.or_(
Notification.starttime.is_(None),
Notification.starttime <= now
),
db.or_(
Notification.endtime.is_(None),
Notification.endtime >= now
)
).order_by(Notification.starttime.desc()).all()
data = [n.to_dict() for n in notifications]
return success_response({
'notifications': data,
'total': len(data)
})
@notifications_bp.route('/calendar', methods=['GET'])
def get_calendar_events():
"""
Get notifications in FullCalendar event format.
Query parameters:
- start: Start date (ISO format)
- end: End date (ISO format)
"""
query = Notification.query.filter(Notification.isactive == True)
# Date range filter
if start := request.args.get('start'):
try:
start_date = datetime.fromisoformat(start.replace('Z', '+00:00'))
query = query.filter(
db.or_(
Notification.endtime >= start_date,
Notification.endtime.is_(None)
)
)
except ValueError:
pass
if end := request.args.get('end'):
try:
end_date = datetime.fromisoformat(end.replace('Z', '+00:00'))
query = query.filter(Notification.starttime <= end_date)
except ValueError:
pass
notifications = query.order_by(Notification.starttime).all()
events = [n.to_calendar_event() for n in notifications]
return success_response(events)
@notifications_bp.route('/dashboard/summary', methods=['GET'])
def dashboard_summary():
"""Get notifications dashboard summary."""
now = datetime.utcnow()
# Total active notifications
total_active = Notification.query.filter(
Notification.isactive == True,
db.or_(
Notification.starttime.is_(None),
Notification.starttime <= now
),
db.or_(
Notification.endtime.is_(None),
Notification.endtime >= now
)
).count()
# By type
by_type = db.session.query(
NotificationType.typename,
NotificationType.typecolor,
db.func.count(Notification.notificationid)
).join(Notification
).filter(
Notification.isactive == True
).group_by(NotificationType.typename, NotificationType.typecolor
).all()
return success_response({
'active': total_active,
'by_type': [{'type': t, 'color': c, 'count': n} for t, c, n in by_type]
})
@notifications_bp.route('/employee/<sso>', methods=['GET'])
def get_employee_recognitions(sso):
"""
Get recognitions for a specific employee by SSO.
Returns all recognition-type notifications where the employee is mentioned.
"""
if not sso or not sso.isdigit():
return error_response(ErrorCodes.VALIDATION_ERROR, 'Valid SSO required')
# Find recognition type(s)
recognition_types = NotificationType.query.filter(
db.or_(
NotificationType.typecolor == 'recognition',
NotificationType.typename.ilike('%recognition%')
)
).all()
recognition_type_ids = [rt.notificationtypeid for rt in recognition_types]
# Find notifications where this employee is mentioned
# Check both exact match and comma-separated list
query = Notification.query.filter(
Notification.isactive == True,
db.or_(
Notification.employeesso == sso,
Notification.employeesso.like(f'{sso},%'),
Notification.employeesso.like(f'%,{sso}'),
Notification.employeesso.like(f'%,{sso},%')
)
)
# Optionally filter to recognition types only
if recognition_type_ids:
query = query.filter(Notification.notificationtypeid.in_(recognition_type_ids))
query = query.order_by(Notification.starttime.desc())
notifications = query.all()
data = [n.to_dict() for n in notifications]
return success_response({
'recognitions': data,
'total': len(data)
})
@notifications_bp.route('/shopfloor', methods=['GET'])
def get_shopfloor_notifications():
"""
Get notifications for shopfloor TV dashboard.
Returns current and upcoming notifications with isshopfloor=1.
Splits multi-employee recognition into separate entries.
Query parameters:
- businessunit: Filter by business unit ID (null = all units)
"""
from datetime import timedelta
now = datetime.utcnow()
business_unit = request.args.get('businessunit')
# Base query for shopfloor notifications
base_query = Notification.query.filter(Notification.isshopfloor == True)
# Business unit filter
if business_unit and business_unit.isdigit():
# Specific BU: show that BU's notifications AND null (all units)
base_query = base_query.filter(
db.or_(
Notification.businessunitid == int(business_unit),
Notification.businessunitid.is_(None)
)
)
else:
# All units: only show notifications with NULL businessunitid
base_query = base_query.filter(Notification.businessunitid.is_(None))
# Current notifications (active now or ended within 30 minutes)
thirty_min_ago = now - timedelta(minutes=30)
current_query = base_query.filter(
db.or_(
# Active and currently showing
db.and_(
Notification.isactive == True,
db.or_(Notification.starttime.is_(None), Notification.starttime <= now),
db.or_(Notification.endtime.is_(None), Notification.endtime >= now)
),
# Recently ended (within 30 min) - show as resolved
db.and_(
Notification.endtime.isnot(None),
Notification.endtime >= thirty_min_ago,
Notification.endtime < now
)
)
).order_by(Notification.notificationid.desc())
current_notifications = current_query.all()
# Upcoming notifications (starts within next 5 days)
five_days = now + timedelta(days=5)
upcoming_query = base_query.filter(
Notification.isactive == True,
Notification.starttime > now,
Notification.starttime <= five_days
).order_by(Notification.starttime)
upcoming_notifications = upcoming_query.all()
def notification_to_shopfloor(n, employee_override=None):
"""Convert notification to shopfloor format."""
is_resolved = n.endtime and n.endtime < now
result = {
'notificationid': n.notificationid,
'notification': n.notification,
'starttime': n.starttime.isoformat() if n.starttime else None,
'endtime': n.endtime.isoformat() if n.endtime else None,
'ticketnumber': n.ticketnumber,
'link': n.link,
'isactive': n.isactive,
'isshopfloor': True,
'resolved': is_resolved,
'typename': n.notificationtype.typename if n.notificationtype else None,
'typecolor': n.notificationtype.typecolor if n.notificationtype else None,
}
# Employee info
if employee_override:
result['employeesso'] = employee_override.get('sso')
result['employeename'] = employee_override.get('name')
result['employeepicture'] = employee_override.get('picture')
else:
result['employeesso'] = n.employeesso
result['employeename'] = n.employeename
result['employeepicture'] = None
# Try to get picture from wjf_employees
if n.employeesso and n.employeesso.isdigit():
try:
import pymysql
conn = pymysql.connect(
host='localhost', user='root', password='rootpassword',
database='wjf_employees', cursorclass=pymysql.cursors.DictCursor
)
with conn.cursor() as cur:
cur.execute('SELECT Picture FROM employees WHERE SSO = %s', (int(n.employeesso),))
emp = cur.fetchone()
if emp and emp.get('Picture'):
result['employeepicture'] = emp['Picture']
conn.close()
except Exception:
pass
return result
# Process current notifications (split multi-employee recognition)
current_data = []
for n in current_notifications:
is_recognition = n.notificationtype and n.notificationtype.typecolor == 'recognition'
if is_recognition and n.employeesso and ',' in n.employeesso:
# Split into individual cards for each employee
ssos = [s.strip() for s in n.employeesso.split(',')]
names = n.employeename.split(', ') if n.employeename else []
for i, sso in enumerate(ssos):
name = names[i] if i < len(names) else sso
# Look up picture
picture = None
if sso.isdigit():
try:
import pymysql
conn = pymysql.connect(
host='localhost', user='root', password='rootpassword',
database='wjf_employees', cursorclass=pymysql.cursors.DictCursor
)
with conn.cursor() as cur:
cur.execute('SELECT Picture FROM employees WHERE SSO = %s', (int(sso),))
emp = cur.fetchone()
if emp:
picture = emp.get('Picture')
conn.close()
except Exception:
pass
current_data.append(notification_to_shopfloor(n, {
'sso': sso,
'name': name,
'picture': picture
}))
else:
current_data.append(notification_to_shopfloor(n))
# Process upcoming notifications
upcoming_data = []
for n in upcoming_notifications:
is_recognition = n.notificationtype and n.notificationtype.typecolor == 'recognition'
if is_recognition and n.employeesso and ',' in n.employeesso:
ssos = [s.strip() for s in n.employeesso.split(',')]
names = n.employeename.split(', ') if n.employeename else []
for i, sso in enumerate(ssos):
name = names[i] if i < len(names) else sso
picture = None
if sso.isdigit():
try:
import pymysql
conn = pymysql.connect(
host='localhost', user='root', password='rootpassword',
database='wjf_employees', cursorclass=pymysql.cursors.DictCursor
)
with conn.cursor() as cur:
cur.execute('SELECT Picture FROM employees WHERE SSO = %s', (int(sso),))
emp = cur.fetchone()
if emp:
picture = emp.get('Picture')
conn.close()
except Exception:
pass
upcoming_data.append(notification_to_shopfloor(n, {
'sso': sso,
'name': name,
'picture': picture
}))
else:
upcoming_data.append(notification_to_shopfloor(n))
return success_response({
'timestamp': now.isoformat(),
'current': current_data,
'upcoming': upcoming_data
})

View File

@@ -0,0 +1,12 @@
{
"name": "notifications",
"version": "1.0.0",
"description": "Notifications and announcements management plugin",
"author": "ShopDB Team",
"dependencies": [],
"core_version": ">=1.0.0",
"api_prefix": "/api/notifications",
"provides": {
"features": ["notifications", "announcements", "calendar_events"]
}
}

View File

@@ -0,0 +1,5 @@
"""Notifications plugin models."""
from .notification import Notification, NotificationType
__all__ = ['Notification', 'NotificationType']

View File

@@ -0,0 +1,157 @@
"""Notifications plugin models - adapted to existing database schema."""
from datetime import datetime
from shopdb.extensions import db
class NotificationType(db.Model):
"""
Notification type classification.
Matches existing notificationtypes table.
"""
__tablename__ = 'notificationtypes'
notificationtypeid = db.Column(db.Integer, primary_key=True)
typename = db.Column(db.String(50), nullable=False)
typedescription = db.Column(db.Text)
typecolor = db.Column(db.String(20), default='#17a2b8')
isactive = db.Column(db.Boolean, default=True)
def __repr__(self):
return f"<NotificationType {self.typename}>"
def to_dict(self):
return {
'notificationtypeid': self.notificationtypeid,
'typename': self.typename,
'typedescription': self.typedescription,
'typecolor': self.typecolor,
'isactive': self.isactive
}
class Notification(db.Model):
"""
Notification/announcement model.
Matches existing notifications table schema.
"""
__tablename__ = 'notifications'
notificationid = db.Column(db.Integer, primary_key=True)
notificationtypeid = db.Column(
db.Integer,
db.ForeignKey('notificationtypes.notificationtypeid'),
nullable=True
)
businessunitid = db.Column(db.Integer, nullable=True)
appid = db.Column(db.Integer, nullable=True)
notification = db.Column(db.Text, nullable=False, comment='The message content')
starttime = db.Column(db.DateTime, nullable=True)
endtime = db.Column(db.DateTime, nullable=True)
ticketnumber = db.Column(db.String(50), nullable=True)
link = db.Column(db.String(500), nullable=True)
isactive = db.Column(db.Boolean, default=True)
isshopfloor = db.Column(db.Boolean, default=False)
employeesso = db.Column(db.String(100), nullable=True)
employeename = db.Column(db.String(100), nullable=True)
# Relationships
notificationtype = db.relationship('NotificationType', backref='notifications')
def __repr__(self):
return f"<Notification {self.notificationid}>"
@property
def is_current(self):
"""Check if notification is currently active based on dates."""
now = datetime.utcnow()
if not self.isactive:
return False
if self.starttime and now < self.starttime:
return False
if self.endtime and now > self.endtime:
return False
return True
@property
def title(self):
"""Get title - first line or first 100 chars of notification."""
if not self.notification:
return ''
lines = self.notification.split('\n')
return lines[0][:100] if lines else self.notification[:100]
def to_dict(self):
"""Convert to dictionary with related data."""
result = {
'notificationid': self.notificationid,
'notificationtypeid': self.notificationtypeid,
'businessunitid': self.businessunitid,
'appid': self.appid,
'notification': self.notification,
'title': self.title,
'message': self.notification,
'starttime': self.starttime.isoformat() if self.starttime else None,
'endtime': self.endtime.isoformat() if self.endtime else None,
'startdate': self.starttime.isoformat() if self.starttime else None,
'enddate': self.endtime.isoformat() if self.endtime else None,
'ticketnumber': self.ticketnumber,
'link': self.link,
'linkurl': self.link,
'isactive': bool(self.isactive) if self.isactive is not None else True,
'isshopfloor': bool(self.isshopfloor) if self.isshopfloor is not None else False,
'employeesso': self.employeesso,
'employeename': self.employeename,
'iscurrent': self.is_current
}
# Add type info
if self.notificationtype:
result['typename'] = self.notificationtype.typename
result['typecolor'] = self.notificationtype.typecolor
return result
def to_calendar_event(self):
"""Convert to FullCalendar event format."""
# Map Bootstrap color names to hex colors
color_map = {
'success': '#04b962',
'warning': '#ff8800',
'danger': '#f5365c',
'info': '#14abef',
'primary': '#7934f3',
'secondary': '#94614f',
'recognition': '#14abef', # Blue for recognition
}
raw_color = self.notificationtype.typecolor if self.notificationtype else 'info'
# Use mapped color if it's a Bootstrap name, otherwise use as-is (hex)
color = color_map.get(raw_color, raw_color if raw_color.startswith('#') else '#14abef')
# For recognition notifications, include employee name (or SSO as fallback) in title
title = self.title
if raw_color == 'recognition':
employee_display = self.employeename or self.employeesso
if employee_display:
title = f"{employee_display}: {title}"
return {
'id': self.notificationid,
'title': title,
'start': self.starttime.isoformat() if self.starttime else None,
'end': self.endtime.isoformat() if self.endtime else None,
'allDay': True,
'backgroundColor': color,
'borderColor': color,
'extendedProps': {
'notificationid': self.notificationid,
'message': self.notification,
'typename': self.notificationtype.typename if self.notificationtype else None,
'typecolor': raw_color,
'linkurl': self.link,
'ticketnumber': self.ticketnumber,
'employeename': self.employeename,
'employeesso': self.employeesso,
}
}

View File

@@ -0,0 +1,204 @@
"""Notifications plugin main class."""
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Type
from flask import Flask, Blueprint
import click
from shopdb.plugins.base import BasePlugin, PluginMeta
from shopdb.extensions import db
from .models import Notification, NotificationType
from .api import notifications_bp
logger = logging.getLogger(__name__)
class NotificationsPlugin(BasePlugin):
"""
Notifications plugin - manages announcements and notifications.
Provides functionality for:
- Creating and managing notifications/announcements
- Displaying banner notifications
- Calendar view of notifications
"""
def __init__(self):
self._manifest = self._load_manifest()
def _load_manifest(self) -> Dict:
"""Load plugin manifest from JSON file."""
manifest_path = Path(__file__).parent / 'manifest.json'
if manifest_path.exists():
with open(manifest_path, 'r') as f:
return json.load(f)
return {}
@property
def meta(self) -> PluginMeta:
"""Return plugin metadata."""
return PluginMeta(
name=self._manifest.get('name', 'notifications'),
version=self._manifest.get('version', '1.0.0'),
description=self._manifest.get(
'description',
'Notifications and announcements management'
),
author=self._manifest.get('author', 'ShopDB Team'),
dependencies=self._manifest.get('dependencies', []),
core_version=self._manifest.get('core_version', '>=1.0.0'),
api_prefix=self._manifest.get('api_prefix', '/api/notifications'),
)
def get_blueprint(self) -> Optional[Blueprint]:
"""Return Flask Blueprint with API routes."""
return notifications_bp
def get_models(self) -> List[Type]:
"""Return list of SQLAlchemy model classes."""
return [Notification, NotificationType]
def init_app(self, app: Flask, db_instance) -> None:
"""Initialize plugin with Flask app."""
logger.info(f"Notifications plugin initialized (v{self.meta.version})")
def on_install(self, app: Flask) -> None:
"""Called when plugin is installed."""
with app.app_context():
self._ensure_notification_types()
logger.info("Notifications plugin installed")
def _ensure_notification_types(self) -> None:
"""Ensure default notification types exist."""
default_types = [
('Awareness', 'General awareness notification', '#17a2b8', 'info-circle'),
('Change', 'Planned change notification', '#ffc107', 'exchange-alt'),
('Incident', 'Incident or outage notification', '#dc3545', 'exclamation-triangle'),
('Maintenance', 'Scheduled maintenance notification', '#6c757d', 'wrench'),
('General', 'General announcement', '#28a745', 'bullhorn'),
]
for typename, description, color, icon in default_types:
existing = NotificationType.query.filter_by(typename=typename).first()
if not existing:
t = NotificationType(
typename=typename,
description=description,
color=color,
icon=icon
)
db.session.add(t)
logger.debug(f"Created notification type: {typename}")
db.session.commit()
def on_uninstall(self, app: Flask) -> None:
"""Called when plugin is uninstalled."""
logger.info("Notifications plugin uninstalled")
def get_cli_commands(self) -> List:
"""Return CLI commands for this plugin."""
@click.group('notifications')
def notifications_cli():
"""Notifications plugin commands."""
pass
@notifications_cli.command('list-types')
def list_types():
"""List all notification types."""
from flask import current_app
with current_app.app_context():
types = NotificationType.query.filter_by(isactive=True).all()
if not types:
click.echo('No notification types found.')
return
click.echo('Notification Types:')
for t in types:
click.echo(f" [{t.notificationtypeid}] {t.typename} ({t.color})")
@notifications_cli.command('stats')
def stats():
"""Show notification statistics."""
from flask import current_app
from datetime import datetime
with current_app.app_context():
now = datetime.utcnow()
total = Notification.query.filter(
Notification.isactive == True
).count()
active = Notification.query.filter(
Notification.isactive == True,
Notification.startdate <= now,
db.or_(
Notification.enddate.is_(None),
Notification.enddate >= now
)
).count()
click.echo(f"Total notifications: {total}")
click.echo(f"Currently active: {active}")
@notifications_cli.command('create')
@click.option('--title', required=True, help='Notification title')
@click.option('--message', required=True, help='Notification message')
@click.option('--type', 'type_name', default='General', help='Notification type')
def create_notification(title, message, type_name):
"""Create a new notification."""
from flask import current_app
with current_app.app_context():
ntype = NotificationType.query.filter_by(typename=type_name).first()
if not ntype:
click.echo(f"Error: Notification type '{type_name}' not found.")
return
n = Notification(
title=title,
message=message,
notificationtypeid=ntype.notificationtypeid
)
db.session.add(n)
db.session.commit()
click.echo(f"Created notification #{n.notificationid}: {title}")
return [notifications_cli]
def get_dashboard_widgets(self) -> List[Dict]:
"""Return dashboard widget definitions."""
return [
{
'name': 'Active Notifications',
'component': 'NotificationsWidget',
'endpoint': '/api/notifications/dashboard/summary',
'size': 'small',
'position': 1,
},
]
def get_navigation_items(self) -> List[Dict]:
"""Return navigation menu items."""
return [
{
'name': 'Notifications',
'icon': 'bell',
'route': '/notifications',
'position': 5,
},
{
'name': 'Calendar',
'icon': 'calendar',
'route': '/calendar',
'position': 6,
},
]

View File

@@ -1,5 +1,9 @@
"""Printers plugin API."""
from .routes import printers_bp
from .routes import printers_bp # Legacy Machine-based API
from .asset_routes import printers_asset_bp # New Asset-based API
__all__ = ['printers_bp']
__all__ = [
'printers_bp', # Legacy
'printers_asset_bp', # New
]

View File

@@ -0,0 +1,472 @@
"""Printers API routes - new Asset-based architecture."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Asset, AssetType, Vendor, Model, Communication, CommunicationType
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import Printer, PrinterType
from ..services import ZabbixService
printers_asset_bp = Blueprint('printers_asset', __name__)
# =============================================================================
# Printer Types
# =============================================================================
@printers_asset_bp.route('/types', methods=['GET'])
@jwt_required()
def list_printer_types():
"""List all printer types."""
page, per_page = get_pagination_params(request)
query = PrinterType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(PrinterType.isactive == True)
if search := request.args.get('search'):
query = query.filter(PrinterType.printertype.ilike(f'%{search}%'))
query = query.order_by(PrinterType.printertype)
items, total = paginate_query(query, page, per_page)
data = [t.to_dict() for t in items]
return paginated_response(data, page, per_page, total)
@printers_asset_bp.route('/types/<int:type_id>', methods=['GET'])
@jwt_required()
def get_printer_type(type_id: int):
"""Get a single printer type."""
t = PrinterType.query.get(type_id)
if not t:
return error_response(
ErrorCodes.NOT_FOUND,
f'Printer type with ID {type_id} not found',
http_code=404
)
return success_response(t.to_dict())
@printers_asset_bp.route('/types', methods=['POST'])
@jwt_required()
def create_printer_type():
"""Create a new printer type."""
data = request.get_json()
if not data or not data.get('printertype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'printertype is required')
if PrinterType.query.filter_by(printertype=data['printertype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Printer type '{data['printertype']}' already exists",
http_code=409
)
t = PrinterType(
printertype=data['printertype'],
description=data.get('description'),
icon=data.get('icon')
)
db.session.add(t)
db.session.commit()
return success_response(t.to_dict(), message='Printer type created', http_code=201)
# =============================================================================
# Printers CRUD
# =============================================================================
@printers_asset_bp.route('', methods=['GET'])
@jwt_required()
def list_printers():
"""
List all printers with filtering and pagination.
Query parameters:
- page, per_page: Pagination
- active: Filter by active status
- search: Search by asset number, name, or hostname
- type_id: Filter by printer type ID
- vendor_id: Filter by vendor ID
- location_id: Filter by location ID
- businessunit_id: Filter by business unit ID
"""
page, per_page = get_pagination_params(request)
# Join Printer with Asset
query = db.session.query(Printer).join(Asset)
# Active filter
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Asset.isactive == True)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
Asset.assetnumber.ilike(f'%{search}%'),
Asset.name.ilike(f'%{search}%'),
Asset.serialnumber.ilike(f'%{search}%'),
Printer.hostname.ilike(f'%{search}%'),
Printer.windowsname.ilike(f'%{search}%')
)
)
# Type filter
if type_id := request.args.get('type_id'):
query = query.filter(Printer.printertypeid == int(type_id))
# Vendor filter
if vendor_id := request.args.get('vendor_id'):
query = query.filter(Printer.vendorid == int(vendor_id))
# Location filter
if location_id := request.args.get('location_id'):
query = query.filter(Asset.locationid == int(location_id))
# Business unit filter
if bu_id := request.args.get('businessunit_id'):
query = query.filter(Asset.businessunitid == int(bu_id))
# Sorting
sort_by = request.args.get('sort', 'hostname')
sort_dir = request.args.get('dir', 'asc')
if sort_by == 'hostname':
col = Printer.hostname
elif sort_by == 'assetnumber':
col = Asset.assetnumber
elif sort_by == 'name':
col = Asset.name
else:
col = Printer.hostname
query = query.order_by(col.desc() if sort_dir == 'desc' else col)
items, total = paginate_query(query, page, per_page)
# Build response with both asset and printer data
data = []
for printer in items:
item = printer.asset.to_dict() if printer.asset else {}
item['printer'] = printer.to_dict()
# Add primary IP address
if printer.asset:
primary_comm = Communication.query.filter_by(
assetid=printer.asset.assetid,
isprimary=True
).first()
if not primary_comm:
primary_comm = Communication.query.filter_by(
assetid=printer.asset.assetid
).first()
item['ipaddress'] = primary_comm.ipaddress if primary_comm else None
data.append(item)
return paginated_response(data, page, per_page, total)
@printers_asset_bp.route('/<int:printer_id>', methods=['GET'])
@jwt_required()
def get_printer(printer_id: int):
"""Get a single printer with full details."""
printer = Printer.query.get(printer_id)
if not printer:
return error_response(
ErrorCodes.NOT_FOUND,
f'Printer with ID {printer_id} not found',
http_code=404
)
result = printer.asset.to_dict() if printer.asset else {}
result['printer'] = printer.to_dict()
# Add communications
if printer.asset:
comms = Communication.query.filter_by(assetid=printer.asset.assetid).all()
result['communications'] = [c.to_dict() for c in comms]
return success_response(result)
@printers_asset_bp.route('/by-asset/<int:asset_id>', methods=['GET'])
@jwt_required()
def get_printer_by_asset(asset_id: int):
"""Get printer data by asset ID."""
printer = Printer.query.filter_by(assetid=asset_id).first()
if not printer:
return error_response(
ErrorCodes.NOT_FOUND,
f'Printer for asset {asset_id} not found',
http_code=404
)
result = printer.asset.to_dict() if printer.asset else {}
result['printer'] = printer.to_dict()
return success_response(result)
@printers_asset_bp.route('', methods=['POST'])
@jwt_required()
def create_printer():
"""
Create new printer (creates both Asset and Printer records).
Required fields:
- assetnumber: Business identifier
Optional fields:
- name, serialnumber, statusid, locationid, businessunitid
- printertypeid, vendorid, modelnumberid, hostname
- windowsname, sharename, iscsf, installpath, pin
- iscolor, isduplex, isnetwork
- mapleft, maptop, notes
"""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('assetnumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'assetnumber is required')
# Check for duplicate assetnumber
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Get printer asset type
printer_type = AssetType.query.filter_by(assettype='printer').first()
if not printer_type:
return error_response(
ErrorCodes.INTERNAL_ERROR,
'Printer asset type not found. Plugin may not be properly installed.',
http_code=500
)
# Create the core asset
asset = Asset(
assetnumber=data['assetnumber'],
name=data.get('name'),
serialnumber=data.get('serialnumber'),
assettypeid=printer_type.assettypeid,
statusid=data.get('statusid', 1),
locationid=data.get('locationid'),
businessunitid=data.get('businessunitid'),
mapleft=data.get('mapleft'),
maptop=data.get('maptop'),
notes=data.get('notes')
)
db.session.add(asset)
db.session.flush() # Get the assetid
# Create the printer extension
printer = Printer(
assetid=asset.assetid,
printertypeid=data.get('printertypeid'),
vendorid=data.get('vendorid'),
modelnumberid=data.get('modelnumberid'),
hostname=data.get('hostname'),
windowsname=data.get('windowsname'),
sharename=data.get('sharename'),
iscsf=data.get('iscsf', False),
installpath=data.get('installpath'),
pin=data.get('pin'),
iscolor=data.get('iscolor', False),
isduplex=data.get('isduplex', False),
isnetwork=data.get('isnetwork', True)
)
db.session.add(printer)
# Create communication record if IP provided
if data.get('ipaddress'):
ip_comtype = CommunicationType.query.filter_by(comtype='IP').first()
if ip_comtype:
comm = Communication(
assetid=asset.assetid,
comtypeid=ip_comtype.comtypeid,
ipaddress=data['ipaddress'],
isprimary=True
)
db.session.add(comm)
db.session.commit()
result = asset.to_dict()
result['printer'] = printer.to_dict()
return success_response(result, message='Printer created', http_code=201)
@printers_asset_bp.route('/<int:printer_id>', methods=['PUT'])
@jwt_required()
def update_printer(printer_id: int):
"""Update printer (both Asset and Printer records)."""
printer = Printer.query.get(printer_id)
if not printer:
return error_response(
ErrorCodes.NOT_FOUND,
f'Printer with ID {printer_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
asset = printer.asset
# Check for conflicting assetnumber
if 'assetnumber' in data and data['assetnumber'] != asset.assetnumber:
if Asset.query.filter_by(assetnumber=data['assetnumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Asset with number '{data['assetnumber']}' already exists",
http_code=409
)
# Update asset fields
asset_fields = ['assetnumber', 'name', 'serialnumber', 'statusid',
'locationid', 'businessunitid', 'mapleft', 'maptop', 'notes', 'isactive']
for key in asset_fields:
if key in data:
setattr(asset, key, data[key])
# Update printer fields
printer_fields = ['printertypeid', 'vendorid', 'modelnumberid', 'hostname',
'windowsname', 'sharename', 'iscsf', 'installpath', 'pin',
'iscolor', 'isduplex', 'isnetwork']
for key in printer_fields:
if key in data:
setattr(printer, key, data[key])
db.session.commit()
result = asset.to_dict()
result['printer'] = printer.to_dict()
return success_response(result, message='Printer updated')
@printers_asset_bp.route('/<int:printer_id>', methods=['DELETE'])
@jwt_required()
def delete_printer(printer_id: int):
"""Delete (soft delete) printer."""
printer = Printer.query.get(printer_id)
if not printer:
return error_response(
ErrorCodes.NOT_FOUND,
f'Printer with ID {printer_id} not found',
http_code=404
)
# Soft delete the asset
printer.asset.isactive = False
db.session.commit()
return success_response(message='Printer deleted')
# =============================================================================
# Supply Levels (Zabbix Integration)
# =============================================================================
@printers_asset_bp.route('/<int:printer_id>/supplies', methods=['GET'])
@jwt_required(optional=True)
def get_printer_supplies(printer_id: int):
"""Get supply levels from Zabbix (real-time lookup)."""
printer = Printer.query.get(printer_id)
if not printer:
return error_response(ErrorCodes.NOT_FOUND, 'Printer not found', http_code=404)
# Get IP address from communications
comm = Communication.query.filter_by(
assetid=printer.assetid,
isprimary=True
).first()
if not comm:
comm = Communication.query.filter_by(assetid=printer.assetid).first()
if not comm or not comm.ipaddress:
return error_response(ErrorCodes.VALIDATION_ERROR, 'Printer has no IP address')
service = ZabbixService()
if not service.isconfigured:
return error_response(ErrorCodes.BAD_REQUEST, 'Zabbix not configured')
supplies = service.getsuppliesbyip(comm.ipaddress)
return success_response({
'ipaddress': comm.ipaddress,
'supplies': supplies or []
})
# =============================================================================
# Dashboard
# =============================================================================
@printers_asset_bp.route('/dashboard/summary', methods=['GET'])
@jwt_required()
def dashboard_summary():
"""Get printer dashboard summary data."""
# Total active printers
total = db.session.query(Printer).join(Asset).filter(
Asset.isactive == True
).count()
# Count by printer type
by_type = db.session.query(
PrinterType.printertype,
db.func.count(Printer.printerid)
).join(Printer, Printer.printertypeid == PrinterType.printertypeid
).join(Asset, Asset.assetid == Printer.assetid
).filter(Asset.isactive == True
).group_by(PrinterType.printertype
).all()
# Count by vendor
by_vendor = db.session.query(
Vendor.vendor,
db.func.count(Printer.printerid)
).join(Printer, Printer.vendorid == Vendor.vendorid
).join(Asset, Asset.assetid == Printer.assetid
).filter(Asset.isactive == True
).group_by(Vendor.vendor
).all()
return success_response({
'total': total,
'by_type': [{'type': t, 'count': c} for t, c in by_type],
'by_vendor': [{'vendor': v, 'count': c} for v, c in by_vendor],
})

View File

@@ -1,7 +1,10 @@
"""Printers plugin models."""
from .printer_extension import PrinterData
from .printer_extension import PrinterData # Legacy model for Machine-based architecture
from .printer import Printer, PrinterType # New Asset-based models
__all__ = [
'PrinterData',
'PrinterData', # Legacy
'Printer', # New
'PrinterType', # New
]

View File

@@ -0,0 +1,122 @@
"""Printer plugin models - new Asset-based architecture."""
from shopdb.extensions import db
from shopdb.core.models.base import BaseModel
class PrinterType(BaseModel):
"""
Printer type classification.
Examples: Laser, Inkjet, Label, MFP, Plotter, etc.
"""
__tablename__ = 'printertypes'
printertypeid = db.Column(db.Integer, primary_key=True)
printertype = db.Column(db.String(100), unique=True, nullable=False)
description = db.Column(db.Text)
icon = db.Column(db.String(50), comment='Icon name for UI')
def __repr__(self):
return f"<PrinterType {self.printertype}>"
class Printer(BaseModel):
"""
Printer-specific extension data (new Asset architecture).
Links to core Asset table via assetid.
Stores printer-specific fields like type, Windows name, share name, etc.
"""
__tablename__ = 'printers'
printerid = db.Column(db.Integer, primary_key=True)
# Link to core asset
assetid = db.Column(
db.Integer,
db.ForeignKey('assets.assetid', ondelete='CASCADE'),
unique=True,
nullable=False,
index=True
)
# Printer classification
printertypeid = db.Column(
db.Integer,
db.ForeignKey('printertypes.printertypeid'),
nullable=True
)
# Vendor
vendorid = db.Column(
db.Integer,
db.ForeignKey('vendors.vendorid'),
nullable=True
)
modelnumberid = db.Column(
db.Integer,
db.ForeignKey('models.modelnumberid'),
nullable=True
)
# Network identity
hostname = db.Column(
db.String(100),
index=True,
comment='Network hostname'
)
# Windows/Network naming
windowsname = db.Column(
db.String(255),
comment='Windows printer name (e.g., \\\\server\\printer)'
)
sharename = db.Column(
db.String(100),
comment='CSF/share name'
)
# Installation
iscsf = db.Column(db.Boolean, default=False, comment='Is CSF printer')
installpath = db.Column(db.String(255), comment='Driver install path')
# Printer PIN (for secure print)
pin = db.Column(db.String(20))
# Features
iscolor = db.Column(db.Boolean, default=False, comment='Color capable')
isduplex = db.Column(db.Boolean, default=False, comment='Duplex capable')
isnetwork = db.Column(db.Boolean, default=True, comment='Network connected')
# Relationships
asset = db.relationship(
'Asset',
backref=db.backref('printer', uselist=False, lazy='joined')
)
printertype = db.relationship('PrinterType', backref='printers')
vendor = db.relationship('Vendor', backref='printer_items')
model = db.relationship('Model', backref='printer_items')
__table_args__ = (
db.Index('idx_printer_type', 'printertypeid'),
db.Index('idx_printer_hostname', 'hostname'),
db.Index('idx_printer_windowsname', 'windowsname'),
)
def __repr__(self):
return f"<Printer {self.hostname or self.assetid}>"
def to_dict(self):
"""Convert to dictionary with related names."""
result = super().to_dict()
# Add related object names
if self.printertype:
result['printertype_name'] = self.printertype.printertype
if self.vendor:
result['vendor_name'] = self.vendor.vendor
if self.model:
result['model_name'] = self.model.modelnumber
return result

View File

@@ -11,9 +11,10 @@ import click
from shopdb.plugins.base import BasePlugin, PluginMeta
from shopdb.extensions import db
from shopdb.core.models.machine import MachineType
from shopdb.core.models import AssetType
from .models import PrinterData
from .api import printers_bp
from .models import PrinterData, Printer, PrinterType
from .api import printers_bp, printers_asset_bp
from .services import ZabbixService
logger = logging.getLogger(__name__)
@@ -21,11 +22,15 @@ logger = logging.getLogger(__name__)
class PrintersPlugin(BasePlugin):
"""
Printers plugin - extends machines with printer-specific functionality.
Printers plugin - manages printer assets.
Printers use the unified Machine model with machinetype.category = 'Printer'.
This plugin adds:
- PrinterData table for printer-specific fields (windowsname, sharename, etc.)
Supports both legacy Machine-based architecture and new Asset-based architecture:
- Legacy: PrinterData table linked to machines
- New: Printer table linked to assets
Features:
- PrinterType classification
- Windows/network naming
- Zabbix integration for real-time supply level lookups
"""
@@ -46,7 +51,7 @@ class PrintersPlugin(BasePlugin):
"""Return plugin metadata."""
return PluginMeta(
name=self._manifest.get('name', 'printers'),
version=self._manifest.get('version', '1.0.0'),
version=self._manifest.get('version', '2.0.0'),
description=self._manifest.get(
'description',
'Printer management with Zabbix integration'
@@ -58,12 +63,21 @@ class PrintersPlugin(BasePlugin):
)
def get_blueprint(self) -> Optional[Blueprint]:
"""Return Flask Blueprint with API routes."""
return printers_bp
"""
Return Flask Blueprint with API routes.
Returns the new Asset-based blueprint.
Legacy Machine-based blueprint is registered separately in init_app.
"""
return printers_asset_bp
def get_models(self) -> List[Type]:
"""Return list of SQLAlchemy model classes."""
return [PrinterData]
return [
PrinterData, # Legacy Machine-based
Printer, # New Asset-based
PrinterType, # New printer type classification
]
def get_services(self) -> Dict[str, Type]:
"""Return plugin services."""
@@ -82,16 +96,63 @@ class PrintersPlugin(BasePlugin):
"""Initialize plugin with Flask app."""
app.config.setdefault('ZABBIX_URL', '')
app.config.setdefault('ZABBIX_TOKEN', '')
# Register legacy blueprint for backward compatibility
app.register_blueprint(printers_bp, url_prefix='/api/printers/legacy')
logger.info(f"Printers plugin initialized (v{self.meta.version})")
def on_install(self, app: Flask) -> None:
"""Called when plugin is installed."""
with app.app_context():
self._ensureprintertypes()
self._ensure_asset_type()
self._ensure_printer_types()
self._ensure_legacy_machine_types()
logger.info("Printers plugin installed")
def _ensureprintertypes(self) -> None:
"""Ensure basic printer machine types exist."""
def _ensure_asset_type(self) -> None:
"""Ensure printer asset type exists."""
existing = AssetType.query.filter_by(assettype='printer').first()
if not existing:
at = AssetType(
assettype='printer',
plugin_name='printers',
table_name='printers',
description='Printers (laser, inkjet, label, MFP, plotter)',
icon='printer'
)
db.session.add(at)
logger.debug("Created asset type: printer")
db.session.commit()
def _ensure_printer_types(self) -> None:
"""Ensure basic printer types exist (new architecture)."""
printer_types = [
('Laser', 'Standard laser printer', 'printer'),
('Inkjet', 'Inkjet printer', 'printer'),
('Label', 'Label/barcode printer', 'barcode'),
('MFP', 'Multifunction printer with scan/copy/fax', 'printer'),
('Plotter', 'Large format plotter', 'drafting-compass'),
('Thermal', 'Thermal printer', 'temperature-high'),
('Dot Matrix', 'Dot matrix printer', 'th'),
('Other', 'Other printer type', 'printer'),
]
for name, description, icon in printer_types:
existing = PrinterType.query.filter_by(printertype=name).first()
if not existing:
pt = PrinterType(
printertype=name,
description=description,
icon=icon
)
db.session.add(pt)
logger.debug(f"Created printer type: {name}")
db.session.commit()
def _ensure_legacy_machine_types(self) -> None:
"""Ensure basic printer machine types exist (legacy architecture)."""
printertypes = [
('Laser Printer', 'Printer', 'Standard laser printer'),
('Inkjet Printer', 'Printer', 'Inkjet printer'),

5
plugins/usb/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""USB device checkout plugin."""
from .plugin import USBPlugin
__all__ = ['USBPlugin']

View File

@@ -0,0 +1,5 @@
"""USB plugin API."""
from .routes import usb_bp
__all__ = ['usb_bp']

275
plugins/usb/api/routes.py Normal file
View File

@@ -0,0 +1,275 @@
"""USB plugin API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from datetime import datetime
from shopdb.extensions import db
from shopdb.core.models import Machine, MachineType, Vendor, Model
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
from ..models import USBCheckout
usb_bp = Blueprint('usb', __name__)
def get_usb_machinetype_id():
"""Get the USB Device machine type ID dynamically."""
usb_type = MachineType.query.filter(
MachineType.machinetype.ilike('%usb%')
).first()
return usb_type.machinetypeid if usb_type else None
@usb_bp.route('', methods=['GET'])
@jwt_required()
def list_usb_devices():
"""
List all USB devices with checkout status.
Query parameters:
- page, per_page: Pagination
- search: Search by serial number or alias
- available: Filter to only available (not checked out) devices
"""
page, per_page = get_pagination_params(request)
usb_type_id = get_usb_machinetype_id()
if not usb_type_id:
return success_response([]) # No USB type found
# Get USB devices from machines table
query = db.session.query(Machine).filter(
Machine.machinetypeid == usb_type_id,
Machine.isactive == True
)
# Search filter
if search := request.args.get('search'):
query = query.filter(
db.or_(
Machine.serialnumber.ilike(f'%{search}%'),
Machine.alias.ilike(f'%{search}%'),
Machine.machinenumber.ilike(f'%{search}%')
)
)
query = query.order_by(Machine.alias)
items, total = paginate_query(query, page, per_page)
# Build response with checkout status
data = []
for device in items:
# Check if currently checked out
active_checkout = USBCheckout.query.filter_by(
machineid=device.machineid,
checkin_time=None
).first()
item = {
'machineid': device.machineid,
'machinenumber': device.machinenumber,
'alias': device.alias,
'serialnumber': device.serialnumber,
'notes': device.notes,
'vendor_name': device.vendor.vendorname if device.vendor else None,
'model_name': device.model.modelnumber if device.model else None,
'is_checked_out': active_checkout is not None,
'current_checkout': active_checkout.to_dict() if active_checkout else None
}
data.append(item)
# Filter by availability if requested
if request.args.get('available', '').lower() == 'true':
data = [d for d in data if not d['is_checked_out']]
total = len(data)
return paginated_response(data, page, per_page, total)
@usb_bp.route('/<int:device_id>', methods=['GET'])
@jwt_required()
def get_usb_device(device_id: int):
"""Get a single USB device with checkout history."""
device = Machine.query.filter_by(
machineid=device_id,
machinetypeid=get_usb_machinetype_id()
).first()
if not device:
return error_response(
ErrorCodes.NOT_FOUND,
f'USB device with ID {device_id} not found',
http_code=404
)
# Get checkout history
checkouts = USBCheckout.query.filter_by(
machineid=device_id
).order_by(USBCheckout.checkout_time.desc()).limit(50).all()
# Check current checkout
active_checkout = next((c for c in checkouts if c.checkin_time is None), None)
result = {
'machineid': device.machineid,
'machinenumber': device.machinenumber,
'alias': device.alias,
'serialnumber': device.serialnumber,
'notes': device.notes,
'vendor_name': device.vendor.vendorname if device.vendor else None,
'model_name': device.model.modelnumber if device.model else None,
'is_checked_out': active_checkout is not None,
'current_checkout': active_checkout.to_dict() if active_checkout else None,
'checkout_history': [c.to_dict() for c in checkouts]
}
return success_response(result)
@usb_bp.route('/<int:device_id>/checkout', methods=['POST'])
@jwt_required()
def checkout_device(device_id: int):
"""Check out a USB device."""
device = Machine.query.filter_by(
machineid=device_id,
machinetypeid=get_usb_machinetype_id()
).first()
if not device:
return error_response(
ErrorCodes.NOT_FOUND,
f'USB device with ID {device_id} not found',
http_code=404
)
# Check if already checked out
active_checkout = USBCheckout.query.filter_by(
machineid=device_id,
checkin_time=None
).first()
if active_checkout:
return error_response(
ErrorCodes.CONFLICT,
f'Device is already checked out by {active_checkout.sso}',
http_code=409
)
data = request.get_json() or {}
if not data.get('sso'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'sso is required')
checkout = USBCheckout(
machineid=device_id,
sso=data['sso'],
checkout_name=data.get('name'),
checkout_reason=data.get('reason'),
checkout_time=datetime.utcnow()
)
db.session.add(checkout)
db.session.commit()
return success_response(checkout.to_dict(), message='Device checked out', http_code=201)
@usb_bp.route('/<int:device_id>/checkin', methods=['POST'])
@jwt_required()
def checkin_device(device_id: int):
"""Check in a USB device."""
device = Machine.query.filter_by(
machineid=device_id,
machinetypeid=get_usb_machinetype_id()
).first()
if not device:
return error_response(
ErrorCodes.NOT_FOUND,
f'USB device with ID {device_id} not found',
http_code=404
)
# Find active checkout
active_checkout = USBCheckout.query.filter_by(
machineid=device_id,
checkin_time=None
).first()
if not active_checkout:
return error_response(
ErrorCodes.VALIDATION_ERROR,
'Device is not currently checked out',
http_code=400
)
data = request.get_json() or {}
active_checkout.checkin_time = datetime.utcnow()
active_checkout.was_wiped = data.get('was_wiped', False)
active_checkout.checkin_notes = data.get('notes')
db.session.commit()
return success_response(active_checkout.to_dict(), message='Device checked in')
@usb_bp.route('/<int:device_id>/history', methods=['GET'])
@jwt_required()
def get_checkout_history(device_id: int):
"""Get checkout history for a USB device."""
page, per_page = get_pagination_params(request)
query = USBCheckout.query.filter_by(
machineid=device_id
).order_by(USBCheckout.checkout_time.desc())
items, total = paginate_query(query, page, per_page)
data = [c.to_dict() for c in items]
return paginated_response(data, page, per_page, total)
@usb_bp.route('/checkouts', methods=['GET'])
@jwt_required()
def list_all_checkouts():
"""List all checkouts (active and historical)."""
page, per_page = get_pagination_params(request)
query = db.session.query(USBCheckout).join(
Machine, USBCheckout.machineid == Machine.machineid
)
# Filter by active only
if request.args.get('active', '').lower() == 'true':
query = query.filter(USBCheckout.checkin_time == None)
# Filter by user
if sso := request.args.get('sso'):
query = query.filter(USBCheckout.sso == sso)
query = query.order_by(USBCheckout.checkout_time.desc())
items, total = paginate_query(query, page, per_page)
# Include device info
data = []
for checkout in items:
device = Machine.query.get(checkout.machineid)
item = checkout.to_dict()
item['device'] = {
'machineid': device.machineid,
'alias': device.alias,
'serialnumber': device.serialnumber
} if device else None
data.append(item)
return paginated_response(data, page, per_page, total)

View File

@@ -0,0 +1,9 @@
{
"name": "usb",
"version": "1.0.0",
"description": "USB device checkout management",
"author": "ShopDB Team",
"dependencies": [],
"core_version": ">=1.0.0",
"api_prefix": "/api/usb"
}

View File

@@ -0,0 +1,5 @@
"""USB plugin models."""
from .usb_checkout import USBCheckout
__all__ = ['USBCheckout']

View File

@@ -0,0 +1,38 @@
"""USB Checkout model."""
from shopdb.extensions import db
from datetime import datetime
class USBCheckout(db.Model):
"""
USB device checkout tracking.
References machines table (USB devices have machinetypeid=44).
"""
__tablename__ = 'usbcheckouts'
checkoutid = db.Column(db.Integer, primary_key=True)
machineid = db.Column(db.Integer, db.ForeignKey('machines.machineid'), nullable=False, index=True)
sso = db.Column(db.String(20), nullable=False, index=True)
checkout_name = db.Column(db.String(100))
checkout_reason = db.Column(db.Text)
checkout_time = db.Column(db.DateTime, default=datetime.utcnow, index=True)
checkin_time = db.Column(db.DateTime, index=True)
was_wiped = db.Column(db.Boolean, default=False)
checkin_notes = db.Column(db.Text)
def to_dict(self):
"""Convert to dictionary."""
return {
'checkoutid': self.checkoutid,
'machineid': self.machineid,
'sso': self.sso,
'checkout_name': self.checkout_name,
'checkout_reason': self.checkout_reason,
'checkout_time': self.checkout_time.isoformat() if self.checkout_time else None,
'checkin_time': self.checkin_time.isoformat() if self.checkin_time else None,
'was_wiped': self.was_wiped,
'checkin_notes': self.checkin_notes,
'is_checked_out': self.checkin_time is None
}

View File

@@ -0,0 +1,169 @@
"""USB device plugin models."""
from datetime import datetime
from shopdb.extensions import db
from shopdb.core.models.base import BaseModel, AuditMixin
class USBDeviceType(BaseModel):
"""
USB device type classification.
Examples: Flash Drive, External HDD, External SSD, Card Reader
"""
__tablename__ = 'usbdevicetypes'
usbdevicetypeid = db.Column(db.Integer, primary_key=True)
typename = db.Column(db.String(50), unique=True, nullable=False)
description = db.Column(db.Text)
icon = db.Column(db.String(50), default='usb', comment='Icon name for UI')
def __repr__(self):
return f"<USBDeviceType {self.typename}>"
class USBDevice(BaseModel, AuditMixin):
"""
USB device model.
Tracks USB storage devices that can be checked out by users.
"""
__tablename__ = 'usbdevices'
usbdeviceid = db.Column(db.Integer, primary_key=True)
# Identification
serialnumber = db.Column(db.String(100), unique=True, nullable=False)
label = db.Column(db.String(100), nullable=True, comment='Human-readable label')
assetnumber = db.Column(db.String(50), nullable=True, comment='Optional asset tag')
# Classification
usbdevicetypeid = db.Column(
db.Integer,
db.ForeignKey('usbdevicetypes.usbdevicetypeid'),
nullable=True
)
# Specifications
capacitygb = db.Column(db.Integer, nullable=True, comment='Capacity in GB')
vendorid = db.Column(db.String(10), nullable=True, comment='USB Vendor ID (hex)')
productid = db.Column(db.String(10), nullable=True, comment='USB Product ID (hex)')
manufacturer = db.Column(db.String(100), nullable=True)
productname = db.Column(db.String(100), nullable=True)
# Current status
ischeckedout = db.Column(db.Boolean, default=False)
currentuserid = db.Column(db.String(50), nullable=True, comment='SSO of current user')
currentusername = db.Column(db.String(100), nullable=True, comment='Name of current user')
currentcheckoutdate = db.Column(db.DateTime, nullable=True)
# Location
storagelocation = db.Column(db.String(200), nullable=True, comment='Where device is stored when not checked out')
# Notes
notes = db.Column(db.Text, nullable=True)
# Relationships
devicetype = db.relationship('USBDeviceType', backref='devices')
# Indexes
__table_args__ = (
db.Index('idx_usb_serial', 'serialnumber'),
db.Index('idx_usb_checkedout', 'ischeckedout'),
db.Index('idx_usb_type', 'usbdevicetypeid'),
db.Index('idx_usb_currentuser', 'currentuserid'),
)
def __repr__(self):
return f"<USBDevice {self.label or self.serialnumber}>"
@property
def display_name(self):
"""Get display name (label if set, otherwise serial number)."""
return self.label or self.serialnumber
def to_dict(self):
"""Convert to dictionary with related data."""
result = super().to_dict()
# Add type info
if self.devicetype:
result['typename'] = self.devicetype.typename
result['typeicon'] = self.devicetype.icon
# Add computed property
result['displayname'] = self.display_name
return result
class USBCheckout(BaseModel):
"""
USB device checkout history.
Tracks when devices are checked out and returned.
"""
__tablename__ = 'usbcheckouts'
usbcheckoutid = db.Column(db.Integer, primary_key=True)
# Device reference
usbdeviceid = db.Column(
db.Integer,
db.ForeignKey('usbdevices.usbdeviceid', ondelete='CASCADE'),
nullable=False
)
# User info
userid = db.Column(db.String(50), nullable=False, comment='SSO of user')
username = db.Column(db.String(100), nullable=True, comment='Name of user')
# Checkout details
checkoutdate = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
checkindate = db.Column(db.DateTime, nullable=True)
expectedreturndate = db.Column(db.DateTime, nullable=True)
# Metadata
purpose = db.Column(db.String(500), nullable=True, comment='Reason for checkout')
notes = db.Column(db.Text, nullable=True)
checkedoutby = db.Column(db.String(50), nullable=True, comment='Admin who processed checkout')
checkedinby = db.Column(db.String(50), nullable=True, comment='Admin who processed checkin')
# Relationships
device = db.relationship('USBDevice', backref=db.backref('checkouts', lazy='dynamic'))
# Indexes
__table_args__ = (
db.Index('idx_usbcheckout_device', 'usbdeviceid'),
db.Index('idx_usbcheckout_user', 'userid'),
db.Index('idx_usbcheckout_dates', 'checkoutdate', 'checkindate'),
)
def __repr__(self):
return f"<USBCheckout device={self.usbdeviceid} user={self.userid}>"
@property
def is_active(self):
"""Check if this checkout is currently active (not returned)."""
return self.checkindate is None
@property
def duration_days(self):
"""Get duration of checkout in days."""
end = self.checkindate or datetime.utcnow()
delta = end - self.checkoutdate
return delta.days
def to_dict(self):
"""Convert to dictionary with computed fields."""
result = super().to_dict()
result['isactivecheckout'] = self.is_active
result['durationdays'] = self.duration_days
# Add device info if loaded
if self.device:
result['devicelabel'] = self.device.label
result['deviceserialnumber'] = self.device.serialnumber
return result

80
plugins/usb/plugin.py Normal file
View File

@@ -0,0 +1,80 @@
"""USB plugin main class."""
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Type
from flask import Flask, Blueprint
from shopdb.plugins.base import BasePlugin, PluginMeta
from shopdb.extensions import db
from .models import USBCheckout
from .api import usb_bp
logger = logging.getLogger(__name__)
class USBPlugin(BasePlugin):
"""
USB plugin - manages USB device checkouts.
USB devices are stored in the machines table (machinetypeid=44).
This plugin provides checkout/checkin tracking.
"""
def __init__(self):
self._manifest = self._load_manifest()
def _load_manifest(self) -> Dict:
"""Load plugin manifest from JSON file."""
manifest_path = Path(__file__).parent / 'manifest.json'
if manifest_path.exists():
with open(manifest_path, 'r') as f:
return json.load(f)
return {}
@property
def meta(self) -> PluginMeta:
"""Return plugin metadata."""
return PluginMeta(
name=self._manifest.get('name', 'usb'),
version=self._manifest.get('version', '1.0.0'),
description=self._manifest.get('description', 'USB device checkout management'),
author=self._manifest.get('author', 'ShopDB Team'),
dependencies=self._manifest.get('dependencies', []),
core_version=self._manifest.get('core_version', '>=1.0.0'),
api_prefix=self._manifest.get('api_prefix', '/api/usb'),
)
def get_blueprint(self) -> Optional[Blueprint]:
"""Return Flask Blueprint with API routes."""
return usb_bp
def get_models(self) -> List[Type]:
"""Return list of SQLAlchemy model classes."""
return [USBCheckout]
def init_app(self, app: Flask, db_instance) -> None:
"""Initialize plugin with Flask app."""
logger.info(f"USB plugin initialized (v{self.meta.version})")
def on_install(self, app: Flask) -> None:
"""Called when plugin is installed."""
logger.info("USB plugin installed")
def on_uninstall(self, app: Flask) -> None:
"""Called when plugin is uninstalled."""
logger.info("USB plugin uninstalled")
def get_navigation_items(self) -> List[Dict]:
"""Return navigation menu items."""
return [
{
'name': 'USB Devices',
'icon': 'usb',
'route': '/usb',
'position': 45,
},
]