Initial commit: Shop Database Flask Application

Flask backend with Vue 3 frontend for shop floor machine management.
Includes database schema export for MySQL shopdb_flask database.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
cproudlock
2026-01-13 16:07:34 -05:00
commit 1196de6e88
188 changed files with 19921 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
"""Core API blueprints."""
from .auth import auth_bp
from .machines import machines_bp
from .machinetypes import machinetypes_bp
from .pctypes import pctypes_bp
from .statuses import statuses_bp
from .vendors import vendors_bp
from .models import models_bp
from .businessunits import businessunits_bp
from .locations import locations_bp
from .operatingsystems import operatingsystems_bp
from .dashboard import dashboard_bp
from .applications import applications_bp
from .knowledgebase import knowledgebase_bp
from .search import search_bp
__all__ = [
'auth_bp',
'machines_bp',
'machinetypes_bp',
'pctypes_bp',
'statuses_bp',
'vendors_bp',
'models_bp',
'businessunits_bp',
'locations_bp',
'operatingsystems_bp',
'dashboard_bp',
'applications_bp',
'knowledgebase_bp',
'search_bp',
]

View File

@@ -0,0 +1,429 @@
"""Applications API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import (
Application, AppVersion, AppOwner, SupportTeam, InstalledApp, Machine
)
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
applications_bp = Blueprint('applications', __name__)
@applications_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_applications():
"""List all applications."""
page, per_page = get_pagination_params(request)
query = Application.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Application.isactive == True)
# Filter out hidden unless specifically requested
if request.args.get('showhidden', 'false').lower() != 'true':
query = query.filter(Application.ishidden == False)
# Filter by installable
if request.args.get('installable') is not None:
installable = request.args.get('installable').lower() == 'true'
query = query.filter(Application.isinstallable == installable)
if search := request.args.get('search'):
query = query.filter(
db.or_(
Application.appname.ilike(f'%{search}%'),
Application.appdescription.ilike(f'%{search}%')
)
)
query = query.order_by(Application.appname)
items, total = paginate_query(query, page, per_page)
data = []
for app in items:
app_dict = app.to_dict()
if app.supportteam:
app_dict['supportteam'] = {
'supportteamid': app.supportteam.supportteamid,
'teamname': app.supportteam.teamname,
'teamurl': app.supportteam.teamurl,
'owner': {
'appownerid': app.supportteam.owner.appownerid,
'appowner': app.supportteam.owner.appowner,
'sso': app.supportteam.owner.sso
} if app.supportteam.owner else None
}
else:
app_dict['supportteam'] = None
app_dict['installedcount'] = app.installed_on.filter_by(isactive=True).count()
data.append(app_dict)
return paginated_response(data, page, per_page, total)
@applications_bp.route('/<int:app_id>', methods=['GET'])
@jwt_required(optional=True)
def get_application(app_id: int):
"""Get a single application with details."""
app = Application.query.get(app_id)
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
data = app.to_dict()
if app.supportteam:
data['supportteam'] = {
'supportteamid': app.supportteam.supportteamid,
'teamname': app.supportteam.teamname,
'teamurl': app.supportteam.teamurl,
'owner': {
'appownerid': app.supportteam.owner.appownerid,
'appowner': app.supportteam.owner.appowner,
'sso': app.supportteam.owner.sso
} if app.supportteam.owner else None
}
else:
data['supportteam'] = None
data['versions'] = [v.to_dict() for v in app.versions.filter_by(isactive=True).order_by(AppVersion.version.desc()).all()]
data['installedcount'] = app.installed_on.filter_by(isactive=True).count()
return success_response(data)
@applications_bp.route('', methods=['POST'])
@jwt_required()
def create_application():
"""Create a new application."""
data = request.get_json()
if not data or not data.get('appname'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'appname is required')
if Application.query.filter_by(appname=data['appname']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Application '{data['appname']}' already exists",
http_code=409
)
app = Application(
appname=data['appname'],
appdescription=data.get('appdescription'),
supportteamid=data.get('supportteamid'),
isinstallable=data.get('isinstallable', False),
applicationnotes=data.get('applicationnotes'),
installpath=data.get('installpath'),
applicationlink=data.get('applicationlink'),
documentationpath=data.get('documentationpath'),
ishidden=data.get('ishidden', False),
isprinter=data.get('isprinter', False),
islicenced=data.get('islicenced', False),
image=data.get('image')
)
db.session.add(app)
db.session.commit()
return success_response(app.to_dict(), message='Application created', http_code=201)
@applications_bp.route('/<int:app_id>', methods=['PUT'])
@jwt_required()
def update_application(app_id: int):
"""Update an application."""
app = Application.query.get(app_id)
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'appname' in data and data['appname'] != app.appname:
if Application.query.filter_by(appname=data['appname']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Application '{data['appname']}' already exists",
http_code=409
)
fields = [
'appname', 'appdescription', 'supportteamid', 'isinstallable',
'applicationnotes', 'installpath', 'applicationlink', 'documentationpath',
'ishidden', 'isprinter', 'islicenced', 'image', 'isactive'
]
for key in fields:
if key in data:
setattr(app, key, data[key])
db.session.commit()
return success_response(app.to_dict(), message='Application updated')
@applications_bp.route('/<int:app_id>', methods=['DELETE'])
@jwt_required()
def delete_application(app_id: int):
"""Delete (deactivate) an application."""
app = Application.query.get(app_id)
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
app.isactive = False
db.session.commit()
return success_response(message='Application deleted')
# ---- Versions ----
@applications_bp.route('/<int:app_id>/versions', methods=['GET'])
@jwt_required(optional=True)
def list_versions(app_id: int):
"""List all versions for an application."""
app = Application.query.get(app_id)
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
versions = app.versions.filter_by(isactive=True).order_by(AppVersion.version.desc()).all()
return success_response([v.to_dict() for v in versions])
@applications_bp.route('/<int:app_id>/versions', methods=['POST'])
@jwt_required()
def create_version(app_id: int):
"""Create a new version for an application."""
app = Application.query.get(app_id)
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
data = request.get_json()
if not data or not data.get('version'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'version is required')
if AppVersion.query.filter_by(appid=app_id, version=data['version']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Version '{data['version']}' already exists for this application",
http_code=409
)
version = AppVersion(
appid=app_id,
version=data['version'],
releasedate=data.get('releasedate'),
notes=data.get('notes')
)
db.session.add(version)
db.session.commit()
return success_response(version.to_dict(), message='Version created', http_code=201)
# ---- Machines with this app installed ----
@applications_bp.route('/<int:app_id>/installed', methods=['GET'])
@jwt_required(optional=True)
def list_installed_machines(app_id: int):
"""List all machines that have this application installed."""
app = Application.query.get(app_id)
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
installed = app.installed_on.filter_by(isactive=True).all()
data = []
for i in installed:
item = i.to_dict()
if i.machine:
item['machine'] = {
'machineid': i.machine.machineid,
'machinenumber': i.machine.machinenumber,
'alias': i.machine.alias,
'hostname': i.machine.hostname
}
data.append(item)
return success_response(data)
# ---- Installed Apps (per machine) ----
@applications_bp.route('/machines/<int:machine_id>', methods=['GET'])
@jwt_required(optional=True)
def list_machine_applications(machine_id: int):
"""List all applications installed on a machine."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(ErrorCodes.NOT_FOUND, 'Machine not found', http_code=404)
installed = machine.installedapps.filter_by(isactive=True).all()
return success_response([i.to_dict() for i in installed])
@applications_bp.route('/machines/<int:machine_id>', methods=['POST'])
@jwt_required()
def install_application(machine_id: int):
"""Install an application on a machine."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(ErrorCodes.NOT_FOUND, 'Machine not found', http_code=404)
data = request.get_json()
if not data or not data.get('appid'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'appid is required')
app = Application.query.get(data['appid'])
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
# Check if already installed
existing = InstalledApp.query.filter_by(
machineid=machine_id,
appid=data['appid']
).first()
if existing:
if existing.isactive:
return error_response(
ErrorCodes.CONFLICT,
'Application already installed on this machine',
http_code=409
)
# Reactivate
existing.isactive = True
existing.appversionid = data.get('appversionid')
existing.installeddate = db.func.now()
db.session.commit()
return success_response(existing.to_dict(), message='Application reinstalled')
installed = InstalledApp(
machineid=machine_id,
appid=data['appid'],
appversionid=data.get('appversionid')
)
db.session.add(installed)
db.session.commit()
return success_response(installed.to_dict(), message='Application installed', http_code=201)
@applications_bp.route('/machines/<int:machine_id>/<int:app_id>', methods=['DELETE'])
@jwt_required()
def uninstall_application(machine_id: int, app_id: int):
"""Uninstall an application from a machine."""
installed = InstalledApp.query.filter_by(
machineid=machine_id,
appid=app_id,
isactive=True
).first()
if not installed:
return error_response(ErrorCodes.NOT_FOUND, 'Application not installed on this machine', http_code=404)
installed.isactive = False
db.session.commit()
return success_response(message='Application uninstalled')
@applications_bp.route('/machines/<int:machine_id>/<int:app_id>', methods=['PUT'])
@jwt_required()
def update_installed_app(machine_id: int, app_id: int):
"""Update installed application (e.g., change version)."""
installed = InstalledApp.query.filter_by(
machineid=machine_id,
appid=app_id,
isactive=True
).first()
if not installed:
return error_response(ErrorCodes.NOT_FOUND, 'Application not installed on this machine', http_code=404)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'appversionid' in data:
installed.appversionid = data['appversionid']
db.session.commit()
return success_response(installed.to_dict(), message='Installation updated')
# ---- Support Teams ----
@applications_bp.route('/supportteams', methods=['GET'])
@jwt_required(optional=True)
def list_support_teams():
"""List all support teams."""
teams = SupportTeam.query.filter_by(isactive=True).order_by(SupportTeam.teamname).all()
data = []
for team in teams:
team_dict = team.to_dict()
team_dict['owner'] = team.owner.appowner if team.owner else None
data.append(team_dict)
return success_response(data)
@applications_bp.route('/supportteams', methods=['POST'])
@jwt_required()
def create_support_team():
"""Create a new support team."""
data = request.get_json()
if not data or not data.get('teamname'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'teamname is required')
team = SupportTeam(
teamname=data['teamname'],
teamurl=data.get('teamurl'),
appownerid=data.get('appownerid')
)
db.session.add(team)
db.session.commit()
return success_response(team.to_dict(), message='Support team created', http_code=201)
# ---- App Owners ----
@applications_bp.route('/appowners', methods=['GET'])
@jwt_required(optional=True)
def list_app_owners():
"""List all application owners."""
owners = AppOwner.query.filter_by(isactive=True).order_by(AppOwner.appowner).all()
return success_response([o.to_dict() for o in owners])
@applications_bp.route('/appowners', methods=['POST'])
@jwt_required()
def create_app_owner():
"""Create a new application owner."""
data = request.get_json()
if not data or not data.get('appowner'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'appowner is required')
owner = AppOwner(
appowner=data['appowner'],
sso=data.get('sso'),
email=data.get('email')
)
db.session.add(owner)
db.session.commit()
return success_response(owner.to_dict(), message='App owner created', http_code=201)

147
shopdb/core/api/auth.py Normal file
View File

@@ -0,0 +1,147 @@
"""Authentication API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import (
create_access_token,
create_refresh_token,
jwt_required,
get_jwt_identity,
current_user
)
from werkzeug.security import check_password_hash
from shopdb.extensions import db
from shopdb.core.models import User
from shopdb.utils.responses import success_response, error_response, ErrorCodes
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['POST'])
def login():
"""
Authenticate user and return JWT tokens.
Request:
{
"username": "string",
"password": "string"
}
Response:
{
"data": {
"access_token": "...",
"refresh_token": "...",
"user": {...}
}
}
"""
data = request.get_json()
if not data or not data.get('username') or not data.get('password'):
return error_response(
ErrorCodes.VALIDATION_ERROR,
'Username and password required'
)
user = User.query.filter_by(
username=data['username'],
isactive=True
).first()
if not user or not check_password_hash(user.passwordhash, data['password']):
return error_response(
ErrorCodes.UNAUTHORIZED,
'Invalid username or password',
http_code=401
)
if user.islocked:
return error_response(
ErrorCodes.FORBIDDEN,
'Account is locked',
http_code=403
)
# Create tokens (identity must be a string in Flask-JWT-Extended 4.x)
access_token = create_access_token(
identity=str(user.userid),
additional_claims={
'username': user.username,
'roles': [r.rolename for r in user.roles]
}
)
refresh_token = create_refresh_token(identity=str(user.userid))
# Update last login
user.lastlogindate = db.func.now()
user.failedlogins = 0
db.session.commit()
return success_response({
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer',
'expires_in': 3600,
'user': {
'userid': user.userid,
'username': user.username,
'email': user.email,
'firstname': user.firstname,
'lastname': user.lastname,
'roles': [r.rolename for r in user.roles]
}
})
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
"""Refresh access token using refresh token."""
user_id = get_jwt_identity()
user = User.query.get(int(user_id))
if not user or not user.isactive:
return error_response(
ErrorCodes.UNAUTHORIZED,
'User not found or inactive',
http_code=401
)
access_token = create_access_token(
identity=str(user.userid),
additional_claims={
'username': user.username,
'roles': [r.rolename for r in user.roles]
}
)
return success_response({
'access_token': access_token,
'token_type': 'Bearer',
'expires_in': 3600
})
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
"""Get current authenticated user info."""
return success_response({
'userid': current_user.userid,
'username': current_user.username,
'email': current_user.email,
'firstname': current_user.firstname,
'lastname': current_user.lastname,
'roles': [r.rolename for r in current_user.roles],
'permissions': current_user.getpermissions()
})
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
"""Logout user (for frontend token cleanup)."""
# In a full implementation, you'd blacklist the token
return success_response(message='Successfully logged out')

View File

@@ -0,0 +1,144 @@
"""Business Units API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import BusinessUnit
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
businessunits_bp = Blueprint('businessunits', __name__)
@businessunits_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_businessunits():
"""List all business units."""
page, per_page = get_pagination_params(request)
query = BusinessUnit.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(BusinessUnit.isactive == True)
if search := request.args.get('search'):
query = query.filter(
db.or_(
BusinessUnit.businessunit.ilike(f'%{search}%'),
BusinessUnit.code.ilike(f'%{search}%')
)
)
query = query.order_by(BusinessUnit.businessunit)
items, total = paginate_query(query, page, per_page)
data = [bu.to_dict() for bu in items]
return paginated_response(data, page, per_page, total)
@businessunits_bp.route('/<int:bu_id>', methods=['GET'])
@jwt_required(optional=True)
def get_businessunit(bu_id: int):
"""Get a single business unit."""
bu = BusinessUnit.query.get(bu_id)
if not bu:
return error_response(
ErrorCodes.NOT_FOUND,
f'Business unit with ID {bu_id} not found',
http_code=404
)
data = bu.to_dict()
data['parent'] = bu.parent.to_dict() if bu.parent else None
data['children'] = [c.to_dict() for c in bu.children]
return success_response(data)
@businessunits_bp.route('', methods=['POST'])
@jwt_required()
def create_businessunit():
"""Create a new business unit."""
data = request.get_json()
if not data or not data.get('businessunit'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'businessunit is required')
if BusinessUnit.query.filter_by(businessunit=data['businessunit']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Business unit '{data['businessunit']}' already exists",
http_code=409
)
bu = BusinessUnit(
businessunit=data['businessunit'],
code=data.get('code'),
description=data.get('description'),
parentid=data.get('parentid')
)
db.session.add(bu)
db.session.commit()
return success_response(bu.to_dict(), message='Business unit created', http_code=201)
@businessunits_bp.route('/<int:bu_id>', methods=['PUT'])
@jwt_required()
def update_businessunit(bu_id: int):
"""Update a business unit."""
bu = BusinessUnit.query.get(bu_id)
if not bu:
return error_response(
ErrorCodes.NOT_FOUND,
f'Business unit with ID {bu_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'businessunit' in data and data['businessunit'] != bu.businessunit:
if BusinessUnit.query.filter_by(businessunit=data['businessunit']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Business unit '{data['businessunit']}' already exists",
http_code=409
)
for key in ['businessunit', 'code', 'description', 'parentid', 'isactive']:
if key in data:
setattr(bu, key, data[key])
db.session.commit()
return success_response(bu.to_dict(), message='Business unit updated')
@businessunits_bp.route('/<int:bu_id>', methods=['DELETE'])
@jwt_required()
def delete_businessunit(bu_id: int):
"""Delete (deactivate) a business unit."""
bu = BusinessUnit.query.get(bu_id)
if not bu:
return error_response(
ErrorCodes.NOT_FOUND,
f'Business unit with ID {bu_id} not found',
http_code=404
)
bu.isactive = False
db.session.commit()
return success_response(message='Business unit deleted')

View File

@@ -0,0 +1,117 @@
"""Dashboard API endpoints."""
from flask import Blueprint
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Machine, MachineType, MachineStatus
from shopdb.utils.responses import success_response
dashboard_bp = Blueprint('dashboard', __name__)
@dashboard_bp.route('/summary', methods=['GET'])
@dashboard_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def get_dashboard():
"""Get dashboard summary data."""
# Count machines by category
equipment_count = db.session.query(Machine).join(MachineType).filter(
Machine.isactive == True,
MachineType.category == 'Equipment'
).count()
pc_count = db.session.query(Machine).join(MachineType).filter(
Machine.isactive == True,
MachineType.category == 'PC'
).count()
network_count = db.session.query(Machine).join(MachineType).filter(
Machine.isactive == True,
MachineType.category == 'Network'
).count()
# Count by status
status_counts = db.session.query(
MachineStatus.status,
db.func.count(Machine.machineid)
).outerjoin(
Machine,
db.and_(Machine.statusid == MachineStatus.statusid, Machine.isactive == True)
).group_by(MachineStatus.status).all()
# Recent machines
recent_machines = Machine.query.filter_by(isactive=True).order_by(
Machine.createddate.desc()
).limit(10).all()
# Build status dict
status_dict = {status: count for status, count in status_counts}
return success_response({
# Fields expected by frontend
'totalmachines': equipment_count + pc_count + network_count,
'totalequipment': equipment_count,
'totalpc': pc_count,
'totalnetwork': network_count,
'activemachines': status_dict.get('In Use', 0),
'inrepair': status_dict.get('In Repair', 0),
# Also include structured data
'counts': {
'equipment': equipment_count,
'pcs': pc_count,
'network_devices': network_count,
'total': equipment_count + pc_count + network_count
},
'by_status': status_dict,
'recent': [
{
'machineid': m.machineid,
'machinenumber': m.machinenumber,
'machinetype': m.machinetype.machinetype if m.machinetype else None,
'createddate': m.createddate.isoformat() + 'Z' if m.createddate else None
}
for m in recent_machines
]
})
@dashboard_bp.route('/stats', methods=['GET'])
@jwt_required(optional=True)
def get_stats():
"""Get detailed statistics."""
# Machine type breakdown
type_counts = db.session.query(
MachineType.machinetype,
MachineType.category,
db.func.count(Machine.machineid)
).outerjoin(
Machine,
db.and_(Machine.machinetypeid == MachineType.machinetypeid, Machine.isactive == True)
).filter(MachineType.isactive == True).group_by(
MachineType.machinetypeid
).all()
return success_response({
'by_type': [
{'type': t, 'category': c, 'count': count}
for t, c, count in type_counts
]
})
@dashboard_bp.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint (no auth required)."""
try:
# Test database connection
db.session.execute(db.text('SELECT 1'))
db_status = 'healthy'
except Exception as e:
db_status = f'unhealthy: {str(e)}'
return success_response({
'status': 'ok' if db_status == 'healthy' else 'degraded',
'database': db_status,
'version': '1.0.0'
})

View File

@@ -0,0 +1,207 @@
"""Knowledge Base API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import KnowledgeBase, Application
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
knowledgebase_bp = Blueprint('knowledgebase', __name__)
@knowledgebase_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_articles():
"""List all knowledge base articles."""
page, per_page = get_pagination_params(request)
query = KnowledgeBase.query.filter_by(isactive=True)
# Search
if search := request.args.get('search'):
query = query.filter(
db.or_(
KnowledgeBase.shortdescription.ilike(f'%{search}%'),
KnowledgeBase.keywords.ilike(f'%{search}%')
)
)
# Filter by topic/application
if appid := request.args.get('appid'):
query = query.filter(KnowledgeBase.appid == int(appid))
# Sort options
sort = request.args.get('sort', 'clicks')
order = request.args.get('order', 'desc')
if sort == 'clicks':
query = query.order_by(
KnowledgeBase.clicks.desc() if order == 'desc' else KnowledgeBase.clicks.asc(),
KnowledgeBase.lastupdated.desc()
)
elif sort == 'topic':
query = query.join(Application).order_by(
Application.appname.desc() if order == 'desc' else Application.appname.asc()
)
elif sort == 'description':
query = query.order_by(
KnowledgeBase.shortdescription.desc() if order == 'desc' else KnowledgeBase.shortdescription.asc()
)
elif sort == 'lastupdated':
query = query.order_by(
KnowledgeBase.lastupdated.desc() if order == 'desc' else KnowledgeBase.lastupdated.asc()
)
else:
query = query.order_by(KnowledgeBase.clicks.desc())
items, total = paginate_query(query, page, per_page)
data = []
for article in items:
article_dict = article.to_dict()
if article.application:
article_dict['application'] = {
'appid': article.application.appid,
'appname': article.application.appname
}
else:
article_dict['application'] = None
data.append(article_dict)
return paginated_response(data, page, per_page, total)
@knowledgebase_bp.route('/stats', methods=['GET'])
@jwt_required(optional=True)
def get_stats():
"""Get knowledge base statistics."""
total_clicks = db.session.query(
db.func.coalesce(db.func.sum(KnowledgeBase.clicks), 0)
).filter(KnowledgeBase.isactive == True).scalar()
total_articles = KnowledgeBase.query.filter_by(isactive=True).count()
return success_response({
'totalclicks': int(total_clicks),
'totalarticles': total_articles
})
@knowledgebase_bp.route('/<int:link_id>', methods=['GET'])
@jwt_required(optional=True)
def get_article(link_id: int):
"""Get a single knowledge base article."""
article = KnowledgeBase.query.get(link_id)
if not article or not article.isactive:
return error_response(ErrorCodes.NOT_FOUND, 'Article not found', http_code=404)
data = article.to_dict()
if article.application:
data['application'] = {
'appid': article.application.appid,
'appname': article.application.appname
}
else:
data['application'] = None
return success_response(data)
@knowledgebase_bp.route('/<int:link_id>/click', methods=['POST'])
@jwt_required(optional=True)
def track_click(link_id: int):
"""Increment click counter and return the URL to redirect to."""
article = KnowledgeBase.query.get(link_id)
if not article or not article.isactive:
return error_response(ErrorCodes.NOT_FOUND, 'Article not found', http_code=404)
article.increment_clicks()
db.session.commit()
return success_response({
'linkurl': article.linkurl,
'clicks': article.clicks
})
@knowledgebase_bp.route('', methods=['POST'])
@jwt_required()
def create_article():
"""Create a new knowledge base article."""
data = request.get_json()
if not data or not data.get('shortdescription'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'shortdescription is required')
if not data.get('linkurl'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'linkurl is required')
# Validate application if provided
if data.get('appid'):
app = Application.query.get(data['appid'])
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
article = KnowledgeBase(
shortdescription=data['shortdescription'],
linkurl=data['linkurl'],
appid=data.get('appid'),
keywords=data.get('keywords'),
clicks=0
)
db.session.add(article)
db.session.commit()
return success_response(article.to_dict(), message='Article created', http_code=201)
@knowledgebase_bp.route('/<int:link_id>', methods=['PUT'])
@jwt_required()
def update_article(link_id: int):
"""Update a knowledge base article."""
article = KnowledgeBase.query.get(link_id)
if not article:
return error_response(ErrorCodes.NOT_FOUND, 'Article not found', http_code=404)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Validate application if being changed
if 'appid' in data and data['appid']:
app = Application.query.get(data['appid'])
if not app:
return error_response(ErrorCodes.NOT_FOUND, 'Application not found', http_code=404)
fields = ['shortdescription', 'linkurl', 'appid', 'keywords', 'isactive']
for key in fields:
if key in data:
setattr(article, key, data[key])
db.session.commit()
return success_response(article.to_dict(), message='Article updated')
@knowledgebase_bp.route('/<int:link_id>', methods=['DELETE'])
@jwt_required()
def delete_article(link_id: int):
"""Delete (deactivate) a knowledge base article."""
article = KnowledgeBase.query.get(link_id)
if not article:
return error_response(ErrorCodes.NOT_FOUND, 'Article not found', http_code=404)
article.isactive = False
db.session.commit()
return success_response(message='Article deleted')

View File

@@ -0,0 +1,144 @@
"""Locations API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Location
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
locations_bp = Blueprint('locations', __name__)
@locations_bp.route('', methods=['GET'])
@jwt_required()
def list_locations():
"""List all locations."""
page, per_page = get_pagination_params(request)
query = Location.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Location.isactive == True)
if search := request.args.get('search'):
query = query.filter(
db.or_(
Location.locationname.ilike(f'%{search}%'),
Location.building.ilike(f'%{search}%')
)
)
query = query.order_by(Location.locationname)
items, total = paginate_query(query, page, per_page)
data = [loc.to_dict() for loc in items]
return paginated_response(data, page, per_page, total)
@locations_bp.route('/<int:location_id>', methods=['GET'])
@jwt_required()
def get_location(location_id: int):
"""Get a single location."""
loc = Location.query.get(location_id)
if not loc:
return error_response(
ErrorCodes.NOT_FOUND,
f'Location with ID {location_id} not found',
http_code=404
)
return success_response(loc.to_dict())
@locations_bp.route('', methods=['POST'])
@jwt_required()
def create_location():
"""Create a new location."""
data = request.get_json()
if not data or not data.get('locationname'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'locationname is required')
if Location.query.filter_by(locationname=data['locationname']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Location '{data['locationname']}' already exists",
http_code=409
)
loc = Location(
locationname=data['locationname'],
building=data.get('building'),
floor=data.get('floor'),
room=data.get('room'),
description=data.get('description'),
mapimage=data.get('mapimage'),
mapwidth=data.get('mapwidth'),
mapheight=data.get('mapheight')
)
db.session.add(loc)
db.session.commit()
return success_response(loc.to_dict(), message='Location created', http_code=201)
@locations_bp.route('/<int:location_id>', methods=['PUT'])
@jwt_required()
def update_location(location_id: int):
"""Update a location."""
loc = Location.query.get(location_id)
if not loc:
return error_response(
ErrorCodes.NOT_FOUND,
f'Location with ID {location_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'locationname' in data and data['locationname'] != loc.locationname:
if Location.query.filter_by(locationname=data['locationname']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Location '{data['locationname']}' already exists",
http_code=409
)
for key in ['locationname', 'building', 'floor', 'room', 'description', 'mapimage', 'mapwidth', 'mapheight', 'isactive']:
if key in data:
setattr(loc, key, data[key])
db.session.commit()
return success_response(loc.to_dict(), message='Location updated')
@locations_bp.route('/<int:location_id>', methods=['DELETE'])
@jwt_required()
def delete_location(location_id: int):
"""Delete (deactivate) a location."""
loc = Location.query.get(location_id)
if not loc:
return error_response(
ErrorCodes.NOT_FOUND,
f'Location with ID {location_id} not found',
http_code=404
)
loc.isactive = False
db.session.commit()
return success_response(message='Location deleted')

567
shopdb/core/api/machines.py Normal file
View File

@@ -0,0 +1,567 @@
"""Machines API endpoints."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required, current_user
from shopdb.extensions import db
from shopdb.core.models import Machine, MachineType
from shopdb.core.models.relationship import MachineRelationship, RelationshipType
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
machines_bp = Blueprint('machines', __name__)
@machines_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_machines():
"""
List all machines with filtering and pagination.
Query params:
page: int (default 1)
per_page: int (default 20, max 100)
machinetype: int (filter by type ID)
pctype: int (filter by PC type ID)
businessunit: int (filter by business unit ID)
status: int (filter by status ID)
category: str (Equipment, PC, Network)
search: str (search in machinenumber, alias, hostname)
active: bool (default true)
sort: str (field name, prefix with - for desc)
"""
page, per_page = get_pagination_params(request)
# Build query
query = Machine.query
# Apply filters
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Machine.isactive == True)
if machinetype_id := request.args.get('machinetype', type=int):
query = query.filter(Machine.machinetypeid == machinetype_id)
if pctype_id := request.args.get('pctype', type=int):
query = query.filter(Machine.pctypeid == pctype_id)
if businessunit_id := request.args.get('businessunit', type=int):
query = query.filter(Machine.businessunitid == businessunit_id)
if status_id := request.args.get('status', type=int):
query = query.filter(Machine.statusid == status_id)
if category := request.args.get('category'):
query = query.join(MachineType).filter(MachineType.category == category)
if search := request.args.get('search'):
search_term = f'%{search}%'
query = query.filter(
db.or_(
Machine.machinenumber.ilike(search_term),
Machine.alias.ilike(search_term),
Machine.hostname.ilike(search_term),
Machine.serialnumber.ilike(search_term)
)
)
# Filter for machines with map positions
if request.args.get('hasmap', '').lower() == 'true':
query = query.filter(
Machine.mapleft.isnot(None),
Machine.maptop.isnot(None)
)
# Apply sorting
sort_field = request.args.get('sort', 'machinenumber')
desc = sort_field.startswith('-')
if desc:
sort_field = sort_field[1:]
if hasattr(Machine, sort_field):
order = getattr(Machine, sort_field)
query = query.order_by(order.desc() if desc else order)
# For map view, allow fetching all machines without pagination limit
include_map_extras = request.args.get('hasmap', '').lower() == 'true'
fetch_all = request.args.get('all', '').lower() == 'true'
if include_map_extras and fetch_all:
# Get all map machines without pagination
items = query.all()
total = len(items)
else:
# Normal pagination
items, total = paginate_query(query, page, per_page)
# Convert to dicts with relationships
data = []
for m in items:
d = m.to_dict()
# Get machinetype from model (single source of truth)
mt = m.derived_machinetype
d['machinetype'] = mt.machinetype if mt else None
d['machinetypeid'] = mt.machinetypeid if mt else None
d['category'] = mt.category if mt else None
d['status'] = m.status.status if m.status else None
d['statusid'] = m.statusid
d['businessunit'] = m.businessunit.businessunit if m.businessunit else None
d['businessunitid'] = m.businessunitid
d['vendor'] = m.vendor.vendor if m.vendor else None
d['model'] = m.model.modelnumber if m.model else None
d['pctype'] = m.pctype.pctype if m.pctype else None
d['serialnumber'] = m.serialnumber
d['isvnc'] = m.isvnc
d['iswinrm'] = m.iswinrm
# Include extra fields for map view
if include_map_extras:
# Get primary IP address from communications
primary_comm = next(
(c for c in m.communications if c.isprimary and c.ipaddress),
None
)
if not primary_comm:
# Fall back to first communication with IP
primary_comm = next(
(c for c in m.communications if c.ipaddress),
None
)
d['ipaddress'] = primary_comm.ipaddress if primary_comm else None
# Get connected PC (parent machine that is a PC)
connected_pc = None
for rel in m.parent_relationships:
if rel.parent_machine and rel.parent_machine.is_pc:
connected_pc = rel.parent_machine.machinenumber
break
d['connected_pc'] = connected_pc
data.append(d)
return paginated_response(data, page, per_page, total)
@machines_bp.route('/<int:machine_id>', methods=['GET'])
@jwt_required(optional=True)
def get_machine(machine_id: int):
"""Get a single machine by ID."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
data = machine.to_dict()
# Add related data - machinetype comes from model (single source of truth)
mt = machine.derived_machinetype
data['machinetype'] = mt.to_dict() if mt else None
data['pctype'] = machine.pctype.to_dict() if machine.pctype else None
data['status'] = machine.status.to_dict() if machine.status else None
data['businessunit'] = machine.businessunit.to_dict() if machine.businessunit else None
data['vendor'] = machine.vendor.to_dict() if machine.vendor else None
data['model'] = machine.model.to_dict() if machine.model else None
data['location'] = machine.location.to_dict() if machine.location else None
data['operatingsystem'] = machine.operatingsystem.to_dict() if machine.operatingsystem else None
# Add communications
data['communications'] = [c.to_dict() for c in machine.communications.all()]
return success_response(data)
@machines_bp.route('', methods=['POST'])
@jwt_required()
def create_machine():
"""Create a new machine."""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('machinenumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'machinenumber is required')
if not data.get('modelnumberid'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'modelnumberid is required (determines machine type)')
# Check for duplicate machinenumber
if Machine.query.filter_by(machinenumber=data['machinenumber']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Machine number '{data['machinenumber']}' already exists",
http_code=409
)
# Create machine
allowed_fields = [
'machinenumber', 'alias', 'hostname', 'serialnumber',
'machinetypeid', 'pctypeid', 'businessunitid', 'modelnumberid',
'vendorid', 'statusid', 'locationid', 'osid',
'mapleft', 'maptop', 'islocationonly',
'loggedinuser', 'isvnc', 'iswinrm', 'isshopfloor',
'requiresmanualconfig', 'notes'
]
machine_data = {k: v for k, v in data.items() if k in allowed_fields}
machine = Machine(**machine_data)
machine.createdby = current_user.username
db.session.add(machine)
db.session.commit()
return success_response(
machine.to_dict(),
message='Machine created successfully',
http_code=201
)
@machines_bp.route('/<int:machine_id>', methods=['PUT'])
@jwt_required()
def update_machine(machine_id: int):
"""Update an existing machine."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Check for duplicate machinenumber if changed
if 'machinenumber' in data and data['machinenumber'] != machine.machinenumber:
existing = Machine.query.filter_by(machinenumber=data['machinenumber']).first()
if existing:
return error_response(
ErrorCodes.CONFLICT,
f"Machine number '{data['machinenumber']}' already exists",
http_code=409
)
# Update allowed fields
allowed_fields = [
'machinenumber', 'alias', 'hostname', 'serialnumber',
'machinetypeid', 'pctypeid', 'businessunitid', 'modelnumberid',
'vendorid', 'statusid', 'locationid', 'osid',
'mapleft', 'maptop', 'islocationonly',
'loggedinuser', 'isvnc', 'iswinrm', 'isshopfloor',
'requiresmanualconfig', 'notes', 'isactive'
]
for key, value in data.items():
if key in allowed_fields:
setattr(machine, key, value)
machine.modifiedby = current_user.username
db.session.commit()
return success_response(machine.to_dict(), message='Machine updated successfully')
@machines_bp.route('/<int:machine_id>', methods=['DELETE'])
@jwt_required()
def delete_machine(machine_id: int):
"""Soft delete a machine."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
machine.soft_delete(deleted_by=current_user.username)
db.session.commit()
return success_response(message='Machine deleted successfully')
@machines_bp.route('/<int:machine_id>/communications', methods=['GET'])
@jwt_required()
def get_machine_communications(machine_id: int):
"""Get all communications for a machine."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
comms = [c.to_dict() for c in machine.communications.all()]
return success_response(comms)
@machines_bp.route('/<int:machine_id>/communication', methods=['PUT'])
@jwt_required()
def update_machine_communication(machine_id: int):
"""Update machine communication (IP address)."""
from shopdb.core.models.communication import Communication, CommunicationType
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Get or create IP communication type
ip_comtype = CommunicationType.query.filter_by(comtype='IP').first()
if not ip_comtype:
ip_comtype = CommunicationType(comtype='IP', description='IP Network')
db.session.add(ip_comtype)
db.session.flush()
# Find existing primary communication or create new one
comms = list(machine.communications.all())
comm = next((c for c in comms if c.isprimary), None)
if not comm:
comm = next((c for c in comms if c.comtypeid == ip_comtype.comtypeid), None)
if not comm:
comm = Communication(machineid=machine_id, comtypeid=ip_comtype.comtypeid)
db.session.add(comm)
# Update fields
if 'ipaddress' in data:
comm.ipaddress = data['ipaddress']
if 'isprimary' in data:
comm.isprimary = data['isprimary']
if 'macaddress' in data:
comm.macaddress = data['macaddress']
db.session.commit()
return success_response({
'communicationid': comm.communicationid,
'ipaddress': comm.ipaddress,
'isprimary': comm.isprimary,
}, message='Communication updated')
# ==================== Machine Relationships ====================
@machines_bp.route('/<int:machine_id>/relationships', methods=['GET'])
@jwt_required(optional=True)
def get_machine_relationships(machine_id: int):
"""Get all relationships for a machine (both parent and child)."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
relationships = []
my_category = machine.machinetype.category if machine.machinetype else None
seen_ids = set()
# Get all relationships involving this machine
all_rels = list(machine.child_relationships) + list(machine.parent_relationships)
for rel in all_rels:
if rel.relationshipid in seen_ids:
continue
seen_ids.add(rel.relationshipid)
# Determine the related machine (the one that isn't us)
if rel.parentmachineid == machine.machineid:
related = rel.child_machine
else:
related = rel.parent_machine
related_category = related.machinetype.category if related and related.machinetype else None
rel_type = rel.relationship_type.relationshiptype if rel.relationship_type else None
# Determine direction based on relationship type and categories
if rel_type == 'Controls':
# PC controls Equipment - determine from categories
if my_category == 'PC':
direction = 'controls'
else:
direction = 'controlled_by'
elif rel_type == 'Dualpath':
direction = 'dualpath_partner'
else:
# For other types, use parent/child
if rel.parentmachineid == machine.machineid:
direction = 'controls'
else:
direction = 'controlled_by'
relationships.append({
'relationshipid': rel.relationshipid,
'direction': direction,
'relatedmachineid': related.machineid if related else None,
'relatedmachinenumber': related.machinenumber if related else None,
'relatedmachinealias': related.alias if related else None,
'relatedcategory': related_category,
'relationshiptype': rel_type,
'relationshiptypeid': rel.relationshiptypeid,
'notes': rel.notes
})
return success_response(relationships)
@machines_bp.route('/<int:machine_id>/relationships', methods=['POST'])
@jwt_required()
def create_machine_relationship(machine_id: int):
"""Create a relationship for a machine."""
machine = Machine.query.get(machine_id)
if not machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine with ID {machine_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
related_machine_id = data.get('relatedmachineid')
relationship_type_id = data.get('relationshiptypeid')
direction = data.get('direction', 'controlled_by') # 'controls' or 'controlled_by'
if not related_machine_id:
return error_response(ErrorCodes.VALIDATION_ERROR, 'relatedmachineid is required')
if not relationship_type_id:
return error_response(ErrorCodes.VALIDATION_ERROR, 'relationshiptypeid is required')
related_machine = Machine.query.get(related_machine_id)
if not related_machine:
return error_response(
ErrorCodes.NOT_FOUND,
f'Related machine with ID {related_machine_id} not found',
http_code=404
)
# Determine parent/child based on direction
if direction == 'controls':
parent_id = machine_id
child_id = related_machine_id
else: # controlled_by
parent_id = related_machine_id
child_id = machine_id
# Check if relationship already exists
existing = MachineRelationship.query.filter_by(
parentmachineid=parent_id,
childmachineid=child_id,
relationshiptypeid=relationship_type_id
).first()
if existing:
return error_response(
ErrorCodes.CONFLICT,
'This relationship already exists',
http_code=409
)
relationship = MachineRelationship(
parentmachineid=parent_id,
childmachineid=child_id,
relationshiptypeid=relationship_type_id,
notes=data.get('notes')
)
db.session.add(relationship)
db.session.commit()
return success_response({
'relationshipid': relationship.relationshipid,
'parentmachineid': relationship.parentmachineid,
'childmachineid': relationship.childmachineid,
'relationshiptypeid': relationship.relationshiptypeid
}, message='Relationship created successfully', http_code=201)
@machines_bp.route('/relationships/<int:relationship_id>', methods=['DELETE'])
@jwt_required()
def delete_machine_relationship(relationship_id: int):
"""Delete a machine relationship."""
relationship = MachineRelationship.query.get(relationship_id)
if not relationship:
return error_response(
ErrorCodes.NOT_FOUND,
f'Relationship with ID {relationship_id} not found',
http_code=404
)
db.session.delete(relationship)
db.session.commit()
return success_response(message='Relationship deleted successfully')
@machines_bp.route('/relationshiptypes', methods=['GET'])
@jwt_required(optional=True)
def list_relationship_types():
"""List all relationship types."""
types = RelationshipType.query.order_by(RelationshipType.relationshiptype).all()
return success_response([{
'relationshiptypeid': t.relationshiptypeid,
'relationshiptype': t.relationshiptype,
'description': t.description
} for t in types])
@machines_bp.route('/relationshiptypes', methods=['POST'])
@jwt_required()
def create_relationship_type():
"""Create a new relationship type."""
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if not data.get('relationshiptype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'relationshiptype is required')
existing = RelationshipType.query.filter_by(relationshiptype=data['relationshiptype']).first()
if existing:
return error_response(
ErrorCodes.CONFLICT,
f"Relationship type '{data['relationshiptype']}' already exists",
http_code=409
)
rel_type = RelationshipType(
relationshiptype=data['relationshiptype'],
description=data.get('description')
)
db.session.add(rel_type)
db.session.commit()
return success_response({
'relationshiptypeid': rel_type.relationshiptypeid,
'relationshiptype': rel_type.relationshiptype,
'description': rel_type.description
}, message='Relationship type created successfully', http_code=201)

View File

@@ -0,0 +1,148 @@
"""Machine Types API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required, current_user
from shopdb.extensions import db
from shopdb.core.models import MachineType
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
machinetypes_bp = Blueprint('machinetypes', __name__)
@machinetypes_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_machinetypes():
"""List all machine types with optional filtering."""
page, per_page = get_pagination_params(request)
query = MachineType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(MachineType.isactive == True)
if category := request.args.get('category'):
query = query.filter(MachineType.category == category)
if search := request.args.get('search'):
query = query.filter(MachineType.machinetype.ilike(f'%{search}%'))
query = query.order_by(MachineType.machinetype)
items, total = paginate_query(query, page, per_page)
data = [mt.to_dict() for mt in items]
return paginated_response(data, page, per_page, total)
@machinetypes_bp.route('/<int:type_id>', methods=['GET'])
@jwt_required(optional=True)
def get_machinetype(type_id: int):
"""Get a single machine type."""
mt = MachineType.query.get(type_id)
if not mt:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine type with ID {type_id} not found',
http_code=404
)
return success_response(mt.to_dict())
@machinetypes_bp.route('', methods=['POST'])
@jwt_required()
def create_machinetype():
"""Create a new machine type."""
data = request.get_json()
if not data or not data.get('machinetype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'machinetype is required')
if MachineType.query.filter_by(machinetype=data['machinetype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Machine type '{data['machinetype']}' already exists",
http_code=409
)
mt = MachineType(
machinetype=data['machinetype'],
category=data.get('category', 'Equipment'),
description=data.get('description'),
icon=data.get('icon')
)
db.session.add(mt)
db.session.commit()
return success_response(mt.to_dict(), message='Machine type created', http_code=201)
@machinetypes_bp.route('/<int:type_id>', methods=['PUT'])
@jwt_required()
def update_machinetype(type_id: int):
"""Update a machine type."""
mt = MachineType.query.get(type_id)
if not mt:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine type with ID {type_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
# Check duplicate name
if 'machinetype' in data and data['machinetype'] != mt.machinetype:
if MachineType.query.filter_by(machinetype=data['machinetype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Machine type '{data['machinetype']}' already exists",
http_code=409
)
for key in ['machinetype', 'category', 'description', 'icon', 'isactive']:
if key in data:
setattr(mt, key, data[key])
db.session.commit()
return success_response(mt.to_dict(), message='Machine type updated')
@machinetypes_bp.route('/<int:type_id>', methods=['DELETE'])
@jwt_required()
def delete_machinetype(type_id: int):
"""Delete (deactivate) a machine type."""
mt = MachineType.query.get(type_id)
if not mt:
return error_response(
ErrorCodes.NOT_FOUND,
f'Machine type with ID {type_id} not found',
http_code=404
)
# Check if any machines use this type
from shopdb.core.models import Machine
if Machine.query.filter_by(machinetypeid=type_id, isactive=True).first():
return error_response(
ErrorCodes.CONFLICT,
'Cannot delete machine type: machines are using it',
http_code=409
)
mt.isactive = False
db.session.commit()
return success_response(message='Machine type deleted')

151
shopdb/core/api/models.py Normal file
View File

@@ -0,0 +1,151 @@
"""Models (equipment models) API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Model
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
models_bp = Blueprint('models', __name__)
@models_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_models():
"""List all equipment models."""
page, per_page = get_pagination_params(request)
query = Model.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Model.isactive == True)
if vendor_id := request.args.get('vendor', type=int):
query = query.filter(Model.vendorid == vendor_id)
if machinetype_id := request.args.get('machinetype', type=int):
query = query.filter(Model.machinetypeid == machinetype_id)
if search := request.args.get('search'):
query = query.filter(Model.modelnumber.ilike(f'%{search}%'))
query = query.order_by(Model.modelnumber)
items, total = paginate_query(query, page, per_page)
data = []
for m in items:
d = m.to_dict()
d['vendor'] = m.vendor.vendor if m.vendor else None
d['machinetype'] = m.machinetype.machinetype if m.machinetype else None
data.append(d)
return paginated_response(data, page, per_page, total)
@models_bp.route('/<int:model_id>', methods=['GET'])
@jwt_required()
def get_model(model_id: int):
"""Get a single model."""
m = Model.query.get(model_id)
if not m:
return error_response(
ErrorCodes.NOT_FOUND,
f'Model with ID {model_id} not found',
http_code=404
)
data = m.to_dict()
data['vendor'] = m.vendor.to_dict() if m.vendor else None
data['machinetype'] = m.machinetype.to_dict() if m.machinetype else None
return success_response(data)
@models_bp.route('', methods=['POST'])
@jwt_required()
def create_model():
"""Create a new model."""
data = request.get_json()
if not data or not data.get('modelnumber'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'modelnumber is required')
# Check duplicate
existing = Model.query.filter_by(
modelnumber=data['modelnumber'],
vendorid=data.get('vendorid')
).first()
if existing:
return error_response(
ErrorCodes.CONFLICT,
f"Model '{data['modelnumber']}' already exists for this vendor",
http_code=409
)
m = Model(
modelnumber=data['modelnumber'],
vendorid=data.get('vendorid'),
machinetypeid=data.get('machinetypeid'),
description=data.get('description'),
imageurl=data.get('imageurl'),
documentationurl=data.get('documentationurl'),
notes=data.get('notes')
)
db.session.add(m)
db.session.commit()
return success_response(m.to_dict(), message='Model created', http_code=201)
@models_bp.route('/<int:model_id>', methods=['PUT'])
@jwt_required()
def update_model(model_id: int):
"""Update a model."""
m = Model.query.get(model_id)
if not m:
return error_response(
ErrorCodes.NOT_FOUND,
f'Model with ID {model_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
for key in ['modelnumber', 'vendorid', 'machinetypeid', 'description', 'imageurl', 'documentationurl', 'notes', 'isactive']:
if key in data:
setattr(m, key, data[key])
db.session.commit()
return success_response(m.to_dict(), message='Model updated')
@models_bp.route('/<int:model_id>', methods=['DELETE'])
@jwt_required()
def delete_model(model_id: int):
"""Delete (deactivate) a model."""
m = Model.query.get(model_id)
if not m:
return error_response(
ErrorCodes.NOT_FOUND,
f'Model with ID {model_id} not found',
http_code=404
)
m.isactive = False
db.session.commit()
return success_response(message='Model deleted')

View File

@@ -0,0 +1,131 @@
"""Operating Systems API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import OperatingSystem
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
operatingsystems_bp = Blueprint('operatingsystems', __name__)
@operatingsystems_bp.route('', methods=['GET'])
@jwt_required()
def list_operatingsystems():
"""List all operating systems."""
page, per_page = get_pagination_params(request)
query = OperatingSystem.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(OperatingSystem.isactive == True)
if search := request.args.get('search'):
query = query.filter(OperatingSystem.osname.ilike(f'%{search}%'))
query = query.order_by(OperatingSystem.osname)
items, total = paginate_query(query, page, per_page)
data = [os.to_dict() for os in items]
return paginated_response(data, page, per_page, total)
@operatingsystems_bp.route('/<int:os_id>', methods=['GET'])
@jwt_required()
def get_operatingsystem(os_id: int):
"""Get a single operating system."""
os = OperatingSystem.query.get(os_id)
if not os:
return error_response(
ErrorCodes.NOT_FOUND,
f'Operating system with ID {os_id} not found',
http_code=404
)
return success_response(os.to_dict())
@operatingsystems_bp.route('', methods=['POST'])
@jwt_required()
def create_operatingsystem():
"""Create a new operating system."""
data = request.get_json()
if not data or not data.get('osname'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'osname is required')
existing = OperatingSystem.query.filter_by(
osname=data['osname'],
osversion=data.get('osversion')
).first()
if existing:
return error_response(
ErrorCodes.CONFLICT,
f"Operating system '{data['osname']} {data.get('osversion', '')}' already exists",
http_code=409
)
os = OperatingSystem(
osname=data['osname'],
osversion=data.get('osversion'),
architecture=data.get('architecture'),
endoflife=data.get('endoflife')
)
db.session.add(os)
db.session.commit()
return success_response(os.to_dict(), message='Operating system created', http_code=201)
@operatingsystems_bp.route('/<int:os_id>', methods=['PUT'])
@jwt_required()
def update_operatingsystem(os_id: int):
"""Update an operating system."""
os = OperatingSystem.query.get(os_id)
if not os:
return error_response(
ErrorCodes.NOT_FOUND,
f'Operating system with ID {os_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
for key in ['osname', 'osversion', 'architecture', 'endoflife', 'isactive']:
if key in data:
setattr(os, key, data[key])
db.session.commit()
return success_response(os.to_dict(), message='Operating system updated')
@operatingsystems_bp.route('/<int:os_id>', methods=['DELETE'])
@jwt_required()
def delete_operatingsystem(os_id: int):
"""Delete (deactivate) an operating system."""
os = OperatingSystem.query.get(os_id)
if not os:
return error_response(
ErrorCodes.NOT_FOUND,
f'Operating system with ID {os_id} not found',
http_code=404
)
os.isactive = False
db.session.commit()
return success_response(message='Operating system deleted')

141
shopdb/core/api/pctypes.py Normal file
View File

@@ -0,0 +1,141 @@
"""PC Types API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import PCType
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
pctypes_bp = Blueprint('pctypes', __name__)
@pctypes_bp.route('', methods=['GET'])
@jwt_required()
def list_pctypes():
"""List all PC types."""
page, per_page = get_pagination_params(request)
query = PCType.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(PCType.isactive == True)
if search := request.args.get('search'):
query = query.filter(PCType.pctype.ilike(f'%{search}%'))
query = query.order_by(PCType.pctype)
items, total = paginate_query(query, page, per_page)
data = [pt.to_dict() for pt in items]
return paginated_response(data, page, per_page, total)
@pctypes_bp.route('/<int:type_id>', methods=['GET'])
@jwt_required()
def get_pctype(type_id: int):
"""Get a single PC type."""
pt = PCType.query.get(type_id)
if not pt:
return error_response(
ErrorCodes.NOT_FOUND,
f'PC type with ID {type_id} not found',
http_code=404
)
return success_response(pt.to_dict())
@pctypes_bp.route('', methods=['POST'])
@jwt_required()
def create_pctype():
"""Create a new PC type."""
data = request.get_json()
if not data or not data.get('pctype'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'pctype is required')
if PCType.query.filter_by(pctype=data['pctype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"PC type '{data['pctype']}' already exists",
http_code=409
)
pt = PCType(
pctype=data['pctype'],
description=data.get('description')
)
db.session.add(pt)
db.session.commit()
return success_response(pt.to_dict(), message='PC type created', http_code=201)
@pctypes_bp.route('/<int:type_id>', methods=['PUT'])
@jwt_required()
def update_pctype(type_id: int):
"""Update a PC type."""
pt = PCType.query.get(type_id)
if not pt:
return error_response(
ErrorCodes.NOT_FOUND,
f'PC type with ID {type_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'pctype' in data and data['pctype'] != pt.pctype:
if PCType.query.filter_by(pctype=data['pctype']).first():
return error_response(
ErrorCodes.CONFLICT,
f"PC type '{data['pctype']}' already exists",
http_code=409
)
for key in ['pctype', 'description', 'isactive']:
if key in data:
setattr(pt, key, data[key])
db.session.commit()
return success_response(pt.to_dict(), message='PC type updated')
@pctypes_bp.route('/<int:type_id>', methods=['DELETE'])
@jwt_required()
def delete_pctype(type_id: int):
"""Delete (deactivate) a PC type."""
pt = PCType.query.get(type_id)
if not pt:
return error_response(
ErrorCodes.NOT_FOUND,
f'PC type with ID {type_id} not found',
http_code=404
)
from shopdb.core.models import Machine
if Machine.query.filter_by(pctypeid=type_id, isactive=True).first():
return error_response(
ErrorCodes.CONFLICT,
'Cannot delete PC type: machines are using it',
http_code=409
)
pt.isactive = False
db.session.commit()
return success_response(message='PC type deleted')

203
shopdb/core/api/search.py Normal file
View File

@@ -0,0 +1,203 @@
"""Global search API endpoint."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import (
Machine, Application, KnowledgeBase
)
from shopdb.utils.responses import success_response
search_bp = Blueprint('search', __name__)
@search_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def global_search():
"""
Global search across multiple entity types.
Returns combined results from:
- Machines (equipment and PCs)
- Applications
- Knowledge Base articles
- Printers (if available)
Results are sorted by relevance score.
"""
query = request.args.get('q', '').strip()
if not query or len(query) < 2:
return success_response({
'results': [],
'query': query,
'message': 'Search query must be at least 2 characters'
})
if len(query) > 200:
return success_response({
'results': [],
'query': query[:200],
'message': 'Search query too long'
})
results = []
search_term = f'%{query}%'
# Search Machines (Equipment and PCs)
machines = Machine.query.filter(
Machine.isactive == True,
db.or_(
Machine.machinenumber.ilike(search_term),
Machine.alias.ilike(search_term),
Machine.hostname.ilike(search_term),
Machine.serialnumber.ilike(search_term),
Machine.notes.ilike(search_term)
)
).limit(10).all()
for m in machines:
# Determine type: PC, Printer, or Equipment
is_pc = m.pctypeid is not None
is_printer = m.is_printer
# Calculate relevance - exact matches score higher
relevance = 15
if m.machinenumber and query.lower() == m.machinenumber.lower():
relevance = 100
elif m.hostname and query.lower() == m.hostname.lower():
relevance = 100
elif m.alias and query.lower() in m.alias.lower():
relevance = 50
display_name = m.hostname if is_pc and m.hostname else m.machinenumber
if m.alias and not is_pc:
display_name = f"{m.machinenumber} ({m.alias})"
# Determine result type and URL
if is_printer:
result_type = 'printer'
url = f"/printers/{m.machineid}"
elif is_pc:
result_type = 'pc'
url = f"/pcs/{m.machineid}"
else:
result_type = 'machine'
url = f"/machines/{m.machineid}"
# Get location - prefer machine's own location, fall back to parent machine's location
location_name = None
if m.location:
location_name = m.location.locationname
elif m.parent_relationships:
# Check parent machines for location
for rel in m.parent_relationships:
if rel.parent_machine and rel.parent_machine.location:
location_name = rel.parent_machine.location.locationname
break
# Get machinetype from model (single source of truth)
mt = m.derived_machinetype
results.append({
'type': result_type,
'id': m.machineid,
'title': display_name,
'subtitle': mt.machinetype if mt else None,
'location': location_name,
'url': url,
'relevance': relevance
})
# Search Applications
apps = Application.query.filter(
Application.isactive == True,
db.or_(
Application.appname.ilike(search_term),
Application.appdescription.ilike(search_term)
)
).limit(10).all()
for app in apps:
relevance = 20
if query.lower() == app.appname.lower():
relevance = 100
elif query.lower() in app.appname.lower():
relevance = 50
results.append({
'type': 'application',
'id': app.appid,
'title': app.appname,
'subtitle': app.appdescription[:100] if app.appdescription else None,
'url': f"/applications/{app.appid}",
'relevance': relevance
})
# Search Knowledge Base
kb_articles = KnowledgeBase.query.filter(
KnowledgeBase.isactive == True,
db.or_(
KnowledgeBase.shortdescription.ilike(search_term),
KnowledgeBase.keywords.ilike(search_term)
)
).limit(20).all()
for kb in kb_articles:
# Weight by clicks and keyword match
relevance = 10 + (kb.clicks or 0) * 0.1
if kb.keywords and query.lower() in kb.keywords.lower():
relevance += 15
results.append({
'type': 'knowledgebase',
'id': kb.linkid,
'title': kb.shortdescription,
'subtitle': kb.application.appname if kb.application else None,
'url': f"/knowledgebase/{kb.linkid}",
'linkurl': kb.linkurl,
'relevance': relevance
})
# Search Printers (check if printers model exists)
try:
from shopdb.plugins.printers.models import Printer
printers = Printer.query.filter(
Printer.isactive == True,
db.or_(
Printer.printercsfname.ilike(search_term),
Printer.printerwindowsname.ilike(search_term),
Printer.serialnumber.ilike(search_term),
Printer.fqdn.ilike(search_term)
)
).limit(10).all()
for p in printers:
relevance = 15
if p.printercsfname and query.lower() == p.printercsfname.lower():
relevance = 100
display_name = p.printercsfname or p.printerwindowsname or f"Printer #{p.printerid}"
results.append({
'type': 'printer',
'id': p.printerid,
'title': display_name,
'subtitle': p.printerwindowsname if p.printercsfname else None,
'url': f"/printers/{p.printerid}",
'relevance': relevance
})
except ImportError:
pass # Printers plugin not installed
# Sort by relevance (highest first)
results.sort(key=lambda x: x['relevance'], reverse=True)
# Limit total results
results = results[:30]
return success_response({
'results': results,
'query': query,
'total': len(results)
})

139
shopdb/core/api/statuses.py Normal file
View File

@@ -0,0 +1,139 @@
"""Machine Statuses API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import MachineStatus
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
statuses_bp = Blueprint('statuses', __name__)
@statuses_bp.route('', methods=['GET'])
@jwt_required(optional=True)
def list_statuses():
"""List all machine statuses."""
page, per_page = get_pagination_params(request)
query = MachineStatus.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(MachineStatus.isactive == True)
query = query.order_by(MachineStatus.status)
items, total = paginate_query(query, page, per_page)
data = [s.to_dict() for s in items]
return paginated_response(data, page, per_page, total)
@statuses_bp.route('/<int:status_id>', methods=['GET'])
@jwt_required(optional=True)
def get_status(status_id: int):
"""Get a single status."""
s = MachineStatus.query.get(status_id)
if not s:
return error_response(
ErrorCodes.NOT_FOUND,
f'Status with ID {status_id} not found',
http_code=404
)
return success_response(s.to_dict())
@statuses_bp.route('', methods=['POST'])
@jwt_required()
def create_status():
"""Create a new status."""
data = request.get_json()
if not data or not data.get('status'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'status is required')
if MachineStatus.query.filter_by(status=data['status']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Status '{data['status']}' already exists",
http_code=409
)
s = MachineStatus(
status=data['status'],
description=data.get('description'),
color=data.get('color')
)
db.session.add(s)
db.session.commit()
return success_response(s.to_dict(), message='Status created', http_code=201)
@statuses_bp.route('/<int:status_id>', methods=['PUT'])
@jwt_required()
def update_status(status_id: int):
"""Update a status."""
s = MachineStatus.query.get(status_id)
if not s:
return error_response(
ErrorCodes.NOT_FOUND,
f'Status with ID {status_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'status' in data and data['status'] != s.status:
if MachineStatus.query.filter_by(status=data['status']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Status '{data['status']}' already exists",
http_code=409
)
for key in ['status', 'description', 'color', 'isactive']:
if key in data:
setattr(s, key, data[key])
db.session.commit()
return success_response(s.to_dict(), message='Status updated')
@statuses_bp.route('/<int:status_id>', methods=['DELETE'])
@jwt_required()
def delete_status(status_id: int):
"""Delete (deactivate) a status."""
s = MachineStatus.query.get(status_id)
if not s:
return error_response(
ErrorCodes.NOT_FOUND,
f'Status with ID {status_id} not found',
http_code=404
)
from shopdb.core.models import Machine
if Machine.query.filter_by(statusid=status_id, isactive=True).first():
return error_response(
ErrorCodes.CONFLICT,
'Cannot delete status: machines are using it',
http_code=409
)
s.isactive = False
db.session.commit()
return success_response(message='Status deleted')

137
shopdb/core/api/vendors.py Normal file
View File

@@ -0,0 +1,137 @@
"""Vendors API endpoints - Full CRUD."""
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from shopdb.extensions import db
from shopdb.core.models import Vendor
from shopdb.utils.responses import (
success_response,
error_response,
paginated_response,
ErrorCodes
)
from shopdb.utils.pagination import get_pagination_params, paginate_query
vendors_bp = Blueprint('vendors', __name__)
@vendors_bp.route('', methods=['GET'])
@jwt_required()
def list_vendors():
"""List all vendors."""
page, per_page = get_pagination_params(request)
query = Vendor.query
if request.args.get('active', 'true').lower() != 'false':
query = query.filter(Vendor.isactive == True)
if search := request.args.get('search'):
query = query.filter(Vendor.vendor.ilike(f'%{search}%'))
query = query.order_by(Vendor.vendor)
items, total = paginate_query(query, page, per_page)
data = [v.to_dict() for v in items]
return paginated_response(data, page, per_page, total)
@vendors_bp.route('/<int:vendor_id>', methods=['GET'])
@jwt_required()
def get_vendor(vendor_id: int):
"""Get a single vendor."""
v = Vendor.query.get(vendor_id)
if not v:
return error_response(
ErrorCodes.NOT_FOUND,
f'Vendor with ID {vendor_id} not found',
http_code=404
)
return success_response(v.to_dict())
@vendors_bp.route('', methods=['POST'])
@jwt_required()
def create_vendor():
"""Create a new vendor."""
data = request.get_json()
if not data or not data.get('vendor'):
return error_response(ErrorCodes.VALIDATION_ERROR, 'vendor is required')
if Vendor.query.filter_by(vendor=data['vendor']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Vendor '{data['vendor']}' already exists",
http_code=409
)
v = Vendor(
vendor=data['vendor'],
description=data.get('description'),
website=data.get('website'),
supportphone=data.get('supportphone'),
supportemail=data.get('supportemail'),
notes=data.get('notes')
)
db.session.add(v)
db.session.commit()
return success_response(v.to_dict(), message='Vendor created', http_code=201)
@vendors_bp.route('/<int:vendor_id>', methods=['PUT'])
@jwt_required()
def update_vendor(vendor_id: int):
"""Update a vendor."""
v = Vendor.query.get(vendor_id)
if not v:
return error_response(
ErrorCodes.NOT_FOUND,
f'Vendor with ID {vendor_id} not found',
http_code=404
)
data = request.get_json()
if not data:
return error_response(ErrorCodes.VALIDATION_ERROR, 'No data provided')
if 'vendor' in data and data['vendor'] != v.vendor:
if Vendor.query.filter_by(vendor=data['vendor']).first():
return error_response(
ErrorCodes.CONFLICT,
f"Vendor '{data['vendor']}' already exists",
http_code=409
)
for key in ['vendor', 'description', 'website', 'supportphone', 'supportemail', 'notes', 'isactive']:
if key in data:
setattr(v, key, data[key])
db.session.commit()
return success_response(v.to_dict(), message='Vendor updated')
@vendors_bp.route('/<int:vendor_id>', methods=['DELETE'])
@jwt_required()
def delete_vendor(vendor_id: int):
"""Delete (deactivate) a vendor."""
v = Vendor.query.get(vendor_id)
if not v:
return error_response(
ErrorCodes.NOT_FOUND,
f'Vendor with ID {vendor_id} not found',
http_code=404
)
v.isactive = False
db.session.commit()
return success_response(message='Vendor deleted')