udc-parser/parser/clmparser.py
cproudlock 149a223ce2 Add --max-age filter to skip old files during import
- Default: 90 days (matches retention policy)
- Use --max-age 0 to disable filtering
- Extracts date from CLM filename pattern
- Shows "Skipped (too old)" count in summary
- Prevents re-importing data that would be immediately purged

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-16 10:08:25 -05:00


#!/usr/bin/env python3
"""
CLM Data Parser
Parses CLM_Data JSON files (authoritative UDC data source) and imports to ShopDB.
Usage:
python clmparser.py [--dir /path/to/CLM_Data] [--file specific_file.json] [--machine 3101] [--max-age DAYS] [--active-only]
CLM_Data Structure:
/CLM_Data/{MachineNumber}/Data/{PartNum}_{Oper}_{Serial}_{Date}_{Time}.json
JSON Structure:
- HeaderValues: Part info, badge, machine, timestamps
- ItemCrossing: Program steps with DataInput measurements
- End: Part run completion
Production Path (Windows): S:\\SPC\\UDC\\CLM_Data\\*Machine Number*\\Data\\
IMPORTANT: Production only READS from this location, NEVER modifies or deletes.
"""
import json
import os
import sys
import glob
import re
import argparse
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import mysql.connector
from mysql.connector import Error
# Import config
try:
from config import DB_CONFIG, CLM_DATA_PATH, BATCH_SIZE
except ImportError:
import platform
IS_WINDOWS = platform.system() == 'Windows'
DB_CONFIG = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'rootpassword',
'database': 'shopdb'
}
CLM_DATA_PATH = r'S:\SPC\UDC\CLM_Data' if IS_WINDOWS else '/home/camp/projects/UDC/CLM_Data'
BATCH_SIZE = 1000
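# An optional local config.py can override the fallbacks above by defining the same
# names; a hypothetical example (host/credentials are placeholders):
#   DB_CONFIG = {'host': 'db.example.local', 'port': 3306, 'user': 'shopdb',
#                'password': 'secret', 'database': 'shopdb'}
#   CLM_DATA_PATH = r'S:\SPC\UDC\CLM_Data'
#   BATCH_SIZE = 1000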
def parse_clm_timestamp(ts_str: str) -> Optional[datetime]:
"""Parse CLM timestamp: MM/DD/YYYY HH:MM:SS"""
if not ts_str:
return None
try:
return datetime.strptime(ts_str, '%m/%d/%Y %H:%M:%S')
except (ValueError, TypeError):
return None
def parse_numeric(val) -> Optional[float]:
"""Parse numeric value from JSON (already numeric or string)"""
if val is None:
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def file_hash(filepath: str) -> Optional[str]:
"""Generate MD5 hash of file for duplicate detection"""
try:
hash_md5 = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hash_md5.update(chunk)
return hash_md5.hexdigest()
except OSError as e:
print(f" Warning: Could not hash file {filepath}: {e}")
return None
def is_file_complete(filepath: str) -> bool:
"""Check if CLM JSON file has an End block (indicates job is complete)"""
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, list):
return False
for item in data:
if 'End' in item:
return True
return False
except (json.JSONDecodeError, OSError):
return False
class CLMParser:
"""Parser for CLM_Data JSON files"""
def __init__(self, db_config: dict, max_age_days: Optional[int] = None):
self.db_config = db_config
self.conn = None
self.cursor = None
self.measurements_batch = []
self.events_batch = []
self.manual_requests_batch = []
self.header_updates_batch = []
self.violations_batch = []
self.max_age_days = max_age_days # None = no limit
self.cutoff_date = None
if max_age_days:
self.cutoff_date = (datetime.now() - timedelta(days=max_age_days)).date()
def get_file_date(self, filename: str) -> Optional[datetime]:
"""Extract date from CLM filename: {Part}_{Oper}_{Serial}_{YYYY-MM-DD}_{HH-MM-SS}.json"""
match = re.search(r'(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})\.json$', filename)
if match:
try:
date_str = match.group(1)
time_str = match.group(2).replace('-', ':')
return datetime.strptime(f"{date_str} {time_str}", '%Y-%m-%d %H:%M:%S')
except ValueError:
return None
return None
def is_file_too_old(self, filename: str) -> bool:
"""Check if file date is older than max_age_days"""
if not self.cutoff_date:
return False
file_date = self.get_file_date(filename)
if file_date:
return file_date.date() < self.cutoff_date
return False # If can't parse date, don't skip
def connect(self):
"""Connect to MySQL database"""
try:
self.conn = mysql.connector.connect(**self.db_config)
self.cursor = self.conn.cursor(dictionary=True)
print(f"Connected to MySQL at {self.db_config['host']}")
return True
except Error as e:
print(f"Error connecting to MySQL: {e}")
return False
def disconnect(self):
"""Disconnect from database"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
def ensure_clm_tracking_table(self):
"""Create tracking table for CLM imports if not exists"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS udcclmfiles (
fileid INT AUTO_INCREMENT PRIMARY KEY,
filename VARCHAR(180) NOT NULL,
machinenumber VARCHAR(10),
filehash VARCHAR(32),
filemodtime DATETIME NULL,
partrunid INT,
sessionid INT,
recordcount INT DEFAULT 0,
importdate DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_filename (filename),
UNIQUE KEY uk_filehash (filehash),
INDEX idx_machine (machinenumber),
INDEX idx_importdate (importdate)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
self.conn.commit()
def load_known_files(self) -> dict:
"""Load all known files into memory for fast lookup (avoids per-file DB queries)"""
self.cursor.execute("""
SELECT filename, filehash, filemodtime, partrunid, recordcount
FROM udcclmfiles
""")
known_files = {}
for row in self.cursor.fetchall():
known_files[row['filename']] = {
'filehash': row['filehash'],
'filemodtime': row['filemodtime'],
'partrunid': row['partrunid'],
'recordcount': row['recordcount']
}
return known_files
def get_import_status(self, filename: str, filehash: str = None) -> dict:
"""Check import status of a CLM file
Returns: {'imported': bool, 'hash_changed': bool, 'partrunid': int or None, 'stored_hash': str or None, 'filemodtime': datetime or None}
"""
basename = os.path.basename(filename)
result = {'imported': False, 'hash_changed': False, 'partrunid': None, 'stored_hash': None, 'filemodtime': None}
# Check by filename
self.cursor.execute(
"SELECT fileid, filehash, partrunid, filemodtime FROM udcclmfiles WHERE filename = %s",
(basename,)
)
row = self.cursor.fetchone()
if row:
result['imported'] = True
result['stored_hash'] = row['filehash']
result['partrunid'] = row['partrunid']
result['filemodtime'] = row['filemodtime']
if filehash and row['filehash'] != filehash:
result['hash_changed'] = True
return result
# Check by hash if provided (different filename, same content)
if filehash:
self.cursor.execute(
"SELECT fileid, partrunid FROM udcclmfiles WHERE filehash = %s",
(filehash,)
)
row = self.cursor.fetchone()
if row:
result['imported'] = True
result['partrunid'] = row['partrunid']
return result
return result
def quick_check_imported(self, filepath: str, known_files: dict = None) -> dict:
"""Fast check if file needs processing (no hash computation).
Uses file modification time to skip unchanged files.
If known_files dict is provided, uses in-memory lookup (much faster).
Returns: {'skip': bool, 'reason': str, 'needs_hash_check': bool, 'partrunid': int or None}
"""
basename = os.path.basename(filepath)
# Use in-memory cache if provided (much faster than DB query per file)
if known_files is not None:
if basename not in known_files:
return {'skip': False, 'reason': 'new', 'needs_hash_check': False, 'partrunid': None}
file_info = known_files[basename]
stored_modtime = file_info['filemodtime']
# If we have a stored modtime, skip without checking current file modtime
# (avoids slow network stat calls - files rarely change after creation)
if stored_modtime:
return {'skip': True, 'reason': 'unchanged', 'needs_hash_check': False, 'partrunid': file_info['partrunid']}
# No stored modtime - confirm the file is still readable, then fall back to a hash check
try:
os.path.getmtime(filepath)
except OSError:
return {'skip': False, 'reason': 'cannot_read', 'needs_hash_check': False, 'partrunid': None}
# File exists in DB but no modtime stored - check hash to be safe
return {'skip': False, 'reason': 'no_modtime', 'needs_hash_check': True, 'partrunid': file_info['partrunid']}
# No cache - get file modification time
try:
file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
except OSError:
return {'skip': False, 'reason': 'cannot_read', 'needs_hash_check': False, 'partrunid': None}
# Fallback to DB query if no cache provided
self.cursor.execute(
"SELECT fileid, filehash, partrunid, filemodtime FROM udcclmfiles WHERE filename = %s",
(basename,)
)
row = self.cursor.fetchone()
if not row:
return {'skip': False, 'reason': 'new', 'needs_hash_check': False, 'partrunid': None}
stored_modtime = row['filemodtime']
if stored_modtime and file_mtime <= stored_modtime:
return {'skip': True, 'reason': 'unchanged', 'needs_hash_check': False, 'partrunid': row['partrunid']}
return {'skip': False, 'reason': 'modified', 'needs_hash_check': True, 'partrunid': row['partrunid']}
def is_file_imported(self, filename: str, filehash: str = None) -> bool:
"""Check if a CLM file has already been imported (simple boolean check)"""
status = self.get_import_status(filename, filehash)
return status['imported'] and not status['hash_changed']
def delete_partrun(self, partrun_id: int):
"""Delete a part run and all associated records for re-import"""
# Delete in order due to foreign key constraints
self.cursor.execute("DELETE FROM udcmanualrequests WHERE partrunid = %s", (partrun_id,))
self.cursor.execute("DELETE FROM udcmeasurements WHERE partrunid = %s", (partrun_id,))
self.cursor.execute("DELETE FROM udcevents WHERE partrunid = %s", (partrun_id,))
self.cursor.execute("DELETE FROM udcparts WHERE partrunid = %s", (partrun_id,))
self.cursor.execute("DELETE FROM udcclmfiles WHERE partrunid = %s", (partrun_id,))
self.conn.commit()
def record_import(self, filepath: str, machine_number: str, filehash: str,
partrun_id: int, session_id: int, record_count: int):
"""Record successful import of a CLM file"""
basename = os.path.basename(filepath)
# Get file modification time for fast skip checks on future runs
try:
file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
except OSError:
file_mtime = None
self.cursor.execute("""
INSERT INTO udcclmfiles (filename, machinenumber, filehash, filemodtime, partrunid, sessionid, recordcount)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
partrunid = VALUES(partrunid),
sessionid = VALUES(sessionid),
recordcount = VALUES(recordcount),
filemodtime = VALUES(filemodtime),
importdate = CURRENT_TIMESTAMP
""", (basename, machine_number, filehash, file_mtime, partrun_id, session_id, record_count))
self.conn.commit()
def record_skipped_file(self, filepath: str, machine_number: str, filehash: str, skip_reason: str):
"""Record a skipped file (tool setup, incomplete) so it's not re-checked every run"""
basename = os.path.basename(filepath)
try:
file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
except OSError:
file_mtime = None
# Mark the file as skipped with recordcount = -1; skip_reason is informational only and is not currently persisted
self.cursor.execute("""
INSERT INTO udcclmfiles (filename, machinenumber, filehash, filemodtime, partrunid, sessionid, recordcount)
VALUES (%s, %s, %s, %s, NULL, NULL, -1)
ON DUPLICATE KEY UPDATE
filemodtime = VALUES(filemodtime),
importdate = CURRENT_TIMESTAMP
""", (basename, machine_number, filehash, file_mtime))
self.conn.commit()
def get_or_create_session(self, machine_number: str, start_time: datetime) -> int:
"""Get or create a session for the machine and date"""
# For CLM data, we create one session per machine per day
session_date = start_time.date()
session_name = f"CLM_{machine_number}_{session_date.isoformat()}"
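# e.g. machine 3101 on 2025-12-01 -> "CLM_3101_2025-12-01" (illustrative date)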
# Check if session exists
self.cursor.execute(
"""SELECT sessionid FROM udcsessions
WHERE machinenumber = %s AND logfilename = %s""",
(machine_number, session_name)
)
row = self.cursor.fetchone()
if row:
return row['sessionid']
# Create new session
self.cursor.execute(
"""INSERT INTO udcsessions (machinenumber, logfilename, sessionstart, dateadded)
VALUES (%s, %s, %s, NOW())""",
(machine_number, session_name, start_time)
)
self.conn.commit()
return self.cursor.lastrowid
def create_part_run(self, session_id: int, header: dict, start_time: datetime,
end_time: datetime = None, machine_number: str = None) -> int:
"""Create a part run record from CLM header data"""
cycletime = None
if start_time and end_time:
cycletime = int((end_time - start_time).total_seconds())
# Calculate changeover time (time since previous part ended on this machine)
changeover = None
if start_time and machine_number:
self.cursor.execute(
"""SELECT MAX(p.programend) as prev_end
FROM udcparts p
JOIN udcsessions s ON p.sessionid = s.sessionid
WHERE s.machinenumber = %s AND p.programend < %s AND p.programend IS NOT NULL""",
(machine_number, start_time)
)
row = self.cursor.fetchone()
if row and row['prev_end']:
changeover_secs = int((start_time - row['prev_end']).total_seconds())
# Only set if reasonable (> 0 and < 24 hours)
if 0 < changeover_secs < 86400:
changeover = changeover_secs
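# e.g. previous part ended 07:55:00 and this run started 08:02:00 -> changeover = 420 s (hypothetical times)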
# Strip whitespace and truncate strings to fit column sizes
partnumber = (header.get('PartNum') or '').strip()[:50]
opernumber = (header.get('PartOper') or '').strip()[:20]
serialnumber = (header.get('SerialNum') or '').strip()[:50]
programname = (header.get('ProgName') or '').strip()[:100]
jobnumber = (header.get('ProgNum') or '').strip()[:50]
badgenumber = (header.get('BadgeNumber') or '').strip()[:50]
self.cursor.execute(
"""INSERT INTO udcparts
(sessionid, partnumber, opernumber, serialnumber, programname, jobnumber,
badgenumber, programstart, programend, cycletime, changeover, dateadded)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())""",
(
session_id,
partnumber or None,
opernumber or None,
serialnumber or None,
programname or None,
jobnumber or None,
badgenumber or None,
start_time,
end_time,
cycletime,
changeover
)
)
self.conn.commit()
return self.cursor.lastrowid
def update_part_run_counts(self, partrun_id: int, measurement_count: int,
manual_count: int, probe_count: int, oot_count: int):
"""Update part run with measurement counts"""
self.cursor.execute(
"""UPDATE udcparts SET
measurementcount = %s, manualcount = %s, probecount = %s, ootcount = %s
WHERE partrunid = %s""",
(measurement_count, manual_count, probe_count, oot_count, partrun_id)
)
self.conn.commit()
def add_measurement(self, partrun_id: int, session_id: int, event_time: datetime,
event_type: str, method: str, dimid: str, description: str,
seq: int, minval: float, maxval: float, actualval: float,
deviation: float, oot: int):
"""Add a measurement to the batch"""
self.measurements_batch.append((
partrun_id,
session_id,
event_time,
event_type,
method,
dimid,
description[:255] if description else None,
seq,
minval,
maxval,
actualval,
deviation,
oot
))
if len(self.measurements_batch) >= BATCH_SIZE:
self.flush_measurements()
def add_event(self, partrun_id: int, session_id: int, event_time: datetime,
event_type: str, item_number: str, description: str):
"""Add an event to the batch"""
self.events_batch.append((
partrun_id,
session_id,
event_time,
event_type,
item_number,
description[:500] if description else None
))
if len(self.events_batch) >= BATCH_SIZE:
self.flush_events()
def add_manual_request(self, partrun_id: int, request_time: datetime,
response_time: datetime, description: str):
"""Add a manual request timing record"""
response_seconds = None
if request_time and response_time:
response_seconds = int((response_time - request_time).total_seconds())
self.cursor.execute(
"""INSERT INTO udcmanualrequests
(partrunid, requesttime, responsetime, responseseconds, description)
VALUES (%s, %s, %s, %s, %s)""",
(
partrun_id,
request_time,
response_time,
response_seconds,
description[:255] if description else None
)
)
self.conn.commit()
return response_seconds
def flush_measurements(self):
"""Flush measurement batch to database"""
if not self.measurements_batch:
return
self.cursor.executemany(
"""INSERT INTO udcmeasurements
(partrunid, sessionid, eventtime, eventtype, method, dimid, description,
seqnumber, minval, maxval, actualval, deviation, oot)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
self.measurements_batch
)
self.conn.commit()
self.measurements_batch = []
def flush_events(self):
"""Flush event batch to database"""
if not self.events_batch:
return
self.cursor.executemany(
"""INSERT INTO udcevents
(partrunid, sessionid, eventtime, eventtype, itemnumber, description)
VALUES (%s, %s, %s, %s, %s, %s)""",
self.events_batch
)
self.conn.commit()
self.events_batch = []
def add_header_update(self, partrun_id: int, session_id: int, machine_number: str,
event_time: datetime, details: str, description: str, badge: str):
"""Add a header update (badge change) to the batch"""
# Strip whitespace and truncate badge
clean_badge = badge.strip()[:20] if badge else None
self.header_updates_batch.append((
partrun_id,
session_id,
machine_number,
event_time,
details[:255] if details else None,
description[:255] if description else None,
clean_badge
))
if len(self.header_updates_batch) >= BATCH_SIZE:
self.flush_header_updates()
def flush_header_updates(self):
"""Flush header updates batch to database"""
if not self.header_updates_batch:
return
self.cursor.executemany(
"""INSERT INTO udcheaderupdates
(partrunid, sessionid, machinenumber, eventtime, details, description, badgenumber)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
self.header_updates_batch
)
self.conn.commit()
self.header_updates_batch = []
def add_violation(self, partrun_id: int, session_id: int, machine_number: str,
event_time: datetime, previous_val: float, current_val: float,
badge: str, item_no: str, crossing_desc: str):
"""Add a parameter violation to the batch"""
# Strip whitespace and truncate badge
clean_badge = badge.strip()[:20] if badge else None
self.violations_batch.append((
partrun_id,
session_id,
machine_number,
event_time,
previous_val,
current_val,
clean_badge,
item_no[:20] if item_no else None,
crossing_desc[:255] if crossing_desc else None
))
if len(self.violations_batch) >= BATCH_SIZE:
self.flush_violations()
def flush_violations(self):
"""Flush violations batch to database"""
if not self.violations_batch:
return
self.cursor.executemany(
"""INSERT INTO udcviolations
(partrunid, sessionid, machinenumber, eventtime, previousval, currentval,
badgenumber, itemno, crossingdesc)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
self.violations_batch
)
self.conn.commit()
self.violations_batch = []
def parse_file(self, filepath: str, known_files: dict = None) -> Dict[str, Any]:
"""Parse a single CLM JSON file"""
filename = os.path.basename(filepath)
# Check if file is too old (based on filename date)
if self.is_file_too_old(filename):
return {'success': False, 'error': f'File too old (>{self.max_age_days} days): {filename}', 'skip_silent': True, 'too_old': True}
# Extract machine number from parent folder
parent_dir = os.path.basename(os.path.dirname(os.path.dirname(filepath)))
machine_number = parent_dir if parent_dir.isdigit() else None
if not machine_number:
# Try to extract from file path
parts = filepath.split(os.sep)
for part in parts:
if part.isdigit() and len(part) == 4:
machine_number = part
break
if not machine_number:
return {'success': False, 'error': f'Could not determine machine number for {filename}'}
# Quick check by filename + modification time (avoids reading file for hash)
quick_status = self.quick_check_imported(filepath, known_files)
if quick_status['skip']:
return {'success': False, 'error': f'File unchanged: {filename}', 'skip_silent': True}
# Only compute hash if file is new or modification time changed
fhash = file_hash(filepath)
if fhash is None:
return {'success': False, 'error': f'Could not read file: {filename}'}
# Full import status check with hash
import_status = self.get_import_status(filepath, fhash)
file_complete = is_file_complete(filepath)
if import_status['imported']:
if import_status['hash_changed']:
# File was updated since last import
if file_complete:
# Job is now complete - delete old records and re-import
print(f"Re-importing completed file: {filename}")
self.delete_partrun(import_status['partrunid'])
else:
# Still incomplete, skip for now
return {'success': False, 'error': f'File updated but still incomplete: {filename}', 'in_progress': True}
else:
# Hash unchanged - already fully imported
return {'success': False, 'error': f'File already imported: {filename}'}
print(f"Parsing {filename} (machine {machine_number})...")
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
self.record_skipped_file(filepath, machine_number, fhash, 'json_error')
return {'success': False, 'error': f'JSON parse error: {e}', 'skip_silent': True}
except Exception as e:
self.record_skipped_file(filepath, machine_number, fhash, 'read_error')
return {'success': False, 'error': f'File read error: {e}', 'skip_silent': True}
if not isinstance(data, list) or len(data) == 0:
self.record_skipped_file(filepath, machine_number, fhash, 'invalid_json')
return {'success': False, 'error': 'Invalid JSON structure (expected non-empty array)', 'skip_silent': True}
# Extract header
header = None
end_data = None
item_crossings = [] # Store full crossing data with measurements
for item in data:
if 'HeaderValues' in item:
header = item['HeaderValues']
elif 'End' in item:
end_data = item['End']
elif 'ItemCrossing' in item:
crossing = item['ItemCrossing']
crossing_time = crossing.get('TimeDate')
crossing_desc = crossing.get('Description', '')
item_no = crossing.get('ItemNo')
# Extract measurements, header updates, and violations from Items
measurements_in_crossing = []
header_updates_in_crossing = []
violations_in_crossing = []
if 'Items' in crossing and crossing['Items']:
for measure_item in crossing['Items']:
if 'DataInput' in measure_item:
measurements_in_crossing.append(measure_item['DataInput'])
elif 'HeaderUpdate' in measure_item:
header_updates_in_crossing.append(measure_item['HeaderUpdate'])
elif 'ItemViolation' in measure_item:
violations_in_crossing.append(measure_item['ItemViolation'])
item_crossings.append({
'time': crossing_time,
'description': crossing_desc,
'item_no': item_no,
'measurements': measurements_in_crossing,
'header_updates': header_updates_in_crossing,
'violations': violations_in_crossing
})
if not header:
# Files without HeaderValues are tool setup/calibration runs, not part runs
# Record in udcclmfiles so we don't re-check every run
self.record_skipped_file(filepath, machine_number, fhash, 'no_header')
return {'success': False, 'error': 'No HeaderValues (tool setup file)', 'skip_silent': True}
# Parse timestamps
start_time = parse_clm_timestamp(header.get('TimeDate'))
end_time = parse_clm_timestamp(end_data.get('TimeDate')) if end_data else None
if not start_time:
self.record_skipped_file(filepath, machine_number, fhash, 'no_timestamp')
return {'success': False, 'error': 'Could not parse start timestamp', 'skip_silent': True}
# Get or create session
session_id = self.get_or_create_session(machine_number, start_time)
# Create part run (pass machine_number for changeover calculation)
partrun_id = self.create_part_run(session_id, header, start_time, end_time, machine_number)
# Counters
measurement_count = 0
manual_count = 0
probe_count = 0
oot_count = 0
record_count = 0
manual_request_count = 0
header_update_count = 0
violation_count = 0
# Process item crossings with their measurements
for crossing in item_crossings:
crossing_time = parse_clm_timestamp(crossing['time'])
item_no = crossing['item_no']
crossing_desc = crossing['description']
# Add the crossing event
self.add_event(partrun_id, session_id, crossing_time, 'ITEM-CROSSING',
item_no, crossing_desc)
record_count += 1
# Process measurements in this crossing
for m in crossing['measurements']:
event_time = parse_clm_timestamp(m.get('TimeDate'))
method = m.get('Method', '').upper()
dimid = m.get('DimID', '')
description = m.get('Description', '')
seq = int(m.get('Sequence', 0)) if m.get('Sequence') else 0
minval = parse_numeric(m.get('Min'))
maxval = parse_numeric(m.get('Max'))
actualval = parse_numeric(m.get('Actual'))
deviation = parse_numeric(m.get('Deviation'))
oot = 1 if m.get('OutOfTolerance', 'N') == 'Y' else 0
self.add_measurement(partrun_id, session_id, event_time, 'PROCESSDATA',
method, dimid, description, seq, minval, maxval,
actualval, deviation, oot)
measurement_count += 1
# For MANUAL measurements, record timing (crossing_time = prompt, event_time = response)
if method == 'MANUAL' and crossing_time and event_time:
self.add_manual_request(partrun_id, crossing_time, event_time, description)
manual_request_count += 1
if method == 'MANUAL':
manual_count += 1
elif method == 'PROBE':
probe_count += 1
if oot:
oot_count += 1
record_count += 1
# Process header updates (badge changes)
for hu in crossing.get('header_updates', []):
hu_time = parse_clm_timestamp(hu.get('TimeDate'))
self.add_header_update(
partrun_id, session_id, machine_number, hu_time,
hu.get('Details', ''), hu.get('Description', ''),
hu.get('BadgeNumber', '')
)
header_update_count += 1
# Process violations (parameter changes outside limits)
for v in crossing.get('violations', []):
v_time = parse_clm_timestamp(v.get('TimeDate'))
self.add_violation(
partrun_id, session_id, machine_number, v_time,
parse_numeric(v.get('Previous')),
parse_numeric(v.get('Current')),
v.get('BadgeNumber', ''),
item_no,
crossing_desc
)
violation_count += 1
# Flush remaining batches
self.flush_measurements()
self.flush_events()
self.flush_header_updates()
self.flush_violations()
# Update part run counts
self.update_part_run_counts(partrun_id, measurement_count, manual_count,
probe_count, oot_count)
# Record successful import
self.record_import(filepath, machine_number, fhash, partrun_id, session_id, record_count)
print(f" Imported: {measurement_count} measurements, {len(item_crossings)} events, "
f"{oot_count} OOT, {manual_request_count} manual timings, "
f"{header_update_count} badge changes, {violation_count} violations")
return {
'success': True,
'records': record_count,
'measurements': measurement_count,
'oot_count': oot_count,
'header_updates': header_update_count,
'violations': violation_count,
'partrun_id': partrun_id,
'session_id': session_id
}
def parse_machine_folder(self, machine_path: str, known_files: dict = None) -> Dict[str, Any]:
"""Parse all JSON files for a single machine"""
data_path = os.path.join(machine_path, 'Data')
if not os.path.isdir(data_path):
return {'success': False, 'error': f'No Data folder in {machine_path}'}
files = glob.glob(os.path.join(data_path, '*.json'))
results = {
'total_files': len(files),
'imported': 0,
'updated': 0,
'skipped': 0,
'skipped_old': 0,
'in_progress': 0,
'tool_setup': 0,
'errors': 0,
'total_records': 0,
'total_measurements': 0,
'total_oot': 0
}
machine_num = os.path.basename(machine_path)
print(f"\nProcessing machine {machine_num}: {len(files)} files")
for filepath in sorted(files):
result = self.parse_file(filepath, known_files)
if result['success']:
results['imported'] += 1
results['total_records'] += result.get('records', 0)
results['total_measurements'] += result.get('measurements', 0)
results['total_oot'] += result.get('oot_count', 0)
elif result.get('in_progress'):
results['in_progress'] += 1
elif result.get('skip_silent'):
# Silent skips: unchanged files, tool setup files, or old files
if result.get('too_old'):
results['skipped_old'] += 1
elif 'tool setup' in result.get('error', '').lower():
results['tool_setup'] += 1
else:
results['skipped'] += 1 # Unchanged files
elif 'already imported' in result.get('error', '').lower():
results['skipped'] += 1
else:
results['errors'] += 1
print(f" Error: {result.get('error')}")
return results
def ensure_active_sessions_table(self):
"""Create active sessions tracking table if not exists"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS udcactivesessions (
machinenumber VARCHAR(20) PRIMARY KEY,
filepath VARCHAR(255),
partnumber VARCHAR(50),
badgenumber VARCHAR(20),
partsrun INT DEFAULT 0,
lastupdate DATETIME,
sessionstart DATETIME,
lastseen DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
self.conn.commit()
def parse_active_file(self, filepath: str) -> Optional[Dict[str, Any]]:
"""Parse an incomplete file to get current state (without importing)"""
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
except (json.JSONDecodeError, OSError):
return None
if not isinstance(data, list) or len(data) == 0:
return None
# Extract machine number from path
parent_dir = os.path.basename(os.path.dirname(os.path.dirname(filepath)))
machine_number = parent_dir if parent_dir.isdigit() else None
if not machine_number:
parts = filepath.split(os.sep)
for part in parts:
if part.isdigit() and len(part) == 4:
machine_number = part
break
if not machine_number:
return None
# Extract header and count parts
header = None
parts_run = 0
last_badge = None
last_part = None
first_time = None
last_time = None
for item in data:
if 'HeaderValues' in item:
header = item['HeaderValues']
hdr_time = parse_clm_timestamp(header.get('TimeDate'))
if hdr_time:
if not first_time or hdr_time < first_time:
first_time = hdr_time
if not last_time or hdr_time > last_time:
last_time = hdr_time
# Initial badge/part from header
if header.get('BadgeNumber'):
last_badge = header.get('BadgeNumber', '').strip()
if header.get('PartNum'):
last_part = header.get('PartNum', '').strip()
elif 'Part' in item:
# New part started
parts_run += 1
part_data = item['Part']
if part_data.get('BadgeNumber'):
last_badge = part_data.get('BadgeNumber', '').strip()
if part_data.get('PartNum'):
last_part = part_data.get('PartNum', '').strip()
part_time = parse_clm_timestamp(part_data.get('TimeDate'))
if part_time:
if not first_time or part_time < first_time:
first_time = part_time
if not last_time or part_time > last_time:
last_time = part_time
elif 'ItemCrossing' in item:
crossing = item['ItemCrossing']
crossing_time = parse_clm_timestamp(crossing.get('TimeDate'))
if crossing_time:
if not last_time or crossing_time > last_time:
last_time = crossing_time
if not header:
return None
# If no Part blocks yet, count as 1 (the header starts the first part)
if parts_run == 0:
parts_run = 1
return {
'machinenumber': machine_number,
'filepath': filepath,
'partnumber': last_part,
'badgenumber': last_badge,
'partsrun': parts_run,
'sessionstart': first_time,
'lastupdate': last_time
}
def _scan_machine_folder(self, machine_path: str, cutoff_date: datetime) -> Dict[str, Any]:
"""Scan a single machine folder for incomplete files. Used for parallel scanning."""
result = {
'files_scanned': 0,
'files_skipped_old': 0,
'incomplete_files': [] # list of (filepath, state)
}
data_path = os.path.join(machine_path, 'Data')
if not os.path.isdir(data_path):
return result
files = glob.glob(os.path.join(data_path, '*.json'))
for filepath in files:
# Quick filter by filename date (format: *_YYYY-MM-DD_*.json)
basename = os.path.basename(filepath)
parts = basename.replace('.json', '').split('_')
if len(parts) >= 4:
try:
file_date = datetime.strptime(parts[-2], '%Y-%m-%d')
if file_date < cutoff_date:
result['files_skipped_old'] += 1
continue
except ValueError:
pass # Can't parse date, scan anyway
result['files_scanned'] += 1
if not is_file_complete(filepath):
state = self.parse_active_file(filepath)
if state and state['lastupdate']:
result['incomplete_files'].append((filepath, state))
return result
def update_active_sessions(self, clm_directory: str, verbose: bool = True, max_age_days: int = 7, max_workers: int = 8) -> Dict[str, Any]:
"""Scan for incomplete files and update active sessions table.
Only tracks the MOST RECENT incomplete file per machine.
Older incomplete files are considered stale/abandoned.
Uses filename date to skip files older than max_age_days without reading them.
Uses parallel scanning with max_workers threads.
"""
self.ensure_active_sessions_table()
results = {
'active_machines': 0,
'updated': 0,
'cleared': 0,
'stale_skipped': 0,
'files_scanned': 0,
'files_skipped_old': 0
}
# Calculate cutoff date
cutoff_date = datetime.now() - timedelta(days=max_age_days)
# Collect all incomplete files per machine, then pick the newest
machine_incomplete_files = {} # machine -> list of (filepath, state)
# Find all machine folders
machine_folders = [os.path.join(clm_directory, item) for item in os.listdir(clm_directory)
if os.path.isdir(os.path.join(clm_directory, item)) and item.isdigit()]
if verbose:
print(f" Scanning {len(machine_folders)} machines with {max_workers} threads...")
# Parallel scan
completed = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(self._scan_machine_folder, path, cutoff_date): path for path in machine_folders}
for future in as_completed(futures):
completed += 1
if verbose and completed % 10 == 0:
print(f" Completed {completed}/{len(machine_folders)} machines...", end='\r')
scan_result = future.result()
results['files_scanned'] += scan_result['files_scanned']
results['files_skipped_old'] += scan_result['files_skipped_old']
for filepath, state in scan_result['incomplete_files']:
machine = state['machinenumber']
if machine not in machine_incomplete_files:
machine_incomplete_files[machine] = []
machine_incomplete_files[machine].append((filepath, state))
# For each machine, only use the most recent incomplete file
active_machines = set()
for machine, file_list in machine_incomplete_files.items():
# Sort by lastupdate descending, pick the newest
file_list.sort(key=lambda x: x[1]['lastupdate'], reverse=True)
newest_filepath, newest_state = file_list[0]
# Count stale files (all but the newest)
results['stale_skipped'] += len(file_list) - 1
active_machines.add(machine)
# Update or insert active session with the newest file
self.cursor.execute("""
INSERT INTO udcactivesessions
(machinenumber, filepath, partnumber, badgenumber, partsrun,
lastupdate, sessionstart, lastseen)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
filepath = VALUES(filepath),
partnumber = VALUES(partnumber),
badgenumber = VALUES(badgenumber),
partsrun = VALUES(partsrun),
lastupdate = VALUES(lastupdate),
sessionstart = VALUES(sessionstart),
lastseen = NOW()
""", (
newest_state['machinenumber'],
os.path.basename(newest_state['filepath']),
newest_state['partnumber'],
newest_state['badgenumber'],
newest_state['partsrun'],
newest_state['lastupdate'],
newest_state['sessionstart']
))
results['updated'] += 1
self.conn.commit()
results['active_machines'] = len(active_machines)
if verbose:
print(f" Scanned {results['files_scanned']} files across {len(machine_folders)} machines" + " " * 20)
# Clear machines that are no longer active (not seen in this scan)
if active_machines:
placeholders = ','.join(['%s'] * len(active_machines))
self.cursor.execute(f"""
DELETE FROM udcactivesessions
WHERE machinenumber NOT IN ({placeholders})
""", tuple(active_machines))
else:
# No active machines, clear entire table
self.cursor.execute("DELETE FROM udcactivesessions")
results['cleared'] = self.cursor.rowcount
self.conn.commit()
return results
def parse_all_machines(self, clm_directory: str) -> Dict[str, Any]:
"""Parse all machine folders in CLM_Data directory"""
results = {
'machines_processed': 0,
'total_files': 0,
'imported': 0,
'skipped': 0,
'skipped_old': 0,
'in_progress': 0,
'tool_setup': 0,
'errors': 0,
'total_records': 0,
'total_measurements': 0,
'total_oot': 0
}
# Load all known files into memory for fast lookup (one query instead of per-file)
print("Loading known files from database...")
known_files = self.load_known_files()
print(f" {len(known_files)} files already tracked")
# Find all machine folders (directories with numeric names)
machine_folders = []
for item in os.listdir(clm_directory):
item_path = os.path.join(clm_directory, item)
if os.path.isdir(item_path) and item.isdigit():
machine_folders.append(item_path)
print(f"Found {len(machine_folders)} machine folders")
for machine_path in sorted(machine_folders):
machine_results = self.parse_machine_folder(machine_path, known_files)
if machine_results.get('success', True): # Allow partial success
results['machines_processed'] += 1
results['total_files'] += machine_results.get('total_files', 0)
results['imported'] += machine_results.get('imported', 0)
results['skipped'] += machine_results.get('skipped', 0)
results['skipped_old'] += machine_results.get('skipped_old', 0)
results['in_progress'] += machine_results.get('in_progress', 0)
results['tool_setup'] += machine_results.get('tool_setup', 0)
results['errors'] += machine_results.get('errors', 0)
results['total_records'] += machine_results.get('total_records', 0)
results['total_measurements'] += machine_results.get('total_measurements', 0)
results['total_oot'] += machine_results.get('total_oot', 0)
return results
def main():
parser = argparse.ArgumentParser(description='Parse CLM_Data JSON files and import to database')
parser.add_argument('--dir', default=CLM_DATA_PATH, help='CLM_Data directory')
parser.add_argument('--file', help='Parse a specific JSON file')
parser.add_argument('--machine', help='Parse only a specific machine folder')
parser.add_argument('--active-only', action='store_true', help='Only update active sessions (skip imports)')
parser.add_argument('--max-age', type=int, default=90, help='Max file age in days (default: 90, 0=no limit)')
parser.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host')
parser.add_argument('--port', type=int, default=DB_CONFIG['port'], help='MySQL port')
parser.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user')
parser.add_argument('--password', default=DB_CONFIG['password'], help='MySQL password')
parser.add_argument('--database', default=DB_CONFIG['database'], help='MySQL database')
args = parser.parse_args()
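# Example invocations (hypothetical values):
#   python clmparser.py --machine 3101 --max-age 30
#   python clmparser.py --active-only
#   python clmparser.py --file /path/to/CLM_Data/3101/Data/somefile.json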
# Build config from args
db_config = {
'host': args.host,
'port': args.port,
'user': args.user,
'password': args.password,
'database': args.database
}
# Max age (0 = no limit)
max_age = args.max_age if args.max_age > 0 else None
# Create parser and connect
clm_parser = CLMParser(db_config, max_age_days=max_age)
if not clm_parser.connect():
sys.exit(1)
try:
# Ensure tracking table exists
clm_parser.ensure_clm_tracking_table()
if args.active_only:
# Only update active sessions, skip imports
print("Updating active sessions...")
active_results = clm_parser.update_active_sessions(args.dir)
print(f"\n{'='*50}")
print(f"Active Sessions Summary:")
print(f" Active machines: {active_results['active_machines']}")
print(f" Sessions updated: {active_results['updated']}")
print(f" Stale files skipped: {active_results['stale_skipped']}")
print(f" Old sessions cleared: {active_results['cleared']}")
elif args.file:
# Parse single file
result = clm_parser.parse_file(args.file)
if result['success']:
print(f"\nSuccessfully imported {result.get('measurements', 0)} measurements")
else:
print(f"\nFailed: {result.get('error')}")
sys.exit(1)
elif args.machine:
# Parse specific machine
machine_path = os.path.join(args.dir, args.machine)
if not os.path.isdir(machine_path):
print(f"Machine folder not found: {machine_path}")
sys.exit(1)
# Load known files for fast lookup
print("Loading known files from database...")
known_files = clm_parser.load_known_files()
print(f" {len(known_files)} files already tracked")
results = clm_parser.parse_machine_folder(machine_path, known_files)
print(f"\n{'='*50}")
print(f"Machine {args.machine} Import Summary:")
print(f" Total files found: {results.get('total_files', 0)}")
print(f" Files imported: {results.get('imported', 0)}")
print(f" Files skipped: {results.get('skipped', 0)}")
print(f" Skipped (too old): {results.get('skipped_old', 0)}")
print(f" Jobs in progress: {results.get('in_progress', 0)}")
print(f" Files with errors: {results.get('errors', 0)}")
print(f" Total measurements: {results.get('total_measurements', 0)}")
print(f" Total OOT: {results.get('total_oot', 0)}")
else:
# Parse all machines
if max_age:
print(f"Max file age: {max_age} days")
results = clm_parser.parse_all_machines(args.dir)
print(f"\n{'='*50}")
print(f"CLM Data Import Summary:")
print(f" Machines processed: {results['machines_processed']}")
print(f" Total files found: {results['total_files']}")
print(f" Files imported: {results['imported']}")
print(f" Files skipped: {results['skipped']}")
print(f" Skipped (too old): {results['skipped_old']}")
print(f" Jobs in progress: {results['in_progress']}")
print(f" Files with errors: {results['errors']}")
print(f" Total measurements: {results['total_measurements']}")
print(f" Total OOT: {results['total_oot']}")
# Also update active sessions
print(f"\nUpdating active sessions...")
active_results = clm_parser.update_active_sessions(args.dir)
print(f" Active machines: {active_results['active_machines']}")
finally:
clm_parser.disconnect()
if __name__ == '__main__':
main()