BREAKING CHANGE: Replaced Node.js backend with Python Flask Reason: npm not available on production server, Python/pip is available. Changes: - Created app.py (Flask) to replace server.js (Node.js) - Created requirements.txt with only 2 dependencies (Flask, mysql-connector-python) - Updated README.md with Flask installation and deployment instructions - Maintained all existing functionality: * Same API endpoints (/api/notifications, /health) * Same database queries (isshopfloor filter, 72-hour window) * Same priority sorting (incidents first) * Serves static files from public/ directory * Same environment variable configuration Dependencies: - Flask==3.0.0 - mysql-connector-python==8.2.0 The public/index.html frontend remains unchanged - only the backend was converted. Tested and verified: - API endpoint returns correct data - Health check responds - Dashboard displays properly - Database connectivity working - PM2 process manager compatible 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
132 lines
4.4 KiB
Python
132 lines
4.4 KiB
Python
from flask import Flask, jsonify, send_from_directory
|
|
import mysql.connector
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
|
|
app = Flask(__name__, static_folder='public')
|
|
|
|
# Database configuration
|
|
DB_CONFIG = {
|
|
'host': os.environ.get('DB_HOST', 'localhost'),
|
|
'port': int(os.environ.get('DB_PORT', 3306)),
|
|
'user': os.environ.get('DB_USER', '570005354'),
|
|
'password': os.environ.get('DB_PASS', '570005354'),
|
|
'database': os.environ.get('DB_NAME', 'shopdb')
|
|
}
|
|
|
|
# Get database connection
|
|
def get_db_connection():
|
|
return mysql.connector.connect(**DB_CONFIG)
|
|
|
|
# Serve static files (index.html, logo, etc)
|
|
@app.route('/')
|
|
def index():
|
|
return send_from_directory('public', 'index.html')
|
|
|
|
@app.route('/<path:path>')
|
|
def static_files(path):
|
|
return send_from_directory('public', path)
|
|
|
|
# API endpoint to get notifications
|
|
@app.route('/api/notifications')
|
|
def get_notifications():
|
|
try:
|
|
now = datetime.now()
|
|
future = now + timedelta(hours=72) # 72 hours from now
|
|
|
|
# Connect to database
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Query with isshopfloor filter and 72-hour window
|
|
query = """
|
|
SELECT n.notificationid, n.notification, n.starttime, n.endtime, n.ticketnumber, n.link,
|
|
n.isactive, n.isshopfloor, nt.typename, nt.typecolor
|
|
FROM notifications n
|
|
LEFT JOIN notificationtypes nt ON n.notificationtypeid = nt.notificationtypeid
|
|
WHERE n.isactive = 1
|
|
AND n.isshopfloor = 1
|
|
AND (
|
|
(n.starttime <= %s AND (n.endtime IS NULL OR n.endtime >= %s))
|
|
OR (n.starttime BETWEEN %s AND %s)
|
|
)
|
|
ORDER BY n.starttime ASC
|
|
"""
|
|
|
|
cursor.execute(query, (future, now, now, future))
|
|
rows = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Convert bit fields to boolean and datetime to ISO strings
|
|
for row in rows:
|
|
row['isactive'] = bool(row['isactive'])
|
|
row['isshopfloor'] = bool(row['isshopfloor'])
|
|
if row['starttime']:
|
|
row['starttime'] = row['starttime'].isoformat()
|
|
if row['endtime']:
|
|
row['endtime'] = row['endtime'].isoformat()
|
|
|
|
# Categorize notifications
|
|
current_events = []
|
|
upcoming_events = []
|
|
|
|
for notification in rows:
|
|
start = datetime.fromisoformat(notification['starttime'])
|
|
end = datetime.fromisoformat(notification['endtime']) if notification['endtime'] else None
|
|
|
|
if start <= now and (end is None or end >= now):
|
|
current_events.append(notification)
|
|
else:
|
|
upcoming_events.append(notification)
|
|
|
|
# Sort current events by severity priority, then by starttime
|
|
# Priority: Incident (danger) > Change (warning) > Awareness/TBD (success)
|
|
severity_priority = {
|
|
'danger': 1, # Incident - highest priority
|
|
'warning': 2, # Change
|
|
'success': 3, # Awareness/TBD - lowest priority
|
|
'secondary': 4 # Fallback
|
|
}
|
|
|
|
current_events.sort(key=lambda x: (
|
|
severity_priority.get(x['typecolor'], 4),
|
|
datetime.fromisoformat(x['starttime'])
|
|
))
|
|
|
|
# Upcoming events stay sorted by starttime only
|
|
upcoming_events.sort(key=lambda x: datetime.fromisoformat(x['starttime']))
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'current': current_events,
|
|
'upcoming': upcoming_events
|
|
})
|
|
|
|
except mysql.connector.Error as err:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Database error: {str(err)}'
|
|
}), 500
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Server error: {str(e)}'
|
|
}), 500
|
|
|
|
# Health check endpoint
|
|
@app.route('/health')
|
|
def health():
|
|
return jsonify({
|
|
'status': 'ok',
|
|
'timestamp': datetime.now().isoformat()
|
|
})
|
|
|
|
if __name__ == '__main__':
|
|
port = int(os.environ.get('PORT', 3001))
|
|
print(f'Shopfloor Dashboard running on port {port}')
|
|
print(f'Access at: http://localhost:{port}')
|
|
app.run(host='0.0.0.0', port=port, debug=False)
|