Added better error handling and MySQL 5.6 support for Windows Server: 1. Enhanced error logging in app.py: - Detailed database error messages with error codes - Full stack traces logged to console/PM2 - Error details returned in JSON for debugging 2. Created app-pymysql.py: - Alternative version using PyMySQL instead of mysql-connector-python - Better compatibility with older MySQL 5.6 servers - Handles bit field conversion from bytes to boolean - Pure Python implementation (no C extensions) 3. Added requirements-mysql56.txt: - PyMySQL 1.1.0 for MySQL 5.6 compatibility - Use this on Windows servers with old MySQL For production Windows servers with MySQL 5.6, use: pip install -r requirements-mysql56.txt python app-pymysql.py For debugging 500 errors, check console/PM2 logs for detailed error messages. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
145 lines
5.0 KiB
Python
145 lines
5.0 KiB
Python
from flask import Flask, jsonify, send_from_directory
|
|
import pymysql
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
|
|
# Flask application object; the 'public' directory is registered as its
# static folder.
app = Flask(__name__, static_folder='public')

# Keyword arguments handed to pymysql.connect(). Each connection setting is
# read from the environment, with a fallback used when the variable is unset.
# NOTE(review): the fallback DB_USER / DB_PASS values are hard-coded in source;
# consider requiring them from the environment in production.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST', 'localhost'),
    port=int(os.getenv('DB_PORT', 3306)),
    user=os.getenv('DB_USER', '570005354'),
    password=os.getenv('DB_PASS', '570005354'),
    database=os.getenv('DB_NAME', 'shopdb'),
    charset='utf8mb4',
    # Rows are returned as dicts keyed by column name.
    cursorclass=pymysql.cursors.DictCursor,
)
|
|
|
|
def get_db_connection():
    """Open and return a new PyMySQL connection built from ``DB_CONFIG``.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = pymysql.connect(**DB_CONFIG)
    return connection
|
|
|
|
@app.route('/')
def index():
    """Serve ``index.html`` from the ``public`` directory for the root URL."""
    landing_page = 'index.html'
    return send_from_directory('public', landing_page)
|
|
|
|
@app.route('/<path:path>')
def static_files(path):
    """Serve the file at *path* (logo, scripts, ...) from ``public``.

    Args:
        path: URL path captured by the route, looked up under ``public``.
    """
    response = send_from_directory('public', path)
    return response
|
|
|
|
# Sort rank for the ``typecolor`` of current notifications; lower sorts first.
_SEVERITY_PRIORITY = {
    'danger': 1,
    'warning': 2,
    'success': 3,
    'secondary': 4,
}


def _bit_to_bool(value):
    """Convert a BIT column value to ``bool``.

    PyMySQL returns BIT columns as ``bytes``; any non-zero byte counts as
    true, and empty bytes count as false. Other values go through ``bool()``.
    """
    if isinstance(value, (bytes, bytearray)):
        return any(value)
    return bool(value)


def _to_datetime(value):
    """Return *value* as a ``datetime``, or ``None`` when it is empty.

    Values that are not already ``datetime`` instances are parsed from their
    string form with ``datetime.fromisoformat``.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(str(value))


def _serialize_time(value):
    """Render a time value for JSON: ISO-8601 when possible, else ``str()``.

    Empty values (e.g. ``None``) are returned unchanged.
    """
    if not value:
        return value
    return value.isoformat() if hasattr(value, 'isoformat') else str(value)


# API endpoint to get notifications
@app.route('/api/notifications')
def get_notifications():
    """Return active shopfloor notifications, split into current and upcoming.

    Selects notifications with ``isactive = 1`` and ``isshopfloor = 1`` that
    are either in progress or start within the next 72 hours.

    Returns:
        On success, a JSON object with ``success``, ``timestamp``, ``current``
        (sorted by severity rank, then start time) and ``upcoming`` (sorted by
        start time). On failure, a JSON error object and HTTP status 500.
    """
    import traceback

    try:
        now = datetime.now()
        future = now + timedelta(hours=72)  # 72 hours from now

        # Query with isshopfloor filter and 72-hour window
        query = """
            SELECT n.notificationid, n.notification, n.starttime, n.endtime, n.ticketnumber, n.link,
                   n.isactive, n.isshopfloor, nt.typename, nt.typecolor
            FROM notifications n
            LEFT JOIN notificationtypes nt ON n.notificationtypeid = nt.notificationtypeid
            WHERE n.isactive = 1
              AND n.isshopfloor = 1
              AND (
                  (n.starttime <= %s AND (n.endtime IS NULL OR n.endtime >= %s))
                  OR (n.starttime BETWEEN %s AND %s)
              )
            ORDER BY n.starttime ASC
        """

        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, (future, now, now, future))
                rows = cursor.fetchall()
        finally:
            # Always release the connection, even when the query fails.
            conn.close()

        # Each entry pairs the parsed start time with its row so sorting does
        # not have to re-parse the serialized ISO string.
        current = []
        upcoming = []

        for row in rows:
            start = _to_datetime(row['starttime'])
            end = _to_datetime(row['endtime'])

            # Make the row JSON-friendly: BIT fields -> bool, times -> strings.
            row['isactive'] = _bit_to_bool(row['isactive'])
            row['isshopfloor'] = _bit_to_bool(row['isshopfloor'])
            row['starttime'] = _serialize_time(row['starttime'])
            row['endtime'] = _serialize_time(row['endtime'])

            in_progress = start <= now and (end is None or end >= now)
            (current if in_progress else upcoming).append((start, row))

        # Sort current events by severity priority, then by starttime
        current.sort(key=lambda item: (
            _SEVERITY_PRIORITY.get(item[1]['typecolor'], 4),
            item[0],
        ))
        upcoming.sort(key=lambda item: item[0])

        return jsonify({
            'success': True,
            'timestamp': datetime.now().isoformat(),
            'current': [row for _, row in current],
            'upcoming': [row for _, row in upcoming],
        })

    except pymysql.Error as err:
        # NOTE(review): the traceback is included in the response body as
        # well as printed; consider limiting it to the server log.
        error_details = {
            'success': False,
            'error': f'Database error: {str(err)}',
            'error_code': err.args[0] if err.args else None,
            'traceback': traceback.format_exc()
        }
        print(f"DATABASE ERROR: {error_details}")
        return jsonify(error_details), 500
    except Exception as e:
        # Top-level boundary for this endpoint: report anything unexpected as
        # a 500 with the same diagnostic payload shape.
        error_details = {
            'success': False,
            'error': f'Server error: {str(e)}',
            'type': type(e).__name__,
            'traceback': traceback.format_exc()
        }
        print(f"SERVER ERROR: {error_details}")
        return jsonify(error_details), 500
|
|
|
|
@app.route('/health')
def health():
    """Health check endpoint: report ``status: ok`` and the current time.

    Returns:
        A JSON object with ``status`` and an ISO-formatted ``timestamp``.
    """
    payload = {
        'status': 'ok',
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
|
|
|
|
if __name__ == '__main__':
    # The listening port comes from PORT, falling back to 3001.
    port = int(os.getenv('PORT', 3001))

    startup_banner = (
        f'Shopfloor Dashboard running on port {port}',
        f'Access at: http://localhost:{port}',
        'Using PyMySQL connector for MySQL 5.6 compatibility',
    )
    for line in startup_banner:
        print(line)

    # Listen on all interfaces with the Flask debugger disabled.
    app.run(host='0.0.0.0', port=port, debug=False)
|