"""Shopfloor dashboard backend.

Serves the static front end from ``public/`` and exposes a JSON API that
returns active shop-floor notifications read from a MySQL database.
"""

import os
import traceback
from datetime import datetime, timedelta

import pymysql
from flask import Flask, jsonify, send_from_directory

app = Flask(__name__, static_folder='public')

# Database configuration, overridable through DB_* environment variables.
# NOTE(review): the fallback user/password are literals committed to source;
# consider requiring DB_USER / DB_PASS to be set instead of defaulting.
DB_CONFIG = {
    'host': os.environ.get('DB_HOST', 'localhost'),
    'port': int(os.environ.get('DB_PORT', 3306)),
    'user': os.environ.get('DB_USER', '570005354'),
    'password': os.environ.get('DB_PASS', '570005354'),
    'database': os.environ.get('DB_NAME', 'shopdb'),
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor,
}

# How far ahead of "now" the API looks for upcoming notifications.
NOTIFICATION_WINDOW_HOURS = 72

# Sort rank for current events, keyed by notification type colour
# (lower sorts first). Unknown or missing colours get the default rank.
SEVERITY_PRIORITY = {
    'danger': 1,
    'warning': 2,
    'success': 3,
    'secondary': 4,
}
DEFAULT_SEVERITY_PRIORITY = 4

# Active shop-floor notifications that are either running at some point in
# the window or start inside it. Parameters: (future, now, now, future).
NOTIFICATIONS_QUERY = """
    SELECT n.notificationid, n.notification, n.starttime, n.endtime,
           n.ticketnumber, n.link, n.isactive, n.isshopfloor,
           nt.typename, nt.typecolor
    FROM notifications n
    LEFT JOIN notificationtypes nt
           ON n.notificationtypeid = nt.notificationtypeid
    WHERE n.isactive = 1
      AND n.isshopfloor = 1
      AND (
            (n.starttime <= %s AND (n.endtime IS NULL OR n.endtime >= %s))
         OR (n.starttime BETWEEN %s AND %s)
      )
    ORDER BY n.starttime ASC
"""


def get_db_connection():
    """Open and return a new PyMySQL connection built from ``DB_CONFIG``."""
    return pymysql.connect(**DB_CONFIG)


# Serve static files (index.html, logo, etc)
@app.route('/')
def index():
    """Serve the dashboard's ``index.html`` from the ``public`` directory."""
    return send_from_directory('public', 'index.html')


# The rule needs a URL variable to feed ``path``; registering this view on
# '/' again (as before) collides with ``index`` and never supplies ``path``.
@app.route('/<path:path>')
def static_files(path):
    """Serve any other file under ``public`` by its relative URL path."""
    return send_from_directory('public', path)


def _bit_to_bool(value):
    """Convert a flag column value to ``bool``.

    PyMySQL returns ``bytes`` for BIT columns; any non-zero byte counts as
    true and empty bytes count as false. Other values use plain truthiness.
    """
    if isinstance(value, bytes):
        return any(value)
    return bool(value)


def _to_iso(value):
    """Return *value* as an ISO-8601 string; falsy values pass through."""
    if not value:
        return value
    return value.isoformat() if hasattr(value, 'isoformat') else str(value)


def _fetch_notification_rows(now, future):
    """Run ``NOTIFICATIONS_QUERY`` and return every row.

    The cursor and connection are closed even when the query raises, so a
    failing request does not leak a database connection.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(NOTIFICATIONS_QUERY, (future, now, now, future))
            return cursor.fetchall()
    finally:
        conn.close()


def _categorize_notifications(rows, now):
    """Make rows JSON-ready and split them into current and upcoming lists.

    Each row dict is updated in place: flag columns become booleans and the
    start/end times become ISO strings. A row is "current" when it has
    started and has no end time or has not ended yet; everything else is
    "upcoming". Current rows are ordered by severity rank, then start time;
    upcoming rows by start time.

    Returns:
        A ``(current_events, upcoming_events)`` tuple of lists of row dicts.
    """
    current = []   # (severity rank, start, row)
    upcoming = []  # (start, row)

    for row in rows:
        row['isactive'] = _bit_to_bool(row['isactive'])
        row['isshopfloor'] = _bit_to_bool(row['isshopfloor'])
        row['starttime'] = _to_iso(row['starttime'])
        row['endtime'] = _to_iso(row['endtime'])

        # Parse once here and reuse the result as the sort key below.
        start = datetime.fromisoformat(row['starttime'])
        end = datetime.fromisoformat(row['endtime']) if row['endtime'] else None

        if start <= now and (end is None or end >= now):
            rank = SEVERITY_PRIORITY.get(
                row['typecolor'], DEFAULT_SEVERITY_PRIORITY
            )
            current.append((rank, start, row))
        else:
            upcoming.append((start, row))

    # Key functions keep the row dicts themselves out of the comparison.
    current.sort(key=lambda item: (item[0], item[1]))
    upcoming.sort(key=lambda item: item[0])

    return (
        [row for _, _, row in current],
        [row for _, row in upcoming],
    )


def _error_response(log_label, message, extra):
    """Log the active exception and build the JSON 500 response for it.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` sees the exception being handled.
    """
    # NOTE(review): the traceback is returned to the client as well as
    # printed; this exposes internals and may be worth removing from the
    # response once no consumer relies on the field.
    error_details = {
        'success': False,
        'error': message,
        **extra,
        'traceback': traceback.format_exc(),
    }
    print(f"{log_label}: {error_details}")
    return jsonify(error_details), 500


# API endpoint to get notifications
@app.route('/api/notifications')
def get_notifications():
    """Return current and upcoming shop-floor notifications as JSON.

    On success the payload has ``success``, ``timestamp``, ``current`` and
    ``upcoming`` keys. Any failure yields a JSON error body with HTTP 500.
    """
    try:
        now = datetime.now()
        future = now + timedelta(hours=NOTIFICATION_WINDOW_HOURS)

        rows = _fetch_notification_rows(now, future)
        current_events, upcoming_events = _categorize_notifications(rows, now)

        return jsonify({
            'success': True,
            'timestamp': datetime.now().isoformat(),
            'current': current_events,
            'upcoming': upcoming_events,
        })
    except pymysql.Error as err:
        return _error_response(
            'DATABASE ERROR',
            f'Database error: {str(err)}',
            {'error_code': err.args[0] if err.args else None},
        )
    except Exception as e:
        # Top-level request boundary: report the failure as JSON rather
        # than letting Flask render its default error page.
        return _error_response(
            'SERVER ERROR',
            f'Server error: {str(e)}',
            {'type': type(e).__name__},
        )


# Health check endpoint
@app.route('/health')
def health():
    """Return a small JSON body with a status and the current server time."""
    return jsonify({
        'status': 'ok',
        'timestamp': datetime.now().isoformat(),
    })


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 3001))
    print(f'Shopfloor Dashboard running on port {port}')
    print(f'Access at: http://localhost:{port}')
    print('Using PyMySQL connector for MySQL 5.6 compatibility')
    app.run(host='0.0.0.0', port=port, debug=False)