#!/usr/bin/env python3
"""
CLM Data Parser

Parses CLM_Data JSON files (authoritative UDC data source) and imports to ShopDB.

Usage:
    python clmparser.py [--dir /path/to/CLM_Data] [--file specific_file.json] [--machine 3101]

CLM_Data Structure:
    /CLM_Data/{MachineNumber}/Data/{PartNum}_{Oper}_{Serial}_{Date}_{Time}.json

JSON Structure:
    - HeaderValues: Part info, badge, machine, timestamps
    - ItemCrossing: Program steps with DataInput measurements
    - End: Part run completion

Production Path (Windows):
    S:\\SPC\\UDC\\CLM_Data\\*Machine Number*\\Data\\

IMPORTANT: Production only READS from this location, NEVER modifies or deletes.
"""

import json
import os
import sys
import glob
import re
import argparse
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed

import mysql.connector
from mysql.connector import Error

# Import config
try:
    from config import DB_CONFIG, CLM_DATA_PATH, BATCH_SIZE
except ImportError:
    import platform
    IS_WINDOWS = platform.system() == 'Windows'
    DB_CONFIG = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'password': 'rootpassword',
        'database': 'shopdb'
    }
    CLM_DATA_PATH = r'S:\SPC\UDC\CLM_Data' if IS_WINDOWS else '/home/camp/projects/UDC/CLM_Data'
    BATCH_SIZE = 1000


def parse_clm_timestamp(ts_str: str) -> Optional[datetime]:
    """Parse CLM timestamp: MM/DD/YYYY HH:MM:SS"""
    if not ts_str:
        return None
    try:
        return datetime.strptime(ts_str, '%m/%d/%Y %H:%M:%S')
    except (ValueError, TypeError):
        return None


def parse_numeric(val) -> Optional[float]:
    """Parse numeric value from JSON (already numeric or string)"""
    if val is None:
        return None
    try:
        return float(val)
    except (ValueError, TypeError):
        return None


def file_hash(filepath: str) -> Optional[str]:
    """Generate MD5 hash of file for duplicate detection"""
    try:
        hash_md5 = hashlib.md5()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except OSError as e:
        print(f" Warning: Could not hash file {filepath}: {e}")
        return None


def is_file_complete(filepath: str) -> bool:
    """Check if CLM JSON file has an End block (indicates job is complete)"""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, list):
            return False
        for item in data:
            if 'End' in item:
                return True
        return False
    except (json.JSONDecodeError, OSError):
        return False

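# Illustrative sketch of one CLM_Data file as this parser reads it. The field names are
# taken from the parsing code below; the file name and values are made up:
#
#   CLM_Data/3101/Data/12345_010_SN001_2024-01-15_08-30-00.json
#   [
#     {"HeaderValues": {"PartNum": "12345", "PartOper": "010", "SerialNum": "SN001",
#                       "ProgName": "PROG_A", "ProgNum": "JOB-77", "BadgeNumber": "B1234",
#                       "TimeDate": "01/15/2024 08:30:00"}},
#     {"ItemCrossing": {"ItemNo": "10", "Description": "Bore rough pass",
#                       "TimeDate": "01/15/2024 08:35:00",
#                       "Items": [
#                         {"DataInput": {"Method": "PROBE", "DimID": "D1",
#                                        "Description": "Bore diameter", "Sequence": "1",
#                                        "Min": "1.000", "Max": "1.010", "Actual": "1.004",
#                                        "Deviation": "0.0", "OutOfTolerance": "N",
#                                        "TimeDate": "01/15/2024 08:35:10"}},
#                         {"HeaderUpdate": {"Details": "Badge change", "Description": "Operator login",
#                                           "BadgeNumber": "B5678",
#                                           "TimeDate": "01/15/2024 08:40:00"}},
#                         {"ItemViolation": {"Previous": "100.0", "Current": "105.0",
#                                            "BadgeNumber": "B5678",
#                                            "TimeDate": "01/15/2024 08:41:00"}}]}},
#     {"End": {"TimeDate": "01/15/2024 09:10:00"}}
#   ]
#
# In-progress files lack the End block; some also contain {"Part": {...}} blocks when a new
# part starts mid-file (see parse_active_file below).
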
class CLMParser:
    """Parser for CLM_Data JSON files"""

    def __init__(self, db_config: dict, max_age_days: int = None):
        self.db_config = db_config
        self.conn = None
        self.cursor = None
        self.measurements_batch = []
        self.events_batch = []
        self.manual_requests_batch = []
        self.header_updates_batch = []
        self.violations_batch = []
        self.max_age_days = max_age_days  # None = no limit
        self.cutoff_date = None
        if max_age_days:
            self.cutoff_date = (datetime.now() - timedelta(days=max_age_days)).date()

    def get_file_date(self, filename: str) -> Optional[datetime]:
        """Extract date from CLM filename: {Part}_{Oper}_{Serial}_{YYYY-MM-DD}_{HH-MM-SS}.json"""
        match = re.search(r'(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})\.json$', filename)
        if match:
            try:
                date_str = match.group(1)
                time_str = match.group(2).replace('-', ':')
                return datetime.strptime(f"{date_str} {time_str}", '%Y-%m-%d %H:%M:%S')
            except ValueError:
                return None
        return None

    def is_file_too_old(self, filename: str) -> bool:
        """Check if file date is older than max_age_days"""
        if not self.cutoff_date:
            return False
        file_date = self.get_file_date(filename)
        if file_date:
            return file_date.date() < self.cutoff_date
        return False  # If can't parse date, don't skip

    def connect(self):
        """Connect to MySQL database"""
        try:
            self.conn = mysql.connector.connect(**self.db_config)
            self.cursor = self.conn.cursor(dictionary=True)
            print(f"Connected to MySQL at {self.db_config['host']}")
            return True
        except Error as e:
            print(f"Error connecting to MySQL: {e}")
            return False

    def disconnect(self):
        """Disconnect from database"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()

    def ensure_clm_tracking_table(self):
        """Create tracking table for CLM imports if not exists"""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS udcclmfiles (
                fileid INT AUTO_INCREMENT PRIMARY KEY,
                filename VARCHAR(180) NOT NULL,
                machinenumber VARCHAR(10),
                filehash VARCHAR(32),
                filemodtime DATETIME,
                partrunid INT,
                sessionid INT,
                recordcount INT DEFAULT 0,
                importdate DATETIME DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uk_filename (filename),
                UNIQUE KEY uk_filehash (filehash),
                INDEX idx_machine (machinenumber),
                INDEX idx_importdate (importdate)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """)
        self.conn.commit()

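    # Duplicate/change detection, roughly in order of cost:
    #   1. load_known_files() pulls every tracked filename into memory once per run.
    #   2. quick_check_imported() uses the stored modification time to skip unchanged files
    #      with no hashing and no JSON parse.
    #   3. file_hash() plus get_import_status() catch renamed or modified files; a changed
    #      hash on a now-complete file triggers delete_partrun() and a clean re-import.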
    def load_known_files(self) -> dict:
        """Load all known files into memory for fast lookup (avoids per-file DB queries)"""
        self.cursor.execute("""
            SELECT filename, filehash, filemodtime, partrunid, recordcount
            FROM udcclmfiles
        """)
        known_files = {}
        for row in self.cursor.fetchall():
            known_files[row['filename']] = {
                'filehash': row['filehash'],
                'filemodtime': row['filemodtime'],
                'partrunid': row['partrunid'],
                'recordcount': row['recordcount']
            }
        return known_files

    def get_import_status(self, filename: str, filehash: str = None) -> dict:
        """Check import status of a CLM file

        Returns: {'imported': bool, 'hash_changed': bool, 'partrunid': int or None,
                  'stored_hash': str or None, 'filemodtime': datetime or None}
        """
        basename = os.path.basename(filename)
        result = {'imported': False, 'hash_changed': False, 'partrunid': None,
                  'stored_hash': None, 'filemodtime': None}

        # Check by filename
        self.cursor.execute(
            "SELECT fileid, filehash, partrunid, filemodtime FROM udcclmfiles WHERE filename = %s",
            (basename,)
        )
        row = self.cursor.fetchone()
        if row:
            result['imported'] = True
            result['stored_hash'] = row['filehash']
            result['partrunid'] = row['partrunid']
            result['filemodtime'] = row['filemodtime']
            if filehash and row['filehash'] != filehash:
                result['hash_changed'] = True
            return result

        # Check by hash if provided (different filename, same content)
        if filehash:
            self.cursor.execute(
                "SELECT fileid, partrunid FROM udcclmfiles WHERE filehash = %s",
                (filehash,)
            )
            row = self.cursor.fetchone()
            if row:
                result['imported'] = True
                result['partrunid'] = row['partrunid']
                return result

        return result

    def quick_check_imported(self, filepath: str, known_files: dict = None) -> dict:
        """Fast check if file needs processing (no hash computation).

        Uses file modification time to skip unchanged files.
        If known_files dict is provided, uses in-memory lookup (much faster).

        Returns: {'skip': bool, 'reason': str, 'needs_hash_check': bool, 'partrunid': int or None}
        """
        basename = os.path.basename(filepath)

        # Use in-memory cache if provided (much faster than DB query per file)
        if known_files is not None:
            if basename not in known_files:
                return {'skip': False, 'reason': 'new', 'needs_hash_check': False, 'partrunid': None}
            file_info = known_files[basename]
            stored_modtime = file_info['filemodtime']
            # If we have a stored modtime, skip without checking current file modtime
            # (avoids slow network stat calls - files rarely change after creation)
            if stored_modtime:
                return {'skip': True, 'reason': 'unchanged', 'needs_hash_check': False,
                        'partrunid': file_info['partrunid']}
            # No stored modtime - need to check current modtime and maybe update
            try:
                file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            except OSError:
                return {'skip': False, 'reason': 'cannot_read', 'needs_hash_check': False, 'partrunid': None}
            # File exists in DB but no modtime stored - check hash to be safe
            return {'skip': False, 'reason': 'no_modtime', 'needs_hash_check': True,
                    'partrunid': file_info['partrunid']}

        # No cache - get file modification time
        try:
            file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
        except OSError:
            return {'skip': False, 'reason': 'cannot_read', 'needs_hash_check': False, 'partrunid': None}

        # Fallback to DB query if no cache provided
        self.cursor.execute(
            "SELECT fileid, filehash, partrunid, filemodtime FROM udcclmfiles WHERE filename = %s",
            (basename,)
        )
        row = self.cursor.fetchone()
        if not row:
            return {'skip': False, 'reason': 'new', 'needs_hash_check': False, 'partrunid': None}
        stored_modtime = row['filemodtime']
        if stored_modtime and file_mtime <= stored_modtime:
            return {'skip': True, 'reason': 'unchanged', 'needs_hash_check': False, 'partrunid': row['partrunid']}
        return {'skip': False, 'reason': 'modified', 'needs_hash_check': True, 'partrunid': row['partrunid']}

    def is_file_imported(self, filename: str, filehash: str = None) -> bool:
        """Check if a CLM file has already been imported (simple boolean check)"""
        status = self.get_import_status(filename, filehash)
        return status['imported'] and not status['hash_changed']

    def delete_partrun(self, partrun_id: int):
        """Delete a part run and all associated records for re-import"""
        # Delete in order due to foreign key constraints
        self.cursor.execute("DELETE FROM udcmanualrequests WHERE partrunid = %s", (partrun_id,))
        self.cursor.execute("DELETE FROM udcmeasurements WHERE partrunid = %s", (partrun_id,))
        self.cursor.execute("DELETE FROM udcevents WHERE partrunid = %s", (partrun_id,))
        self.cursor.execute("DELETE FROM udcparts WHERE partrunid = %s", (partrun_id,))
        self.cursor.execute("DELETE FROM udcclmfiles WHERE partrunid = %s", (partrun_id,))
        self.conn.commit()

    def record_import(self, filepath: str, machine_number: str, filehash: str,
                      partrun_id: int, session_id: int, record_count: int):
        """Record successful import of a CLM file"""
        basename = os.path.basename(filepath)
        # Get file modification time for fast skip checks on future runs
        try:
            file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
        except OSError:
            file_mtime = None
        self.cursor.execute("""
            INSERT INTO udcclmfiles
                (filename, machinenumber, filehash, filemodtime, partrunid, sessionid, recordcount)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                partrunid = VALUES(partrunid),
                sessionid = VALUES(sessionid),
                recordcount = VALUES(recordcount),
                filemodtime = VALUES(filemodtime),
                importdate = CURRENT_TIMESTAMP
        """, (basename, machine_number, filehash, file_mtime, partrun_id, session_id, record_count))
        self.conn.commit()

    def record_skipped_file(self, filepath: str, machine_number: str, filehash: str, skip_reason: str):
        """Record a skipped file (tool setup, incomplete) so it's not re-checked every run"""
        basename = os.path.basename(filepath)
        try:
            file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
        except OSError:
            file_mtime = None
        # recordcount = -1 marks a skipped file; the skip_reason itself is not persisted
        self.cursor.execute("""
            INSERT INTO udcclmfiles
                (filename, machinenumber, filehash, filemodtime, partrunid, sessionid, recordcount)
            VALUES (%s, %s, %s, %s, NULL, NULL, -1)
            ON DUPLICATE KEY UPDATE
                filemodtime = VALUES(filemodtime),
                importdate = CURRENT_TIMESTAMP
        """, (basename, machine_number, filehash, file_mtime))
        self.conn.commit()

    def get_or_create_session(self, machine_number: str, start_time: datetime) -> int:
        """Get or create a session for the machine and date"""
        # For CLM data, we create one session per machine per day
        session_date = start_time.date()
        session_name = f"CLM_{machine_number}_{session_date.isoformat()}"

        # Check if session exists
        self.cursor.execute(
            """SELECT sessionid FROM udcsessions
               WHERE machinenumber = %s AND logfilename = %s""",
            (machine_number, session_name)
        )
        row = self.cursor.fetchone()
        if row:
            return row['sessionid']

        # Create new session
        self.cursor.execute(
            """INSERT INTO udcsessions (machinenumber, logfilename, sessionstart, dateadded)
               VALUES (%s, %s, %s, NOW())""",
            (machine_number, session_name, start_time)
        )
        self.conn.commit()
        return self.cursor.lastrowid

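    # Changeover (computed in create_part_run below) is the gap between the end of the
    # previous part on the same machine and the start of this one. For example, a previous
    # part ending at 08:00:00 with this program starting at 08:07:30 gives changeover = 450
    # seconds; gaps outside 0-24 hours are left NULL.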
counts""" self.cursor.execute( """UPDATE udcparts SET measurementcount = %s, manualcount = %s, probecount = %s, ootcount = %s WHERE partrunid = %s""", (measurement_count, manual_count, probe_count, oot_count, partrun_id) ) self.conn.commit() def add_measurement(self, partrun_id: int, session_id: int, event_time: datetime, event_type: str, method: str, dimid: str, description: str, seq: int, minval: float, maxval: float, actualval: float, deviation: float, oot: int): """Add a measurement to the batch""" self.measurements_batch.append(( partrun_id, session_id, event_time, event_type, method, dimid, description[:255] if description else None, seq, minval, maxval, actualval, deviation, oot )) if len(self.measurements_batch) >= BATCH_SIZE: self.flush_measurements() def add_event(self, partrun_id: int, session_id: int, event_time: datetime, event_type: str, item_number: str, description: str): """Add an event to the batch""" self.events_batch.append(( partrun_id, session_id, event_time, event_type, item_number, description[:500] if description else None )) if len(self.events_batch) >= BATCH_SIZE: self.flush_events() def add_manual_request(self, partrun_id: int, request_time: datetime, response_time: datetime, description: str): """Add a manual request timing record""" response_seconds = None if request_time and response_time: response_seconds = int((response_time - request_time).total_seconds()) self.cursor.execute( """INSERT INTO udcmanualrequests (partrunid, requesttime, responsetime, responseseconds, description) VALUES (%s, %s, %s, %s, %s)""", ( partrun_id, request_time, response_time, response_seconds, description[:255] if description else None ) ) self.conn.commit() return response_seconds def flush_measurements(self): """Flush measurement batch to database""" if not self.measurements_batch: return self.cursor.executemany( """INSERT INTO udcmeasurements (partrunid, sessionid, eventtime, eventtype, method, dimid, description, seqnumber, minval, maxval, actualval, deviation, oot) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", self.measurements_batch ) self.conn.commit() self.measurements_batch = [] def flush_events(self): """Flush event batch to database""" if not self.events_batch: return self.cursor.executemany( """INSERT INTO udcevents (partrunid, sessionid, eventtime, eventtype, itemnumber, description) VALUES (%s, %s, %s, %s, %s, %s)""", self.events_batch ) self.conn.commit() self.events_batch = [] def add_header_update(self, partrun_id: int, session_id: int, machine_number: str, event_time: datetime, details: str, description: str, badge: str): """Add a header update (badge change) to the batch""" # Strip whitespace and truncate badge clean_badge = badge.strip()[:20] if badge else None self.header_updates_batch.append(( partrun_id, session_id, machine_number, event_time, details[:255] if details else None, description[:255] if description else None, clean_badge )) if len(self.header_updates_batch) >= BATCH_SIZE: self.flush_header_updates() def flush_header_updates(self): """Flush header updates batch to database""" if not self.header_updates_batch: return self.cursor.executemany( """INSERT INTO udcheaderupdates (partrunid, sessionid, machinenumber, eventtime, details, description, badgenumber) VALUES (%s, %s, %s, %s, %s, %s, %s)""", self.header_updates_batch ) self.conn.commit() self.header_updates_batch = [] def add_violation(self, partrun_id: int, session_id: int, machine_number: str, event_time: datetime, previous_val: float, current_val: float, badge: str, item_no: 
    def add_measurement(self, partrun_id: int, session_id: int, event_time: datetime,
                        event_type: str, method: str, dimid: str, description: str, seq: int,
                        minval: float, maxval: float, actualval: float, deviation: float, oot: int):
        """Add a measurement to the batch"""
        self.measurements_batch.append((
            partrun_id, session_id, event_time, event_type, method, dimid,
            description[:255] if description else None,
            seq, minval, maxval, actualval, deviation, oot
        ))
        if len(self.measurements_batch) >= BATCH_SIZE:
            self.flush_measurements()

    def add_event(self, partrun_id: int, session_id: int, event_time: datetime,
                  event_type: str, item_number: str, description: str):
        """Add an event to the batch"""
        self.events_batch.append((
            partrun_id, session_id, event_time, event_type, item_number,
            description[:500] if description else None
        ))
        if len(self.events_batch) >= BATCH_SIZE:
            self.flush_events()

    def add_manual_request(self, partrun_id: int, request_time: datetime,
                           response_time: datetime, description: str):
        """Add a manual request timing record"""
        response_seconds = None
        if request_time and response_time:
            response_seconds = int((response_time - request_time).total_seconds())
        self.cursor.execute(
            """INSERT INTO udcmanualrequests
               (partrunid, requesttime, responsetime, responseseconds, description)
               VALUES (%s, %s, %s, %s, %s)""",
            (
                partrun_id, request_time, response_time, response_seconds,
                description[:255] if description else None
            )
        )
        self.conn.commit()
        return response_seconds

    def flush_measurements(self):
        """Flush measurement batch to database"""
        if not self.measurements_batch:
            return
        self.cursor.executemany(
            """INSERT INTO udcmeasurements
               (partrunid, sessionid, eventtime, eventtype, method, dimid, description,
                seqnumber, minval, maxval, actualval, deviation, oot)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            self.measurements_batch
        )
        self.conn.commit()
        self.measurements_batch = []

    def flush_events(self):
        """Flush event batch to database"""
        if not self.events_batch:
            return
        self.cursor.executemany(
            """INSERT INTO udcevents
               (partrunid, sessionid, eventtime, eventtype, itemnumber, description)
               VALUES (%s, %s, %s, %s, %s, %s)""",
            self.events_batch
        )
        self.conn.commit()
        self.events_batch = []

    def add_header_update(self, partrun_id: int, session_id: int, machine_number: str,
                          event_time: datetime, details: str, description: str, badge: str):
        """Add a header update (badge change) to the batch"""
        # Strip whitespace and truncate badge
        clean_badge = badge.strip()[:20] if badge else None
        self.header_updates_batch.append((
            partrun_id, session_id, machine_number, event_time,
            details[:255] if details else None,
            description[:255] if description else None,
            clean_badge
        ))
        if len(self.header_updates_batch) >= BATCH_SIZE:
            self.flush_header_updates()

    def flush_header_updates(self):
        """Flush header updates batch to database"""
        if not self.header_updates_batch:
            return
        self.cursor.executemany(
            """INSERT INTO udcheaderupdates
               (partrunid, sessionid, machinenumber, eventtime, details, description, badgenumber)
               VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            self.header_updates_batch
        )
        self.conn.commit()
        self.header_updates_batch = []

    def add_violation(self, partrun_id: int, session_id: int, machine_number: str,
                      event_time: datetime, previous_val: float, current_val: float,
                      badge: str, item_no: str, crossing_desc: str):
        """Add a parameter violation to the batch"""
        # Strip whitespace and truncate badge
        clean_badge = badge.strip()[:20] if badge else None
        self.violations_batch.append((
            partrun_id, session_id, machine_number, event_time,
            previous_val, current_val, clean_badge,
            item_no[:20] if item_no else None,
            crossing_desc[:255] if crossing_desc else None
        ))
        if len(self.violations_batch) >= BATCH_SIZE:
            self.flush_violations()

    def flush_violations(self):
        """Flush violations batch to database"""
        if not self.violations_batch:
            return
        self.cursor.executemany(
            """INSERT INTO udcviolations
               (partrunid, sessionid, machinenumber, eventtime, previousval, currentval,
                badgenumber, itemno, crossingdesc)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            self.violations_batch
        )
        self.conn.commit()
        self.violations_batch = []

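    # Per-file import flow (parse_file below): filename-date age check -> machine number from
    # the folder path -> quick modtime check -> MD5 hash + import-status check -> JSON parse ->
    # session and part-run rows -> batched measurements/events/updates/violations -> final
    # flushes, count updates, and an udcclmfiles tracking record.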
    def parse_file(self, filepath: str, known_files: dict = None) -> Dict[str, Any]:
        """Parse a single CLM JSON file"""
        filename = os.path.basename(filepath)

        # Check if file is too old (based on filename date)
        if self.is_file_too_old(filename):
            return {'success': False, 'error': f'File too old (>{self.max_age_days} days): {filename}',
                    'skip_silent': True, 'too_old': True}

        # Extract machine number from parent folder
        parent_dir = os.path.basename(os.path.dirname(os.path.dirname(filepath)))
        machine_number = parent_dir if parent_dir.isdigit() else None
        if not machine_number:
            # Try to extract from file path
            parts = filepath.split(os.sep)
            for part in parts:
                if part.isdigit() and len(part) == 4:
                    machine_number = part
                    break
        if not machine_number:
            return {'success': False, 'error': f'Could not determine machine number for {filename}'}

        # Quick check by filename + modification time (avoids reading file for hash)
        quick_status = self.quick_check_imported(filepath, known_files)
        if quick_status['skip']:
            return {'success': False, 'error': f'File unchanged: {filename}', 'skip_silent': True}

        # Only compute hash if file is new or modification time changed
        fhash = file_hash(filepath)
        if fhash is None:
            return {'success': False, 'error': f'Could not read file: {filename}'}

        # Full import status check with hash
        import_status = self.get_import_status(filepath, fhash)
        file_complete = is_file_complete(filepath)

        if import_status['imported']:
            if import_status['hash_changed']:
                # File was updated since last import
                if file_complete:
                    # Job is now complete - delete old records and re-import
                    print(f"Re-importing completed file: {filename}")
                    self.delete_partrun(import_status['partrunid'])
                else:
                    # Still incomplete, skip for now
                    return {'success': False, 'error': f'File updated but still incomplete: {filename}',
                            'in_progress': True}
            else:
                # Hash unchanged - already fully imported (update modtime for future fast skips)
                return {'success': False, 'error': f'File already imported: {filename}'}

        print(f"Parsing {filename} (machine {machine_number})...")

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.record_skipped_file(filepath, machine_number, fhash, 'json_error')
            return {'success': False, 'error': f'JSON parse error: {e}', 'skip_silent': True}
        except Exception as e:
            self.record_skipped_file(filepath, machine_number, fhash, 'read_error')
            return {'success': False, 'error': f'File read error: {e}', 'skip_silent': True}

        if not isinstance(data, list) or len(data) == 0:
            self.record_skipped_file(filepath, machine_number, fhash, 'invalid_json')
            return {'success': False, 'error': 'Invalid JSON structure (expected non-empty array)',
                    'skip_silent': True}

        # Extract header
        header = None
        end_data = None
        item_crossings = []  # Store full crossing data with measurements

        for item in data:
            if 'HeaderValues' in item:
                header = item['HeaderValues']
            elif 'End' in item:
                end_data = item['End']
            elif 'ItemCrossing' in item:
                crossing = item['ItemCrossing']
                crossing_time = crossing.get('TimeDate')
                crossing_desc = crossing.get('Description', '')
                item_no = crossing.get('ItemNo')

                # Extract measurements, header updates, and violations from Items
                measurements_in_crossing = []
                header_updates_in_crossing = []
                violations_in_crossing = []
                if 'Items' in crossing and crossing['Items']:
                    for measure_item in crossing['Items']:
                        if 'DataInput' in measure_item:
                            measurements_in_crossing.append(measure_item['DataInput'])
                        elif 'HeaderUpdate' in measure_item:
                            header_updates_in_crossing.append(measure_item['HeaderUpdate'])
                        elif 'ItemViolation' in measure_item:
                            violations_in_crossing.append(measure_item['ItemViolation'])

                item_crossings.append({
                    'time': crossing_time,
                    'description': crossing_desc,
                    'item_no': item_no,
                    'measurements': measurements_in_crossing,
                    'header_updates': header_updates_in_crossing,
                    'violations': violations_in_crossing
                })

        if not header:
            # Files without HeaderValues are tool setup/calibration runs, not part runs
            # Record in udcclmfiles so we don't re-check every run
            self.record_skipped_file(filepath, machine_number, fhash, 'no_header')
            return {'success': False, 'error': 'No HeaderValues (tool setup file)', 'skip_silent': True}

        # Parse timestamps
        start_time = parse_clm_timestamp(header.get('TimeDate'))
        end_time = parse_clm_timestamp(end_data.get('TimeDate')) if end_data else None

        if not start_time:
            self.record_skipped_file(filepath, machine_number, fhash, 'no_timestamp')
            return {'success': False, 'error': 'Could not parse start timestamp', 'skip_silent': True}

        # Get or create session
        session_id = self.get_or_create_session(machine_number, start_time)

        # Create part run (pass machine_number for changeover calculation)
        partrun_id = self.create_part_run(session_id, header, start_time, end_time, machine_number)

        # Counters
        measurement_count = 0
        manual_count = 0
        probe_count = 0
        oot_count = 0
        record_count = 0
        manual_request_count = 0
        header_update_count = 0
        violation_count = 0

        # Process item crossings with their measurements
        for crossing in item_crossings:
            crossing_time = parse_clm_timestamp(crossing['time'])
            item_no = crossing['item_no']
            crossing_desc = crossing['description']

            # Add the crossing event
            self.add_event(partrun_id, session_id, crossing_time, 'ITEM-CROSSING', item_no, crossing_desc)
            record_count += 1

            # Process measurements in this crossing
            for m in crossing['measurements']:
                event_time = parse_clm_timestamp(m.get('TimeDate'))
                method = m.get('Method', '').upper()
                dimid = m.get('DimID', '')
                description = m.get('Description', '')
                seq = int(m.get('Sequence', 0)) if m.get('Sequence') else 0
                minval = parse_numeric(m.get('Min'))
                maxval = parse_numeric(m.get('Max'))
                actualval = parse_numeric(m.get('Actual'))
                deviation = parse_numeric(m.get('Deviation'))
                oot = 1 if m.get('OutOfTolerance', 'N') == 'Y' else 0

                self.add_measurement(partrun_id, session_id, event_time, 'PROCESSDATA', method,
                                     dimid, description, seq, minval, maxval, actualval, deviation, oot)
                measurement_count += 1

                # For MANUAL measurements, record timing (crossing_time = prompt, event_time = response)
                if method == 'MANUAL' and crossing_time and event_time:
                    self.add_manual_request(partrun_id, crossing_time, event_time, description)
                    manual_request_count += 1

                if method == 'MANUAL':
                    manual_count += 1
                elif method == 'PROBE':
                    probe_count += 1
                if oot:
                    oot_count += 1
                record_count += 1

            # Process header updates (badge changes)
            for hu in crossing.get('header_updates', []):
                hu_time = parse_clm_timestamp(hu.get('TimeDate'))
                self.add_header_update(
                    partrun_id, session_id, machine_number, hu_time,
                    hu.get('Details', ''), hu.get('Description', ''), hu.get('BadgeNumber', '')
                )
                header_update_count += 1

            # Process violations (parameter changes outside limits)
            for v in crossing.get('violations', []):
                v_time = parse_clm_timestamp(v.get('TimeDate'))
                self.add_violation(
                    partrun_id, session_id, machine_number, v_time,
                    parse_numeric(v.get('Previous')), parse_numeric(v.get('Current')),
                    v.get('BadgeNumber', ''), item_no, crossing_desc
                )
                violation_count += 1

        # Flush remaining batches
        self.flush_measurements()
        self.flush_events()
        self.flush_header_updates()
        self.flush_violations()

        # Update part run counts
        self.update_part_run_counts(partrun_id, measurement_count, manual_count, probe_count, oot_count)

        # Record successful import
        self.record_import(filepath, machine_number, fhash, partrun_id, session_id, record_count)

        print(f" Imported: {measurement_count} measurements, {len(item_crossings)} events, "
              f"{oot_count} OOT, {manual_request_count} manual timings, "
              f"{header_update_count} badge changes, {violation_count} violations")

        return {
            'success': True,
            'records': record_count,
            'measurements': measurement_count,
            'oot_count': oot_count,
            'header_updates': header_update_count,
            'violations': violation_count,
            'partrun_id': partrun_id,
            'session_id': session_id
        }

    def parse_machine_folder(self, machine_path: str, known_files: dict = None) -> Dict[str, Any]:
        """Parse all JSON files for a single machine"""
        data_path = os.path.join(machine_path, 'Data')
        if not os.path.isdir(data_path):
            return {'success': False, 'error': f'No Data folder in {machine_path}'}

        files = glob.glob(os.path.join(data_path, '*.json'))
        results = {
            'total_files': len(files),
            'imported': 0,
            'updated': 0,
            'skipped': 0,
            'skipped_old': 0,
            'in_progress': 0,
            'tool_setup': 0,
            'errors': 0,
            'total_records': 0,
            'total_measurements': 0,
            'total_oot': 0
        }

        machine_num = os.path.basename(machine_path)
        print(f"\nProcessing machine {machine_num}: {len(files)} files")

        for filepath in sorted(files):
            result = self.parse_file(filepath, known_files)
            if result['success']:
                results['imported'] += 1
                results['total_records'] += result.get('records', 0)
                results['total_measurements'] += result.get('measurements', 0)
                results['total_oot'] += result.get('oot_count', 0)
            elif result.get('in_progress'):
                results['in_progress'] += 1
            elif result.get('skip_silent'):
                # Silent skips: unchanged files, tool setup files, or old files
                if result.get('too_old'):
                    results['skipped_old'] += 1
                elif 'tool setup' in result.get('error', '').lower():
                    results['tool_setup'] += 1
                else:
                    results['skipped'] += 1  # Unchanged files
            elif 'already imported' in result.get('error', '').lower():
                results['skipped'] += 1
            else:
                results['errors'] += 1
                print(f" Error: {result.get('error')}")

        return results

    def ensure_active_sessions_table(self):
        """Create active sessions tracking table if not exists"""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS udcactivesessions (
                machinenumber VARCHAR(20) PRIMARY KEY,
                filepath VARCHAR(255),
                partnumber VARCHAR(50),
                badgenumber VARCHAR(20),
                partsrun INT DEFAULT 0,
                lastupdate DATETIME,
                sessionstart DATETIME,
                lastseen DATETIME DEFAULT CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """)
        self.conn.commit()

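    # Live status: a file with no End block is a job still in progress. parse_active_file()
    # reads such a file without importing it, and update_active_sessions() keeps one row per
    # machine in udcactivesessions, using only the newest incomplete file per machine.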
    def parse_active_file(self, filepath: str) -> Optional[Dict[str, Any]]:
        """Parse an incomplete file to get current state (without importing)"""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError):
            return None

        if not isinstance(data, list) or len(data) == 0:
            return None

        # Extract machine number from path
        parent_dir = os.path.basename(os.path.dirname(os.path.dirname(filepath)))
        machine_number = parent_dir if parent_dir.isdigit() else None
        if not machine_number:
            parts = filepath.split(os.sep)
            for part in parts:
                if part.isdigit() and len(part) == 4:
                    machine_number = part
                    break
        if not machine_number:
            return None

        # Extract header and count parts
        header = None
        parts_run = 0
        last_badge = None
        last_part = None
        first_time = None
        last_time = None

        for item in data:
            if 'HeaderValues' in item:
                header = item['HeaderValues']
                hdr_time = parse_clm_timestamp(header.get('TimeDate'))
                if hdr_time:
                    if not first_time or hdr_time < first_time:
                        first_time = hdr_time
                    if not last_time or hdr_time > last_time:
                        last_time = hdr_time
                # Initial badge/part from header
                if header.get('BadgeNumber'):
                    last_badge = header.get('BadgeNumber', '').strip()
                if header.get('PartNum'):
                    last_part = header.get('PartNum', '').strip()
            elif 'Part' in item:
                # New part started
                parts_run += 1
                part_data = item['Part']
                if part_data.get('BadgeNumber'):
                    last_badge = part_data.get('BadgeNumber', '').strip()
                if part_data.get('PartNum'):
                    last_part = part_data.get('PartNum', '').strip()
                part_time = parse_clm_timestamp(part_data.get('TimeDate'))
                if part_time:
                    if not first_time or part_time < first_time:
                        first_time = part_time
                    if not last_time or part_time > last_time:
                        last_time = part_time
            elif 'ItemCrossing' in item:
                crossing = item['ItemCrossing']
                crossing_time = parse_clm_timestamp(crossing.get('TimeDate'))
                if crossing_time:
                    if not last_time or crossing_time > last_time:
                        last_time = crossing_time

        if not header:
            return None

        # If no Part blocks yet, count as 1 (the header starts the first part)
        if parts_run == 0:
            parts_run = 1

        return {
            'machinenumber': machine_number,
            'filepath': filepath,
            'partnumber': last_part,
            'badgenumber': last_badge,
            'partsrun': parts_run,
            'sessionstart': first_time,
            'lastupdate': last_time
        }

    def _scan_machine_folder(self, machine_path: str, cutoff_date: datetime) -> Dict[str, Any]:
        """Scan a single machine folder for incomplete files. Used for parallel scanning."""
        result = {
            'files_scanned': 0,
            'files_skipped_old': 0,
            'incomplete_files': []  # list of (filepath, state)
        }
        data_path = os.path.join(machine_path, 'Data')
        if not os.path.isdir(data_path):
            return result

        files = glob.glob(os.path.join(data_path, '*.json'))
        for filepath in files:
            # Quick filter by filename date (format: *_YYYY-MM-DD_*.json)
            basename = os.path.basename(filepath)
            parts = basename.replace('.json', '').split('_')
            if len(parts) >= 4:
                try:
                    file_date = datetime.strptime(parts[-2], '%Y-%m-%d')
                    if file_date < cutoff_date:
                        result['files_skipped_old'] += 1
                        continue
                except ValueError:
                    pass  # Can't parse date, scan anyway

            result['files_scanned'] += 1
            if not is_file_complete(filepath):
                state = self.parse_active_file(filepath)
                if state and state['lastupdate']:
                    result['incomplete_files'].append((filepath, state))

        return result

    def update_active_sessions(self, clm_directory: str, verbose: bool = True,
                               max_age_days: int = 7, max_workers: int = 8) -> Dict[str, Any]:
        """Scan for incomplete files and update active sessions table.

        Only tracks the MOST RECENT incomplete file per machine.
        Older incomplete files are considered stale/abandoned.
        Uses filename date to skip files older than max_age_days without reading them.
        Uses parallel scanning with max_workers threads.
        """
        self.ensure_active_sessions_table()

        results = {
            'active_machines': 0,
            'updated': 0,
            'cleared': 0,
            'stale_skipped': 0,
            'files_scanned': 0,
            'files_skipped_old': 0
        }

        # Calculate cutoff date
        cutoff_date = datetime.now() - timedelta(days=max_age_days)

        # Collect all incomplete files per machine, then pick the newest
        machine_incomplete_files = {}  # machine -> list of (filepath, state)

        # Find all machine folders
        machine_folders = [os.path.join(clm_directory, item)
                           for item in os.listdir(clm_directory)
                           if os.path.isdir(os.path.join(clm_directory, item)) and item.isdigit()]

        if verbose:
            print(f" Scanning {len(machine_folders)} machines with {max_workers} threads...")

        # Parallel scan
        completed = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(self._scan_machine_folder, path, cutoff_date): path
                       for path in machine_folders}
            for future in as_completed(futures):
                completed += 1
                if verbose and completed % 10 == 0:
                    print(f" Completed {completed}/{len(machine_folders)} machines...", end='\r')
                scan_result = future.result()
                results['files_scanned'] += scan_result['files_scanned']
                results['files_skipped_old'] += scan_result['files_skipped_old']
                for filepath, state in scan_result['incomplete_files']:
                    machine = state['machinenumber']
                    if machine not in machine_incomplete_files:
                        machine_incomplete_files[machine] = []
                    machine_incomplete_files[machine].append((filepath, state))

        # For each machine, only use the most recent incomplete file
        active_machines = set()
        for machine, file_list in machine_incomplete_files.items():
            # Sort by lastupdate descending, pick the newest
            file_list.sort(key=lambda x: x[1]['lastupdate'], reverse=True)
            newest_filepath, newest_state = file_list[0]
            # Count stale files (all but the newest)
            results['stale_skipped'] += len(file_list) - 1
            active_machines.add(machine)

            # Update or insert active session with the newest file
            self.cursor.execute("""
                INSERT INTO udcactivesessions
                    (machinenumber, filepath, partnumber, badgenumber, partsrun,
                     lastupdate, sessionstart, lastseen)
                VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    filepath = VALUES(filepath),
                    partnumber = VALUES(partnumber),
                    badgenumber = VALUES(badgenumber),
                    partsrun = VALUES(partsrun),
                    lastupdate = VALUES(lastupdate),
                    sessionstart = VALUES(sessionstart),
                    lastseen = NOW()
            """, (
                newest_state['machinenumber'],
                os.path.basename(newest_state['filepath']),
                newest_state['partnumber'],
                newest_state['badgenumber'],
                newest_state['partsrun'],
                newest_state['lastupdate'],
                newest_state['sessionstart']
            ))
            results['updated'] += 1
            self.conn.commit()

        results['active_machines'] = len(active_machines)

        if verbose:
            print(f" Scanned {results['files_scanned']} files across {len(machine_folders)} machines" + " " * 20)

        # Clear machines that are no longer active (not seen in this scan)
        if active_machines:
            placeholders = ','.join(['%s'] * len(active_machines))
            self.cursor.execute(f"""
                DELETE FROM udcactivesessions
                WHERE machinenumber NOT IN ({placeholders})
            """, tuple(active_machines))
        else:
            # No active machines, clear entire table
            self.cursor.execute("DELETE FROM udcactivesessions")
        results['cleared'] = self.cursor.rowcount
        self.conn.commit()

        return results

    def parse_all_machines(self, clm_directory: str) -> Dict[str, Any]:
        """Parse all machine folders in CLM_Data directory"""
        results = {
            'machines_processed': 0,
            'total_files': 0,
            'imported': 0,
            'skipped': 0,
            'skipped_old': 0,
            'in_progress': 0,
            'tool_setup': 0,
            'errors': 0,
            'total_records': 0,
            'total_measurements': 0,
            'total_oot': 0
        }

        # Load all known files into memory for fast lookup (one query instead of per-file)
        print("Loading known files from database...")
        known_files = self.load_known_files()
        print(f" {len(known_files)} files already tracked")

        # Find all machine folders (directories with numeric names)
        machine_folders = []
        for item in os.listdir(clm_directory):
            item_path = os.path.join(clm_directory, item)
            if os.path.isdir(item_path) and item.isdigit():
                machine_folders.append(item_path)

        print(f"Found {len(machine_folders)} machine folders")

        for machine_path in sorted(machine_folders):
            machine_results = self.parse_machine_folder(machine_path, known_files)
            if machine_results.get('success', True):  # Allow partial success
                results['machines_processed'] += 1
                results['total_files'] += machine_results.get('total_files', 0)
                results['imported'] += machine_results.get('imported', 0)
                results['skipped'] += machine_results.get('skipped', 0)
                results['skipped_old'] += machine_results.get('skipped_old', 0)
                results['in_progress'] += machine_results.get('in_progress', 0)
                results['tool_setup'] += machine_results.get('tool_setup', 0)
                results['errors'] += machine_results.get('errors', 0)
                results['total_records'] += machine_results.get('total_records', 0)
                results['total_measurements'] += machine_results.get('total_measurements', 0)
                results['total_oot'] += machine_results.get('total_oot', 0)

        return results

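# Example invocations (illustrative; the flags are defined in main() below, and the paths
# are placeholders):
#   python clmparser.py                    # import all machines, then refresh active sessions
#   python clmparser.py --machine 3101     # import a single machine folder
#   python clmparser.py --active-only      # only refresh the udcactivesessions table
#   python clmparser.py --max-age 30 --dir /path/to/CLM_Data
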
measurements") else: print(f"\nFailed: {result.get('error')}") sys.exit(1) elif args.machine: # Parse specific machine machine_path = os.path.join(args.dir, args.machine) if not os.path.isdir(machine_path): print(f"Machine folder not found: {machine_path}") sys.exit(1) # Load known files for fast lookup print("Loading known files from database...") known_files = clm_parser.load_known_files() print(f" {len(known_files)} files already tracked") results = clm_parser.parse_machine_folder(machine_path, known_files) print(f"\n{'='*50}") print(f"Machine {args.machine} Import Summary:") print(f" Total files found: {results.get('total_files', 0)}") print(f" Files imported: {results.get('imported', 0)}") print(f" Files skipped: {results.get('skipped', 0)}") print(f" Skipped (too old): {results.get('skipped_old', 0)}") print(f" Jobs in progress: {results.get('in_progress', 0)}") print(f" Files with errors: {results.get('errors', 0)}") print(f" Total measurements: {results.get('total_measurements', 0)}") print(f" Total OOT: {results.get('total_oot', 0)}") else: # Parse all machines if max_age: print(f"Max file age: {max_age} days") results = clm_parser.parse_all_machines(args.dir) print(f"\n{'='*50}") print(f"CLM Data Import Summary:") print(f" Machines processed: {results['machines_processed']}") print(f" Total files found: {results['total_files']}") print(f" Files imported: {results['imported']}") print(f" Files skipped: {results['skipped']}") print(f" Skipped (too old): {results['skipped_old']}") print(f" Jobs in progress: {results['in_progress']}") print(f" Files with errors: {results['errors']}") print(f" Total measurements: {results['total_measurements']}") print(f" Total OOT: {results['total_oot']}") # Also update active sessions print(f"\nUpdating active sessions...") active_results = clm_parser.update_active_sessions(args.dir) print(f" Active machines: {active_results['active_machines']}") finally: clm_parser.disconnect() if __name__ == '__main__': main()