From 5c707c3cd43e6ca3bdab905f3d57a92a868cda98 Mon Sep 17 00:00:00 2001 From: cproudlock Date: Tue, 16 Dec 2025 07:54:54 -0500 Subject: [PATCH] Initial commit: UDC/CLM parser for ShopDB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Python parsers for Universal Data Collector (UDC) and Control Loop Monitor (CLM) data from CNC machines. - clmparser.py: Main parser for CLM JSON data files - Incremental imports using file hashes - Extracts parts, measurements, tool data, violations - Calculates cycle times and changeovers - udcparser.py: Parser for raw UDC log files - Application events, errors, connections - config.py: Database configuration (dev/prod) - backfill_changeover.py: One-time migration utility 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .gitignore | 20 + parser/backfill_changeover.py | 47 ++ parser/clmparser.py | 1221 +++++++++++++++++++++++++++++++++ parser/config.py | 36 + parser/requirements.txt | 1 + parser/udcparser.py | 857 +++++++++++++++++++++++ 6 files changed, 2182 insertions(+) create mode 100644 .gitignore create mode 100644 parser/backfill_changeover.py create mode 100644 parser/clmparser.py create mode 100644 parser/config.py create mode 100644 parser/requirements.txt create mode 100644 parser/udcparser.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d2d8b91 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Data directories (large files, not for version control) +CLM_Data/ +LogFiles/ + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +ENV/ + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store +Thumbs.db diff --git a/parser/backfill_changeover.py b/parser/backfill_changeover.py new file mode 100644 index 0000000..661d4c0 --- /dev/null +++ b/parser/backfill_changeover.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +"""Backfill changeover values for udcparts""" +import mysql.connector + +conn = mysql.connector.connect( + host='127.0.0.1', port=3306, user='root', + password='rootpassword', database='shopdb' +) +cursor = conn.cursor() + +# Get all machines +cursor.execute("SELECT DISTINCT machinenumber FROM udcsessions ORDER BY machinenumber") +machines = [row[0] for row in cursor.fetchall()] + +total_updated = 0 +for machine in machines: + # Get all parts for this machine ordered by start time + cursor.execute(""" + SELECT p.partrunid, p.programstart, p.programend + FROM udcparts p + JOIN udcsessions s ON p.sessionid = s.sessionid + WHERE s.machinenumber = %s AND p.programstart IS NOT NULL + ORDER BY p.programstart ASC + """, (machine,)) + parts = cursor.fetchall() + + prev_end = None + updates = [] + for partrunid, programstart, programend in parts: + if prev_end and programstart: + changeover = int((programstart - prev_end).total_seconds()) + # Only set if reasonable (> 0 and < 24 hours) + if 0 < changeover < 86400: + updates.append((changeover, partrunid)) + prev_end = programend if programend else prev_end + + # Batch update + if updates: + cursor.executemany("UPDATE udcparts SET changeover = %s WHERE partrunid = %s", updates) + conn.commit() + total_updated += len(updates) + + print(f"Machine {machine}: {len(updates)} changeovers set") + +print(f"\nTotal updated: {total_updated}") +cursor.close() +conn.close() diff --git a/parser/clmparser.py b/parser/clmparser.py new file mode 100644 index 0000000..b9f10e9 --- /dev/null +++ b/parser/clmparser.py @@ -0,0 +1,1221 @@ +#!/usr/bin/env python3 +""" +CLM Data Parser +Parses CLM_Data JSON files 
(authoritative UDC data source) and imports to ShopDB. + +Usage: + python clmparser.py [--dir /path/to/CLM_Data] [--file specific_file.json] [--machine 3101] + +CLM_Data Structure: + /CLM_Data/{MachineNumber}/Data/{PartNum}_{Oper}_{Serial}_{Date}_{Time}.json + +JSON Structure: + - HeaderValues: Part info, badge, machine, timestamps + - ItemCrossing: Program steps with DataInput measurements + - End: Part run completion + +Production Path (Windows): S:\\SPC\\UDC\\CLM_Data\\*Machine Number*\\Data\\ +IMPORTANT: Production only READS from this location, NEVER modifies or deletes. +""" + +import json +import os +import sys +import glob +import argparse +import hashlib +from datetime import datetime, timedelta +from typing import Optional, Dict, List, Any +from concurrent.futures import ThreadPoolExecutor, as_completed +import mysql.connector +from mysql.connector import Error + +# Import config +try: + from config import DB_CONFIG, CLM_DATA_PATH, BATCH_SIZE +except ImportError: + import platform + IS_WINDOWS = platform.system() == 'Windows' + DB_CONFIG = { + 'host': '127.0.0.1', + 'port': 3306, + 'user': 'root', + 'password': 'rootpassword', + 'database': 'shopdb' + } + CLM_DATA_PATH = r'S:\SPC\UDC\CLM_Data' if IS_WINDOWS else '/home/camp/projects/UDC/CLM_Data' + BATCH_SIZE = 1000 + + +def parse_clm_timestamp(ts_str: str) -> Optional[datetime]: + """Parse CLM timestamp: MM/DD/YYYY HH:MM:SS""" + if not ts_str: + return None + try: + return datetime.strptime(ts_str, '%m/%d/%Y %H:%M:%S') + except (ValueError, TypeError): + return None + + +def parse_numeric(val) -> Optional[float]: + """Parse numeric value from JSON (already numeric or string)""" + if val is None: + return None + try: + return float(val) + except (ValueError, TypeError): + return None + + +def file_hash(filepath: str) -> Optional[str]: + """Generate MD5 hash of file for duplicate detection""" + try: + hash_md5 = hashlib.md5() + with open(filepath, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b''): + hash_md5.update(chunk) + return hash_md5.hexdigest() + except OSError as e: + print(f" Warning: Could not hash file {filepath}: {e}") + return None + + +def is_file_complete(filepath: str) -> bool: + """Check if CLM JSON file has an End block (indicates job is complete)""" + try: + with open(filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + if not isinstance(data, list): + return False + for item in data: + if 'End' in item: + return True + return False + except (json.JSONDecodeError, OSError): + return False + + +class CLMParser: + """Parser for CLM_Data JSON files""" + + def __init__(self, db_config: dict): + self.db_config = db_config + self.conn = None + self.cursor = None + self.measurements_batch = [] + self.events_batch = [] + self.manual_requests_batch = [] + self.header_updates_batch = [] + self.violations_batch = [] + + def connect(self): + """Connect to MySQL database""" + try: + self.conn = mysql.connector.connect(**self.db_config) + self.cursor = self.conn.cursor(dictionary=True) + print(f"Connected to MySQL at {self.db_config['host']}") + return True + except Error as e: + print(f"Error connecting to MySQL: {e}") + return False + + def disconnect(self): + """Disconnect from database""" + if self.cursor: + self.cursor.close() + if self.conn: + self.conn.close() + + def ensure_clm_tracking_table(self): + """Create tracking table for CLM imports if not exists""" + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS udcclmfiles ( + fileid INT AUTO_INCREMENT PRIMARY KEY, + filename VARCHAR(180) NOT NULL, 
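+                filemodtime DATETIME,  -- file modification time; read by load_known_files/quick_check_imported and written by record_import/record_skipped_file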
+ machinenumber VARCHAR(10), + filehash VARCHAR(32), + partrunid INT, + sessionid INT, + recordcount INT DEFAULT 0, + importdate DATETIME DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_filename (filename), + UNIQUE KEY uk_filehash (filehash), + INDEX idx_machine (machinenumber), + INDEX idx_importdate (importdate) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + self.conn.commit() + + def load_known_files(self) -> dict: + """Load all known files into memory for fast lookup (avoids per-file DB queries)""" + self.cursor.execute(""" + SELECT filename, filehash, filemodtime, partrunid, recordcount + FROM udcclmfiles + """) + known_files = {} + for row in self.cursor.fetchall(): + known_files[row['filename']] = { + 'filehash': row['filehash'], + 'filemodtime': row['filemodtime'], + 'partrunid': row['partrunid'], + 'recordcount': row['recordcount'] + } + return known_files + + def get_import_status(self, filename: str, filehash: str = None) -> dict: + """Check import status of a CLM file + Returns: {'imported': bool, 'hash_changed': bool, 'partrunid': int or None, 'stored_hash': str or None, 'filemodtime': datetime or None} + """ + basename = os.path.basename(filename) + result = {'imported': False, 'hash_changed': False, 'partrunid': None, 'stored_hash': None, 'filemodtime': None} + + # Check by filename + self.cursor.execute( + "SELECT fileid, filehash, partrunid, filemodtime FROM udcclmfiles WHERE filename = %s", + (basename,) + ) + row = self.cursor.fetchone() + if row: + result['imported'] = True + result['stored_hash'] = row['filehash'] + result['partrunid'] = row['partrunid'] + result['filemodtime'] = row['filemodtime'] + if filehash and row['filehash'] != filehash: + result['hash_changed'] = True + return result + + # Check by hash if provided (different filename, same content) + if filehash: + self.cursor.execute( + "SELECT fileid, partrunid FROM udcclmfiles WHERE filehash = %s", + (filehash,) + ) + row = self.cursor.fetchone() + if row: + result['imported'] = True + result['partrunid'] = row['partrunid'] + return result + + return result + + def quick_check_imported(self, filepath: str, known_files: dict = None) -> dict: + """Fast check if file needs processing (no hash computation). + Uses file modification time to skip unchanged files. + If known_files dict is provided, uses in-memory lookup (much faster). 
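+        When the cache already holds a stored modification time for a file, the file is skipped without a filesystem stat call.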
+ Returns: {'skip': bool, 'reason': str, 'needs_hash_check': bool, 'partrunid': int or None} + """ + basename = os.path.basename(filepath) + + # Use in-memory cache if provided (much faster than DB query per file) + if known_files is not None: + if basename not in known_files: + return {'skip': False, 'reason': 'new', 'needs_hash_check': False, 'partrunid': None} + + file_info = known_files[basename] + stored_modtime = file_info['filemodtime'] + + # If we have a stored modtime, skip without checking current file modtime + # (avoids slow network stat calls - files rarely change after creation) + if stored_modtime: + return {'skip': True, 'reason': 'unchanged', 'needs_hash_check': False, 'partrunid': file_info['partrunid']} + + # No stored modtime - need to check current modtime and maybe update + try: + file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath)) + except OSError: + return {'skip': False, 'reason': 'cannot_read', 'needs_hash_check': False, 'partrunid': None} + + # File exists in DB but no modtime stored - check hash to be safe + return {'skip': False, 'reason': 'no_modtime', 'needs_hash_check': True, 'partrunid': file_info['partrunid']} + + # No cache - get file modification time + try: + file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath)) + except OSError: + return {'skip': False, 'reason': 'cannot_read', 'needs_hash_check': False, 'partrunid': None} + + # Fallback to DB query if no cache provided + self.cursor.execute( + "SELECT fileid, filehash, partrunid, filemodtime FROM udcclmfiles WHERE filename = %s", + (basename,) + ) + row = self.cursor.fetchone() + + if not row: + return {'skip': False, 'reason': 'new', 'needs_hash_check': False, 'partrunid': None} + + stored_modtime = row['filemodtime'] + + if stored_modtime and file_mtime <= stored_modtime: + return {'skip': True, 'reason': 'unchanged', 'needs_hash_check': False, 'partrunid': row['partrunid']} + + return {'skip': False, 'reason': 'modified', 'needs_hash_check': True, 'partrunid': row['partrunid']} + + def is_file_imported(self, filename: str, filehash: str = None) -> bool: + """Check if a CLM file has already been imported (simple boolean check)""" + status = self.get_import_status(filename, filehash) + return status['imported'] and not status['hash_changed'] + + def delete_partrun(self, partrun_id: int): + """Delete a part run and all associated records for re-import""" + # Delete in order due to foreign key constraints + self.cursor.execute("DELETE FROM udcmanualrequests WHERE partrunid = %s", (partrun_id,)) + self.cursor.execute("DELETE FROM udcmeasurements WHERE partrunid = %s", (partrun_id,)) + self.cursor.execute("DELETE FROM udcevents WHERE partrunid = %s", (partrun_id,)) + self.cursor.execute("DELETE FROM udcparts WHERE partrunid = %s", (partrun_id,)) + self.cursor.execute("DELETE FROM udcclmfiles WHERE partrunid = %s", (partrun_id,)) + self.conn.commit() + + def record_import(self, filepath: str, machine_number: str, filehash: str, + partrun_id: int, session_id: int, record_count: int): + """Record successful import of a CLM file""" + basename = os.path.basename(filepath) + # Get file modification time for fast skip checks on future runs + try: + file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath)) + except OSError: + file_mtime = None + + self.cursor.execute(""" + INSERT INTO udcclmfiles (filename, machinenumber, filehash, filemodtime, partrunid, sessionid, recordcount) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + partrunid = VALUES(partrunid), + 
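+                filehash = VALUES(filehash),  -- refresh the stored hash when an existing row (e.g. one created by record_skipped_file) is updated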
sessionid = VALUES(sessionid), + recordcount = VALUES(recordcount), + filemodtime = VALUES(filemodtime), + importdate = CURRENT_TIMESTAMP + """, (basename, machine_number, filehash, file_mtime, partrun_id, session_id, record_count)) + self.conn.commit() + + def record_skipped_file(self, filepath: str, machine_number: str, filehash: str, skip_reason: str): + """Record a skipped file (tool setup, incomplete) so it's not re-checked every run""" + basename = os.path.basename(filepath) + try: + file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath)) + except OSError: + file_mtime = None + + # Use recordcount = -1 to indicate skipped file, store reason in a comment-like way + self.cursor.execute(""" + INSERT INTO udcclmfiles (filename, machinenumber, filehash, filemodtime, partrunid, sessionid, recordcount) + VALUES (%s, %s, %s, %s, NULL, NULL, -1) + ON DUPLICATE KEY UPDATE + filemodtime = VALUES(filemodtime), + importdate = CURRENT_TIMESTAMP + """, (basename, machine_number, filehash, file_mtime)) + self.conn.commit() + + def get_or_create_session(self, machine_number: str, start_time: datetime) -> int: + """Get or create a session for the machine and date""" + # For CLM data, we create one session per machine per day + session_date = start_time.date() + session_name = f"CLM_{machine_number}_{session_date.isoformat()}" + + # Check if session exists + self.cursor.execute( + """SELECT sessionid FROM udcsessions + WHERE machinenumber = %s AND logfilename = %s""", + (machine_number, session_name) + ) + row = self.cursor.fetchone() + if row: + return row['sessionid'] + + # Create new session + self.cursor.execute( + """INSERT INTO udcsessions (machinenumber, logfilename, sessionstart, dateadded) + VALUES (%s, %s, %s, NOW())""", + (machine_number, session_name, start_time) + ) + self.conn.commit() + return self.cursor.lastrowid + + def create_part_run(self, session_id: int, header: dict, start_time: datetime, + end_time: datetime = None, machine_number: str = None) -> int: + """Create a part run record from CLM header data""" + cycletime = None + if start_time and end_time: + cycletime = int((end_time - start_time).total_seconds()) + + # Calculate changeover time (time since previous part ended on this machine) + changeover = None + if start_time and machine_number: + self.cursor.execute( + """SELECT MAX(p.programend) as prev_end + FROM udcparts p + JOIN udcsessions s ON p.sessionid = s.sessionid + WHERE s.machinenumber = %s AND p.programend < %s AND p.programend IS NOT NULL""", + (machine_number, start_time) + ) + row = self.cursor.fetchone() + if row and row['prev_end']: + changeover_secs = int((start_time - row['prev_end']).total_seconds()) + # Only set if reasonable (> 0 and < 24 hours) + if 0 < changeover_secs < 86400: + changeover = changeover_secs + + # Strip whitespace and truncate strings to fit column sizes + partnumber = (header.get('PartNum') or '').strip()[:50] + opernumber = (header.get('PartOper') or '').strip()[:20] + serialnumber = (header.get('SerialNum') or '').strip()[:50] + programname = (header.get('ProgName') or '').strip()[:100] + jobnumber = (header.get('ProgNum') or '').strip()[:50] + badgenumber = (header.get('BadgeNumber') or '').strip()[:50] + + self.cursor.execute( + """INSERT INTO udcparts + (sessionid, partnumber, opernumber, serialnumber, programname, jobnumber, + badgenumber, programstart, programend, cycletime, changeover, dateadded) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())""", + ( + session_id, + partnumber or None, + opernumber or None, 
+ serialnumber or None, + programname or None, + jobnumber or None, + badgenumber or None, + start_time, + end_time, + cycletime, + changeover + ) + ) + self.conn.commit() + return self.cursor.lastrowid + + def update_part_run_counts(self, partrun_id: int, measurement_count: int, + manual_count: int, probe_count: int, oot_count: int): + """Update part run with measurement counts""" + self.cursor.execute( + """UPDATE udcparts SET + measurementcount = %s, manualcount = %s, probecount = %s, ootcount = %s + WHERE partrunid = %s""", + (measurement_count, manual_count, probe_count, oot_count, partrun_id) + ) + self.conn.commit() + + def add_measurement(self, partrun_id: int, session_id: int, event_time: datetime, + event_type: str, method: str, dimid: str, description: str, + seq: int, minval: float, maxval: float, actualval: float, + deviation: float, oot: int): + """Add a measurement to the batch""" + self.measurements_batch.append(( + partrun_id, + session_id, + event_time, + event_type, + method, + dimid, + description[:255] if description else None, + seq, + minval, + maxval, + actualval, + deviation, + oot + )) + + if len(self.measurements_batch) >= BATCH_SIZE: + self.flush_measurements() + + def add_event(self, partrun_id: int, session_id: int, event_time: datetime, + event_type: str, item_number: str, description: str): + """Add an event to the batch""" + self.events_batch.append(( + partrun_id, + session_id, + event_time, + event_type, + item_number, + description[:500] if description else None + )) + + if len(self.events_batch) >= BATCH_SIZE: + self.flush_events() + + def add_manual_request(self, partrun_id: int, request_time: datetime, + response_time: datetime, description: str): + """Add a manual request timing record""" + response_seconds = None + if request_time and response_time: + response_seconds = int((response_time - request_time).total_seconds()) + + self.cursor.execute( + """INSERT INTO udcmanualrequests + (partrunid, requesttime, responsetime, responseseconds, description) + VALUES (%s, %s, %s, %s, %s)""", + ( + partrun_id, + request_time, + response_time, + response_seconds, + description[:255] if description else None + ) + ) + self.conn.commit() + return response_seconds + + def flush_measurements(self): + """Flush measurement batch to database""" + if not self.measurements_batch: + return + + self.cursor.executemany( + """INSERT INTO udcmeasurements + (partrunid, sessionid, eventtime, eventtype, method, dimid, description, + seqnumber, minval, maxval, actualval, deviation, oot) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", + self.measurements_batch + ) + self.conn.commit() + self.measurements_batch = [] + + def flush_events(self): + """Flush event batch to database""" + if not self.events_batch: + return + + self.cursor.executemany( + """INSERT INTO udcevents + (partrunid, sessionid, eventtime, eventtype, itemnumber, description) + VALUES (%s, %s, %s, %s, %s, %s)""", + self.events_batch + ) + self.conn.commit() + self.events_batch = [] + + def add_header_update(self, partrun_id: int, session_id: int, machine_number: str, + event_time: datetime, details: str, description: str, badge: str): + """Add a header update (badge change) to the batch""" + # Strip whitespace and truncate badge + clean_badge = badge.strip()[:20] if badge else None + self.header_updates_batch.append(( + partrun_id, + session_id, + machine_number, + event_time, + details[:255] if details else None, + description[:255] if description else None, + clean_badge + )) + + if 
len(self.header_updates_batch) >= BATCH_SIZE: + self.flush_header_updates() + + def flush_header_updates(self): + """Flush header updates batch to database""" + if not self.header_updates_batch: + return + + self.cursor.executemany( + """INSERT INTO udcheaderupdates + (partrunid, sessionid, machinenumber, eventtime, details, description, badgenumber) + VALUES (%s, %s, %s, %s, %s, %s, %s)""", + self.header_updates_batch + ) + self.conn.commit() + self.header_updates_batch = [] + + def add_violation(self, partrun_id: int, session_id: int, machine_number: str, + event_time: datetime, previous_val: float, current_val: float, + badge: str, item_no: str, crossing_desc: str): + """Add a parameter violation to the batch""" + # Strip whitespace and truncate badge + clean_badge = badge.strip()[:20] if badge else None + self.violations_batch.append(( + partrun_id, + session_id, + machine_number, + event_time, + previous_val, + current_val, + clean_badge, + item_no[:20] if item_no else None, + crossing_desc[:255] if crossing_desc else None + )) + + if len(self.violations_batch) >= BATCH_SIZE: + self.flush_violations() + + def flush_violations(self): + """Flush violations batch to database""" + if not self.violations_batch: + return + + self.cursor.executemany( + """INSERT INTO udcviolations + (partrunid, sessionid, machinenumber, eventtime, previousval, currentval, + badgenumber, itemno, crossingdesc) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", + self.violations_batch + ) + self.conn.commit() + self.violations_batch = [] + + def parse_file(self, filepath: str, known_files: dict = None) -> Dict[str, Any]: + """Parse a single CLM JSON file""" + filename = os.path.basename(filepath) + + # Extract machine number from parent folder + parent_dir = os.path.basename(os.path.dirname(os.path.dirname(filepath))) + machine_number = parent_dir if parent_dir.isdigit() else None + + if not machine_number: + # Try to extract from file path + parts = filepath.split(os.sep) + for part in parts: + if part.isdigit() and len(part) == 4: + machine_number = part + break + + if not machine_number: + return {'success': False, 'error': f'Could not determine machine number for {filename}'} + + # Quick check by filename + modification time (avoids reading file for hash) + quick_status = self.quick_check_imported(filepath, known_files) + if quick_status['skip']: + return {'success': False, 'error': f'File unchanged: {filename}', 'skip_silent': True} + + # Only compute hash if file is new or modification time changed + fhash = file_hash(filepath) + if fhash is None: + return {'success': False, 'error': f'Could not read file: {filename}'} + + # Full import status check with hash + import_status = self.get_import_status(filepath, fhash) + file_complete = is_file_complete(filepath) + + if import_status['imported']: + if import_status['hash_changed']: + # File was updated since last import + if file_complete: + # Job is now complete - delete old records and re-import + print(f"Re-importing completed file: {filename}") + self.delete_partrun(import_status['partrunid']) + else: + # Still incomplete, skip for now + return {'success': False, 'error': f'File updated but still incomplete: {filename}', 'in_progress': True} + else: + # Hash unchanged - already fully imported (update modtime for future fast skips) + return {'success': False, 'error': f'File already imported: {filename}'} + + print(f"Parsing {filename} (machine {machine_number})...") + + try: + with open(filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + except 
json.JSONDecodeError as e: + self.record_skipped_file(filepath, machine_number, fhash, 'json_error') + return {'success': False, 'error': f'JSON parse error: {e}', 'skip_silent': True} + except Exception as e: + self.record_skipped_file(filepath, machine_number, fhash, 'read_error') + return {'success': False, 'error': f'File read error: {e}', 'skip_silent': True} + + if not isinstance(data, list) or len(data) == 0: + self.record_skipped_file(filepath, machine_number, fhash, 'invalid_json') + return {'success': False, 'error': 'Invalid JSON structure (expected non-empty array)', 'skip_silent': True} + + # Extract header + header = None + end_data = None + item_crossings = [] # Store full crossing data with measurements + + for item in data: + if 'HeaderValues' in item: + header = item['HeaderValues'] + elif 'End' in item: + end_data = item['End'] + elif 'ItemCrossing' in item: + crossing = item['ItemCrossing'] + crossing_time = crossing.get('TimeDate') + crossing_desc = crossing.get('Description', '') + item_no = crossing.get('ItemNo') + + # Extract measurements, header updates, and violations from Items + measurements_in_crossing = [] + header_updates_in_crossing = [] + violations_in_crossing = [] + if 'Items' in crossing and crossing['Items']: + for measure_item in crossing['Items']: + if 'DataInput' in measure_item: + measurements_in_crossing.append(measure_item['DataInput']) + elif 'HeaderUpdate' in measure_item: + header_updates_in_crossing.append(measure_item['HeaderUpdate']) + elif 'ItemViolation' in measure_item: + violations_in_crossing.append(measure_item['ItemViolation']) + + item_crossings.append({ + 'time': crossing_time, + 'description': crossing_desc, + 'item_no': item_no, + 'measurements': measurements_in_crossing, + 'header_updates': header_updates_in_crossing, + 'violations': violations_in_crossing + }) + + if not header: + # Files without HeaderValues are tool setup/calibration runs, not part runs + # Record in udcclmfiles so we don't re-check every run + self.record_skipped_file(filepath, machine_number, fhash, 'no_header') + return {'success': False, 'error': 'No HeaderValues (tool setup file)', 'skip_silent': True} + + # Parse timestamps + start_time = parse_clm_timestamp(header.get('TimeDate')) + end_time = parse_clm_timestamp(end_data.get('TimeDate')) if end_data else None + + if not start_time: + self.record_skipped_file(filepath, machine_number, fhash, 'no_timestamp') + return {'success': False, 'error': 'Could not parse start timestamp', 'skip_silent': True} + + # Get or create session + session_id = self.get_or_create_session(machine_number, start_time) + + # Create part run (pass machine_number for changeover calculation) + partrun_id = self.create_part_run(session_id, header, start_time, end_time, machine_number) + + # Counters + measurement_count = 0 + manual_count = 0 + probe_count = 0 + oot_count = 0 + record_count = 0 + manual_request_count = 0 + header_update_count = 0 + violation_count = 0 + + # Process item crossings with their measurements + for crossing in item_crossings: + crossing_time = parse_clm_timestamp(crossing['time']) + item_no = crossing['item_no'] + crossing_desc = crossing['description'] + + # Add the crossing event + self.add_event(partrun_id, session_id, crossing_time, 'ITEM-CROSSING', + item_no, crossing_desc) + record_count += 1 + + # Process measurements in this crossing + for m in crossing['measurements']: + event_time = parse_clm_timestamp(m.get('TimeDate')) + method = m.get('Method', '').upper() + dimid = m.get('DimID', '') + 
description = m.get('Description', '') + seq = int(m.get('Sequence', 0)) if m.get('Sequence') else 0 + minval = parse_numeric(m.get('Min')) + maxval = parse_numeric(m.get('Max')) + actualval = parse_numeric(m.get('Actual')) + deviation = parse_numeric(m.get('Deviation')) + oot = 1 if m.get('OutOfTolerance', 'N') == 'Y' else 0 + + self.add_measurement(partrun_id, session_id, event_time, 'PROCESSDATA', + method, dimid, description, seq, minval, maxval, + actualval, deviation, oot) + + measurement_count += 1 + + # For MANUAL measurements, record timing (crossing_time = prompt, event_time = response) + if method == 'MANUAL' and crossing_time and event_time: + self.add_manual_request(partrun_id, crossing_time, event_time, description) + manual_request_count += 1 + + if method == 'MANUAL': + manual_count += 1 + elif method == 'PROBE': + probe_count += 1 + if oot: + oot_count += 1 + record_count += 1 + + # Process header updates (badge changes) + for hu in crossing.get('header_updates', []): + hu_time = parse_clm_timestamp(hu.get('TimeDate')) + self.add_header_update( + partrun_id, session_id, machine_number, hu_time, + hu.get('Details', ''), hu.get('Description', ''), + hu.get('BadgeNumber', '') + ) + header_update_count += 1 + + # Process violations (parameter changes outside limits) + for v in crossing.get('violations', []): + v_time = parse_clm_timestamp(v.get('TimeDate')) + self.add_violation( + partrun_id, session_id, machine_number, v_time, + parse_numeric(v.get('Previous')), + parse_numeric(v.get('Current')), + v.get('BadgeNumber', ''), + item_no, + crossing_desc + ) + violation_count += 1 + + # Flush remaining batches + self.flush_measurements() + self.flush_events() + self.flush_header_updates() + self.flush_violations() + + # Update part run counts + self.update_part_run_counts(partrun_id, measurement_count, manual_count, + probe_count, oot_count) + + # Record successful import + self.record_import(filepath, machine_number, fhash, partrun_id, session_id, record_count) + + print(f" Imported: {measurement_count} measurements, {len(item_crossings)} events, " + f"{oot_count} OOT, {manual_request_count} manual timings, " + f"{header_update_count} badge changes, {violation_count} violations") + + return { + 'success': True, + 'records': record_count, + 'measurements': measurement_count, + 'oot_count': oot_count, + 'header_updates': header_update_count, + 'violations': violation_count, + 'partrun_id': partrun_id, + 'session_id': session_id + } + + def parse_machine_folder(self, machine_path: str, known_files: dict = None) -> Dict[str, Any]: + """Parse all JSON files for a single machine""" + data_path = os.path.join(machine_path, 'Data') + if not os.path.isdir(data_path): + return {'success': False, 'error': f'No Data folder in {machine_path}'} + + files = glob.glob(os.path.join(data_path, '*.json')) + results = { + 'total_files': len(files), + 'imported': 0, + 'updated': 0, + 'skipped': 0, + 'in_progress': 0, + 'tool_setup': 0, + 'errors': 0, + 'total_records': 0, + 'total_measurements': 0, + 'total_oot': 0 + } + + machine_num = os.path.basename(machine_path) + print(f"\nProcessing machine {machine_num}: {len(files)} files") + + for filepath in sorted(files): + result = self.parse_file(filepath, known_files) + + if result['success']: + results['imported'] += 1 + results['total_records'] += result.get('records', 0) + results['total_measurements'] += result.get('measurements', 0) + results['total_oot'] += result.get('oot_count', 0) + elif result.get('in_progress'): + results['in_progress'] += 
1 + elif result.get('skip_silent'): + # Silent skips: unchanged files or tool setup files + if 'tool setup' in result.get('error', '').lower(): + results['tool_setup'] += 1 + else: + results['skipped'] += 1 # Unchanged files + elif 'already imported' in result.get('error', '').lower(): + results['skipped'] += 1 + else: + results['errors'] += 1 + print(f" Error: {result.get('error')}") + + return results + + def ensure_active_sessions_table(self): + """Create active sessions tracking table if not exists""" + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS udcactivesessions ( + machinenumber VARCHAR(20) PRIMARY KEY, + filepath VARCHAR(255), + partnumber VARCHAR(50), + badgenumber VARCHAR(20), + partsrun INT DEFAULT 0, + lastupdate DATETIME, + sessionstart DATETIME, + lastseen DATETIME DEFAULT CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + self.conn.commit() + + def parse_active_file(self, filepath: str) -> Optional[Dict[str, Any]]: + """Parse an incomplete file to get current state (without importing)""" + try: + with open(filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + except (json.JSONDecodeError, OSError): + return None + + if not isinstance(data, list) or len(data) == 0: + return None + + # Extract machine number from path + parent_dir = os.path.basename(os.path.dirname(os.path.dirname(filepath))) + machine_number = parent_dir if parent_dir.isdigit() else None + + if not machine_number: + parts = filepath.split(os.sep) + for part in parts: + if part.isdigit() and len(part) == 4: + machine_number = part + break + + if not machine_number: + return None + + # Extract header and count parts + header = None + parts_run = 0 + last_badge = None + last_part = None + first_time = None + last_time = None + + for item in data: + if 'HeaderValues' in item: + header = item['HeaderValues'] + hdr_time = parse_clm_timestamp(header.get('TimeDate')) + if hdr_time: + if not first_time or hdr_time < first_time: + first_time = hdr_time + if not last_time or hdr_time > last_time: + last_time = hdr_time + # Initial badge/part from header + if header.get('BadgeNumber'): + last_badge = header.get('BadgeNumber', '').strip() + if header.get('PartNum'): + last_part = header.get('PartNum', '').strip() + + elif 'Part' in item: + # New part started + parts_run += 1 + part_data = item['Part'] + if part_data.get('BadgeNumber'): + last_badge = part_data.get('BadgeNumber', '').strip() + if part_data.get('PartNum'): + last_part = part_data.get('PartNum', '').strip() + part_time = parse_clm_timestamp(part_data.get('TimeDate')) + if part_time: + if not first_time or part_time < first_time: + first_time = part_time + if not last_time or part_time > last_time: + last_time = part_time + + elif 'ItemCrossing' in item: + crossing = item['ItemCrossing'] + crossing_time = parse_clm_timestamp(crossing.get('TimeDate')) + if crossing_time: + if not last_time or crossing_time > last_time: + last_time = crossing_time + + if not header: + return None + + # If no Part blocks yet, count as 1 (the header starts the first part) + if parts_run == 0: + parts_run = 1 + + return { + 'machinenumber': machine_number, + 'filepath': filepath, + 'partnumber': last_part, + 'badgenumber': last_badge, + 'partsrun': parts_run, + 'sessionstart': first_time, + 'lastupdate': last_time + } + + def _scan_machine_folder(self, machine_path: str, cutoff_date: datetime) -> Dict[str, Any]: + """Scan a single machine folder for incomplete files. 
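+        Returns per-folder scan counts plus a list of (filepath, state) tuples for incomplete files.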
Used for parallel scanning.""" + result = { + 'files_scanned': 0, + 'files_skipped_old': 0, + 'incomplete_files': [] # list of (filepath, state) + } + + data_path = os.path.join(machine_path, 'Data') + if not os.path.isdir(data_path): + return result + + files = glob.glob(os.path.join(data_path, '*.json')) + for filepath in files: + # Quick filter by filename date (format: *_YYYY-MM-DD_*.json) + basename = os.path.basename(filepath) + parts = basename.replace('.json', '').split('_') + if len(parts) >= 4: + try: + file_date = datetime.strptime(parts[-2], '%Y-%m-%d') + if file_date < cutoff_date: + result['files_skipped_old'] += 1 + continue + except ValueError: + pass # Can't parse date, scan anyway + + result['files_scanned'] += 1 + if not is_file_complete(filepath): + state = self.parse_active_file(filepath) + if state and state['lastupdate']: + result['incomplete_files'].append((filepath, state)) + + return result + + def update_active_sessions(self, clm_directory: str, verbose: bool = True, max_age_days: int = 7, max_workers: int = 8) -> Dict[str, Any]: + """Scan for incomplete files and update active sessions table. + + Only tracks the MOST RECENT incomplete file per machine. + Older incomplete files are considered stale/abandoned. + Uses filename date to skip files older than max_age_days without reading them. + Uses parallel scanning with max_workers threads. + """ + self.ensure_active_sessions_table() + + results = { + 'active_machines': 0, + 'updated': 0, + 'cleared': 0, + 'stale_skipped': 0, + 'files_scanned': 0, + 'files_skipped_old': 0 + } + + # Calculate cutoff date + cutoff_date = datetime.now() - timedelta(days=max_age_days) + + # Collect all incomplete files per machine, then pick the newest + machine_incomplete_files = {} # machine -> list of (filepath, state) + + # Find all machine folders + machine_folders = [os.path.join(clm_directory, item) for item in os.listdir(clm_directory) + if os.path.isdir(os.path.join(clm_directory, item)) and item.isdigit()] + + if verbose: + print(f" Scanning {len(machine_folders)} machines with {max_workers} threads...") + + # Parallel scan + completed = 0 + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(self._scan_machine_folder, path, cutoff_date): path for path in machine_folders} + + for future in as_completed(futures): + completed += 1 + if verbose and completed % 10 == 0: + print(f" Completed {completed}/{len(machine_folders)} machines...", end='\r') + + scan_result = future.result() + results['files_scanned'] += scan_result['files_scanned'] + results['files_skipped_old'] += scan_result['files_skipped_old'] + + for filepath, state in scan_result['incomplete_files']: + machine = state['machinenumber'] + if machine not in machine_incomplete_files: + machine_incomplete_files[machine] = [] + machine_incomplete_files[machine].append((filepath, state)) + + # For each machine, only use the most recent incomplete file + active_machines = set() + for machine, file_list in machine_incomplete_files.items(): + # Sort by lastupdate descending, pick the newest + file_list.sort(key=lambda x: x[1]['lastupdate'], reverse=True) + newest_filepath, newest_state = file_list[0] + + # Count stale files (all but the newest) + results['stale_skipped'] += len(file_list) - 1 + + active_machines.add(machine) + + # Update or insert active session with the newest file + self.cursor.execute(""" + INSERT INTO udcactivesessions + (machinenumber, filepath, partnumber, badgenumber, partsrun, + lastupdate, sessionstart, 
lastseen) + VALUES (%s, %s, %s, %s, %s, %s, %s, NOW()) + ON DUPLICATE KEY UPDATE + filepath = VALUES(filepath), + partnumber = VALUES(partnumber), + badgenumber = VALUES(badgenumber), + partsrun = VALUES(partsrun), + lastupdate = VALUES(lastupdate), + sessionstart = VALUES(sessionstart), + lastseen = NOW() + """, ( + newest_state['machinenumber'], + os.path.basename(newest_state['filepath']), + newest_state['partnumber'], + newest_state['badgenumber'], + newest_state['partsrun'], + newest_state['lastupdate'], + newest_state['sessionstart'] + )) + results['updated'] += 1 + + self.conn.commit() + results['active_machines'] = len(active_machines) + + if verbose: + print(f" Scanned {results['files_scanned']} files across {len(machine_folders)} machines" + " " * 20) + + # Clear machines that are no longer active (not seen in this scan) + if active_machines: + placeholders = ','.join(['%s'] * len(active_machines)) + self.cursor.execute(f""" + DELETE FROM udcactivesessions + WHERE machinenumber NOT IN ({placeholders}) + """, tuple(active_machines)) + else: + # No active machines, clear entire table + self.cursor.execute("DELETE FROM udcactivesessions") + + results['cleared'] = self.cursor.rowcount + self.conn.commit() + + return results + + def parse_all_machines(self, clm_directory: str) -> Dict[str, Any]: + """Parse all machine folders in CLM_Data directory""" + results = { + 'machines_processed': 0, + 'total_files': 0, + 'imported': 0, + 'skipped': 0, + 'in_progress': 0, + 'tool_setup': 0, + 'errors': 0, + 'total_records': 0, + 'total_measurements': 0, + 'total_oot': 0 + } + + # Load all known files into memory for fast lookup (one query instead of per-file) + print("Loading known files from database...") + known_files = self.load_known_files() + print(f" {len(known_files)} files already tracked") + + # Find all machine folders (directories with numeric names) + machine_folders = [] + for item in os.listdir(clm_directory): + item_path = os.path.join(clm_directory, item) + if os.path.isdir(item_path) and item.isdigit(): + machine_folders.append(item_path) + + print(f"Found {len(machine_folders)} machine folders") + + for machine_path in sorted(machine_folders): + machine_results = self.parse_machine_folder(machine_path, known_files) + + if machine_results.get('success', True): # Allow partial success + results['machines_processed'] += 1 + results['total_files'] += machine_results.get('total_files', 0) + results['imported'] += machine_results.get('imported', 0) + results['skipped'] += machine_results.get('skipped', 0) + results['in_progress'] += machine_results.get('in_progress', 0) + results['tool_setup'] += machine_results.get('tool_setup', 0) + results['errors'] += machine_results.get('errors', 0) + results['total_records'] += machine_results.get('total_records', 0) + results['total_measurements'] += machine_results.get('total_measurements', 0) + results['total_oot'] += machine_results.get('total_oot', 0) + + return results + + +def main(): + parser = argparse.ArgumentParser(description='Parse CLM_Data JSON files and import to database') + parser.add_argument('--dir', default=CLM_DATA_PATH, help='CLM_Data directory') + parser.add_argument('--file', help='Parse a specific JSON file') + parser.add_argument('--machine', help='Parse only a specific machine folder') + parser.add_argument('--active-only', action='store_true', help='Only update active sessions (skip imports)') + parser.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host') + parser.add_argument('--port', type=int, 
default=DB_CONFIG['port'], help='MySQL port') + parser.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user') + parser.add_argument('--password', default=DB_CONFIG['password'], help='MySQL password') + parser.add_argument('--database', default=DB_CONFIG['database'], help='MySQL database') + + args = parser.parse_args() + + # Build config from args + db_config = { + 'host': args.host, + 'port': args.port, + 'user': args.user, + 'password': args.password, + 'database': args.database + } + + # Create parser and connect + clm_parser = CLMParser(db_config) + if not clm_parser.connect(): + sys.exit(1) + + try: + # Ensure tracking table exists + clm_parser.ensure_clm_tracking_table() + + if args.active_only: + # Only update active sessions, skip imports + print("Updating active sessions...") + active_results = clm_parser.update_active_sessions(args.dir) + print(f"\n{'='*50}") + print(f"Active Sessions Summary:") + print(f" Active machines: {active_results['active_machines']}") + print(f" Sessions updated: {active_results['updated']}") + print(f" Stale files skipped: {active_results['stale_skipped']}") + print(f" Old sessions cleared: {active_results['cleared']}") + + elif args.file: + # Parse single file + result = clm_parser.parse_file(args.file) + if result['success']: + print(f"\nSuccessfully imported {result.get('measurements', 0)} measurements") + else: + print(f"\nFailed: {result.get('error')}") + sys.exit(1) + + elif args.machine: + # Parse specific machine + machine_path = os.path.join(args.dir, args.machine) + if not os.path.isdir(machine_path): + print(f"Machine folder not found: {machine_path}") + sys.exit(1) + # Load known files for fast lookup + print("Loading known files from database...") + known_files = clm_parser.load_known_files() + print(f" {len(known_files)} files already tracked") + results = clm_parser.parse_machine_folder(machine_path, known_files) + print(f"\n{'='*50}") + print(f"Machine {args.machine} Import Summary:") + print(f" Total files found: {results.get('total_files', 0)}") + print(f" Files imported: {results.get('imported', 0)}") + print(f" Files skipped: {results.get('skipped', 0)}") + print(f" Jobs in progress: {results.get('in_progress', 0)}") + print(f" Files with errors: {results.get('errors', 0)}") + print(f" Total measurements: {results.get('total_measurements', 0)}") + print(f" Total OOT: {results.get('total_oot', 0)}") + + else: + # Parse all machines + results = clm_parser.parse_all_machines(args.dir) + print(f"\n{'='*50}") + print(f"CLM Data Import Summary:") + print(f" Machines processed: {results['machines_processed']}") + print(f" Total files found: {results['total_files']}") + print(f" Files imported: {results['imported']}") + print(f" Files skipped: {results['skipped']}") + print(f" Jobs in progress: {results['in_progress']}") + print(f" Files with errors: {results['errors']}") + print(f" Total measurements: {results['total_measurements']}") + print(f" Total OOT: {results['total_oot']}") + + # Also update active sessions + print(f"\nUpdating active sessions...") + active_results = clm_parser.update_active_sessions(args.dir) + print(f" Active machines: {active_results['active_machines']}") + + finally: + clm_parser.disconnect() + + +if __name__ == '__main__': + main() diff --git a/parser/config.py b/parser/config.py new file mode 100644 index 0000000..f92c837 --- /dev/null +++ b/parser/config.py @@ -0,0 +1,36 @@ +""" +UDC Parser Configuration +""" +import platform + +# Detect OS +IS_WINDOWS = platform.system() == 'Windows' + +# CLM 
Data paths +CLM_DATA_PATH_WINDOWS = r'S:\SPC\UDC\CLM_Data' +CLM_DATA_PATH_LINUX = '/home/camp/projects/UDC/CLM_Data' +CLM_DATA_PATH = CLM_DATA_PATH_WINDOWS if IS_WINDOWS else CLM_DATA_PATH_LINUX + +# Database - Development (Docker on Linux) +DB_CONFIG_DEV = { + 'host': '127.0.0.1', + 'port': 3306, + 'user': 'root', + 'password': 'rootpassword', + 'database': 'shopdb' +} + +# Database - Production (update these values) +DB_CONFIG_PROD = { + 'host': 'PROD_MYSQL_HOST', # TODO: Update with production host + 'port': 3306, + 'user': 'PROD_USER', # TODO: Update with production user + 'password': 'PROD_PASSWORD', # TODO: Update with production password + 'database': 'shopdb' +} + +# Default config based on OS (Windows = prod, Linux = dev) +DB_CONFIG = DB_CONFIG_PROD if IS_WINDOWS else DB_CONFIG_DEV + +# Batch insert size +BATCH_SIZE = 1000 diff --git a/parser/requirements.txt b/parser/requirements.txt new file mode 100644 index 0000000..e67ba94 --- /dev/null +++ b/parser/requirements.txt @@ -0,0 +1 @@ +mysql-connector-python>=8.0.0 diff --git a/parser/udcparser.py b/parser/udcparser.py new file mode 100644 index 0000000..c6aec51 --- /dev/null +++ b/parser/udcparser.py @@ -0,0 +1,857 @@ +#!/usr/bin/env python3 +""" +UDC Log Parser +Parses UDC (Universal Data Collector) log files and imports data into ShopDB. + +Usage: + python udcparser.py [--dir /path/to/logs] [--file specific_file.log] +""" + +import re +import os +import sys +import glob +import argparse +from datetime import datetime +from typing import Optional, Dict, List, Any +from concurrent.futures import ThreadPoolExecutor, as_completed +import mysql.connector +from mysql.connector import Error + +# Import config +try: + from config import DB_CONFIG_DEV as DB_CONFIG, LOG_DIRECTORY, BATCH_SIZE +except ImportError: + DB_CONFIG = { + 'host': '127.0.0.1', + 'port': 3306, + 'user': 'root', + 'password': 'rootpassword', + 'database': 'shopdb' + } + LOG_DIRECTORY = '/home/camp/projects/UDC/LogFiles' + BATCH_SIZE = 1000 + +# ============================================================================= +# Regex Patterns +# ============================================================================= + +# Timestamp pattern: MM/DD/YYYY HH:MM:SS +RE_TIMESTAMP = re.compile(r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})') + +# Log start/end +RE_START_LOG = re.compile(r'^Start Log : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})') +RE_END_LOG = re.compile(r'^End Log : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})') + +# Headers loaded +RE_HEADERS = re.compile(r'Headers loaded \(Path 1\): Part=(\S*) Oper=(\S*) Serial=(\S*) ProgName=(\S*) Job=(\S*)') + +# Badge number +RE_BADGE = re.compile(r'BadgeNumber = (\S+)') + +# Program start/end +RE_START_PROGRAM = re.compile(r': Start of Program') +RE_END_PROGRAM = re.compile(r': End of Program') + +# Data entry with timestamp +RE_DATA_ENTRY = re.compile(r'^DATA ENTRY : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})') + +# Process/Tool/Machine data +RE_PROCESSDATA = re.compile( + r'\+\+\.PROCESSDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--' +) +RE_TOOLDATA = re.compile( + r'\+\+\.TOOLDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--' +) +RE_MACHINEDATA = re.compile( + 
r'\+\+\.MACHINEDATA//METHOD//(\w+)//\s*DIMID//(\w+)//.*?MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--' +) + +# Item crossing +RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//ITEMNO//(.+?)//DESCRIPTION//(.+?)//--') + +# Manual request +RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//(.+?)//\s*--') + +# Message +RE_MESSAGE = re.compile(r'\+\+\.MESSAGE//') + +# Start/End markers in data +RE_START = re.compile(r'\+\+\.START//PROGNAME//(.+?)//SNUMBNO//(.+?)//--') +RE_END = re.compile(r'\+\+\.END//--') + +# Error patterns +RE_ERROR_MSG = re.compile(r'^ERROR\s*:\s*(?:Message:\s*)?(.+)', re.IGNORECASE) +RE_EXCEPTION = re.compile(r':\s*!Exception\s*:\s*(.+)') +RE_CANNOT = re.compile(r':\s*((?:cannot|could not|failed to|unable to).+)', re.IGNORECASE) + +# Connection patterns +RE_SERIAL_OPEN = re.compile(r':\s*Serial Connection (\d+) Open') +RE_SERIAL_ACCEPT = re.compile(r':\s*Accepting RS232 Strings on (COM\d+)') +RE_SERIAL_CLOSE = re.compile(r':\s*Serial Connection (\d+) Close') +RE_COM_RAW = re.compile(r':\s*\[(COM\d+)\] RAW LINE:') + + +def parse_timestamp(ts_str: str) -> Optional[datetime]: + """Parse MM/DD/YYYY HH:MM:SS to datetime""" + try: + return datetime.strptime(ts_str, '%m/%d/%Y %H:%M:%S') + except (ValueError, TypeError): + return None + + +def parse_numeric(val_str: str) -> Optional[float]: + """Parse numeric value, handling spaces and leading minus signs""" + if not val_str: + return None + try: + cleaned = val_str.strip().replace(' ', '') + if cleaned == '' or cleaned == '-': + return None + return float(cleaned) + except (ValueError, TypeError): + return None + + +def extract_machine_number(filename: str) -> Optional[str]: + """Extract machine number from filename like UDC_Log_3110.log""" + match = re.search(r'UDC_Log_(\d+)', filename) + if match: + return match.group(1) + return None + + +class UDCParser: + """Parser for UDC log files""" + + def __init__(self, db_config: dict): + self.db_config = db_config + self.conn = None + self.cursor = None + + # Current state + self.current_session_id = None + self.current_partrun_id = None + self.current_timestamp = None + self.current_badge = None + self.current_headers = {} + self.last_program_end = None + self.pending_manual_request = None + + # Counters for current part run + self.measurement_count = 0 + self.manual_count = 0 + self.probe_count = 0 + self.oot_count = 0 + + # Batch storage + self.measurements_batch = [] + self.tooldata_batch = [] + self.events_batch = [] + self.errors_batch = [] + self.connections_batch = [] + + # Track current machine number for errors/connections outside sessions + self.current_machine_number = None + + def connect(self): + """Connect to MySQL database""" + try: + self.conn = mysql.connector.connect(**self.db_config) + self.cursor = self.conn.cursor(dictionary=True) + print(f"Connected to MySQL at {self.db_config['host']}") + return True + except Error as e: + print(f"Error connecting to MySQL: {e}") + return False + + def disconnect(self): + """Disconnect from database""" + if self.cursor: + self.cursor.close() + if self.conn: + self.conn.close() + + def is_file_imported(self, filename: str) -> bool: + """Check if a log file has already been imported""" + basename = os.path.basename(filename) + self.cursor.execute( + "SELECT sessionid FROM udcsessions WHERE logfilename = %s", + (basename,) + ) + return self.cursor.fetchone() is not None + + def create_session(self, filename: str, machine_number: str, start_time: datetime) -> int: 
+ """Create a new session record""" + basename = os.path.basename(filename) + self.cursor.execute( + """INSERT INTO udcsessions (machinenumber, logfilename, sessionstart, dateadded) + VALUES (%s, %s, %s, NOW())""", + (machine_number, basename, start_time) + ) + self.conn.commit() + return self.cursor.lastrowid + + def update_session_end(self, session_id: int, end_time: datetime, record_count: int): + """Update session with end time and record count""" + self.cursor.execute( + """UPDATE udcsessions SET sessionend = %s, recordcount = %s WHERE sessionid = %s""", + (end_time, record_count, session_id) + ) + self.conn.commit() + + def create_part_run(self) -> int: + """Create a new part run record""" + changeover = None + if self.last_program_end and self.current_timestamp: + changeover = int((self.current_timestamp - self.last_program_end).total_seconds()) + + self.cursor.execute( + """INSERT INTO udcparts + (sessionid, partnumber, opernumber, serialnumber, programname, jobnumber, + badgenumber, programstart, changeover, dateadded) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())""", + ( + self.current_session_id, + self.current_headers.get('part'), + self.current_headers.get('oper'), + self.current_headers.get('serial'), + self.current_headers.get('progname'), + self.current_headers.get('job'), + self.current_badge, + self.current_timestamp, + changeover + ) + ) + self.conn.commit() + + # Reset counters + self.measurement_count = 0 + self.manual_count = 0 + self.probe_count = 0 + self.oot_count = 0 + + return self.cursor.lastrowid + + def end_part_run(self): + """Finalize the current part run""" + if not self.current_partrun_id: + return + + # Calculate cycle time + cycletime = None + if self.current_timestamp: + self.cursor.execute( + "SELECT programstart FROM udcparts WHERE partrunid = %s", + (self.current_partrun_id,) + ) + row = self.cursor.fetchone() + if row and row['programstart']: + cycletime = int((self.current_timestamp - row['programstart']).total_seconds()) + + # Update part run with end time and counts + self.cursor.execute( + """UPDATE udcparts SET + programend = %s, cycletime = %s, + measurementcount = %s, manualcount = %s, probecount = %s, ootcount = %s + WHERE partrunid = %s""", + ( + self.current_timestamp, cycletime, + self.measurement_count, self.manual_count, self.probe_count, self.oot_count, + self.current_partrun_id + ) + ) + self.conn.commit() + + self.last_program_end = self.current_timestamp + self.current_partrun_id = None + + def add_measurement(self, event_type: str, method: str, dimid: str, description: str, + seq: int, minval: float, maxval: float, actualval: float, + deviation: float, oot: int): + """Add a measurement record""" + self.measurements_batch.append(( + self.current_partrun_id, + self.current_session_id, + self.current_timestamp, + event_type, + method, + dimid, + description[:255] if description else None, + seq, + minval, + maxval, + actualval, + deviation, + oot + )) + + # Update counters + self.measurement_count += 1 + if method and method.upper() == 'MANUAL': + self.manual_count += 1 + elif method and method.upper() == 'PROBE': + self.probe_count += 1 + if oot: + self.oot_count += 1 + + # Check for pending manual request + if self.pending_manual_request and method and method.upper() == 'MANUAL': + self.complete_manual_request() + + # Flush batch if needed + if len(self.measurements_batch) >= BATCH_SIZE: + self.flush_measurements() + + def add_tooldata(self, method: str, dimid: str, description: str, + toolnumber: int, minval: float, maxval: 
float, actualval: float, + deviation: float, oot: int): + """Add a tool data record""" + self.tooldata_batch.append(( + self.current_partrun_id, + self.current_session_id, + self.current_timestamp, + method, + dimid, + description[:255] if description else None, + toolnumber, + minval, + maxval, + actualval, + deviation, + oot + )) + + # Flush batch if needed + if len(self.tooldata_batch) >= BATCH_SIZE: + self.flush_tooldata() + + def add_event(self, event_type: str, item_number: str = None, description: str = None): + """Add an event record""" + self.events_batch.append(( + self.current_partrun_id, + self.current_session_id, + self.current_timestamp, + event_type, + item_number, + description + )) + + if len(self.events_batch) >= BATCH_SIZE: + self.flush_events() + + def start_manual_request(self, description: str): + """Record start of a manual request""" + self.pending_manual_request = { + 'time': self.current_timestamp, + 'description': description, + 'partrunid': self.current_partrun_id + } + + def complete_manual_request(self): + """Complete a manual request with the response measurement""" + if not self.pending_manual_request: + return + + response_seconds = None + if self.pending_manual_request['time'] and self.current_timestamp: + response_seconds = int((self.current_timestamp - self.pending_manual_request['time']).total_seconds()) + + self.cursor.execute( + """INSERT INTO udcmanualrequests + (partrunid, requesttime, responsetime, responseseconds, description) + VALUES (%s, %s, %s, %s, %s)""", + ( + self.pending_manual_request['partrunid'], + self.pending_manual_request['time'], + self.current_timestamp, + response_seconds, + self.pending_manual_request['description'][:255] if self.pending_manual_request['description'] else None + ) + ) + + self.pending_manual_request = None + + def flush_measurements(self): + """Flush measurement batch to database""" + if not self.measurements_batch: + return + + self.cursor.executemany( + """INSERT INTO udcmeasurements + (partrunid, sessionid, eventtime, eventtype, method, dimid, description, + seqnumber, minval, maxval, actualval, deviation, oot) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", + self.measurements_batch + ) + self.conn.commit() + self.measurements_batch = [] + + def flush_tooldata(self): + """Flush tool data batch to database""" + if not self.tooldata_batch: + return + + self.cursor.executemany( + """INSERT INTO udctooldata + (partrunid, sessionid, eventtime, method, dimid, description, + toolnumber, minval, maxval, actualval, deviation, oot) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", + self.tooldata_batch + ) + self.conn.commit() + self.tooldata_batch = [] + + def flush_events(self): + """Flush event batch to database""" + if not self.events_batch: + return + + self.cursor.executemany( + """INSERT INTO udcevents + (partrunid, sessionid, eventtime, eventtype, itemnumber, description) + VALUES (%s, %s, %s, %s, %s, %s)""", + self.events_batch + ) + self.conn.commit() + self.events_batch = [] + + def add_error(self, error_type: str, error_message: str, source_method: str = None): + """Add an error record""" + self.errors_batch.append(( + self.current_session_id, + self.current_machine_number, + self.current_timestamp, + error_type[:50] if error_type else 'UNKNOWN', + error_message[:500] if error_message else None, + source_method[:100] if source_method else None + )) + + if len(self.errors_batch) >= BATCH_SIZE: + self.flush_errors() + + def add_connection(self, event_type: str, comport: str = 
None, details: str = None): + """Add a connection event record""" + self.connections_batch.append(( + self.current_session_id, + self.current_machine_number, + self.current_timestamp, + event_type[:20] if event_type else 'UNKNOWN', + comport[:10] if comport else None, + details[:255] if details else None + )) + + if len(self.connections_batch) >= BATCH_SIZE: + self.flush_connections() + + def flush_errors(self): + """Flush error batch to database""" + if not self.errors_batch: + return + + self.cursor.executemany( + """INSERT INTO udcerrors + (sessionid, machinenumber, eventtime, errortype, errormessage, sourcemethod) + VALUES (%s, %s, %s, %s, %s, %s)""", + self.errors_batch + ) + self.conn.commit() + self.errors_batch = [] + + def flush_connections(self): + """Flush connection batch to database""" + if not self.connections_batch: + return + + self.cursor.executemany( + """INSERT INTO udcconnections + (sessionid, machinenumber, eventtime, eventtype, comport, details) + VALUES (%s, %s, %s, %s, %s, %s)""", + self.connections_batch + ) + self.conn.commit() + self.connections_batch = [] + + def parse_file(self, filepath: str) -> Dict[str, Any]: + """Parse a single UDC log file""" + filename = os.path.basename(filepath) + machine_number = extract_machine_number(filename) + + if not machine_number: + return {'success': False, 'error': f'Could not extract machine number from {filename}'} + + # Check if already imported + if self.is_file_imported(filepath): + return {'success': False, 'error': f'File already imported: {filename}'} + + print(f"Parsing {filename} (machine {machine_number})...") + + # Reset state + self.current_session_id = None + self.current_partrun_id = None + self.current_timestamp = None + self.current_badge = None + self.current_headers = {} + self.last_program_end = None + self.pending_manual_request = None + self.measurements_batch = [] + self.tooldata_batch = [] + self.events_batch = [] + self.errors_batch = [] + self.connections_batch = [] + self.current_machine_number = machine_number + + record_count = 0 + session_start = None + session_end = None + + try: + with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: + for line_num, line in enumerate(f, 1): + line = line.rstrip() + + # Skip empty lines + if not line: + continue + + # Check for log start + match = RE_START_LOG.search(line) + if match: + ts = parse_timestamp(match.group(1)) + if ts and not session_start: + session_start = ts + self.current_timestamp = ts + self.current_session_id = self.create_session(filepath, machine_number, ts) + continue + + # Check for log end + match = RE_END_LOG.search(line) + if match: + ts = parse_timestamp(match.group(1)) + if ts: + session_end = ts + self.current_timestamp = ts + continue + + # Check for DATA ENTRY timestamp + match = RE_DATA_ENTRY.search(line) + if match: + ts = parse_timestamp(match.group(1)) + if ts: + self.current_timestamp = ts + continue + + # Check for any other timestamp in line + match = RE_TIMESTAMP.search(line) + if match and ': ' in line: + ts = parse_timestamp(match.group(1)) + if ts: + self.current_timestamp = ts + + # Check for headers + match = RE_HEADERS.search(line) + if match: + self.current_headers = { + 'part': match.group(1) if match.group(1) else None, + 'oper': match.group(2) if match.group(2) else None, + 'serial': match.group(3) if match.group(3) else None, + 'progname': match.group(4) if match.group(4) else None, + 'job': match.group(5) if match.group(5) else None + } + continue + + # Check for badge + match = 
RE_BADGE.search(line) + if match: + self.current_badge = match.group(1) + continue + + # Check for program start + if RE_START_PROGRAM.search(line) or RE_START.search(line): + if self.current_partrun_id: + self.end_part_run() + self.current_partrun_id = self.create_part_run() + record_count += 1 + continue + + # Check for program end + if RE_END_PROGRAM.search(line) or RE_END.search(line): + self.end_part_run() + record_count += 1 + continue + + # Check for PROCESSDATA + match = RE_PROCESSDATA.search(line) + if match: + self.add_measurement( + 'PROCESSDATA', + match.group(1), # method + match.group(2), # dimid + match.group(3), # description + int(match.group(4)) if match.group(4) else 0, # seq + parse_numeric(match.group(5)), # min + parse_numeric(match.group(6)), # max + parse_numeric(match.group(7)), # actual + parse_numeric(match.group(8)), # deviation + int(match.group(9)) if match.group(9) else 0 # oot + ) + record_count += 1 + continue + + # Check for TOOLDATA + match = RE_TOOLDATA.search(line) + if match: + self.add_tooldata( + match.group(1), # method + match.group(2), # dimid + match.group(3), # description + int(match.group(4)) if match.group(4) else 0, # toolnumber (SEQ) + parse_numeric(match.group(5)), # min + parse_numeric(match.group(6)), # max + parse_numeric(match.group(7)), # actual + parse_numeric(match.group(8)), # deviation + int(match.group(9)) if match.group(9) else 0 # oot + ) + record_count += 1 + continue + + # Check for MACHINEDATA (simplified pattern) + match = RE_MACHINEDATA.search(line) + if match: + self.add_measurement( + 'MACHINEDATA', + match.group(1), # method + match.group(2), # dimid + None, # description + 0, # seq + parse_numeric(match.group(3)), + parse_numeric(match.group(4)), + parse_numeric(match.group(5)), + parse_numeric(match.group(6)), + int(match.group(7)) if match.group(7) else 0 + ) + record_count += 1 + continue + + # Check for item crossing + match = RE_ITEM_CROSSING.search(line) + if match: + self.add_event('ITEM-CROSSING', match.group(1), match.group(2)) + record_count += 1 + continue + + # Check for manual request + match = RE_MANUAL_REQUEST.search(line) + if match: + self.start_manual_request(match.group(1)) + self.add_event('MANUALREQUEST', None, match.group(1)) + record_count += 1 + continue + + # Check for message + if RE_MESSAGE.search(line): + self.add_event('MESSAGE', None, line[:500]) + record_count += 1 + continue + + # Check for errors + match = RE_ERROR_MSG.search(line) + if match: + self.add_error('ERROR', match.group(1)) + record_count += 1 + continue + + match = RE_EXCEPTION.search(line) + if match: + self.add_error('EXCEPTION', match.group(1)) + record_count += 1 + continue + + match = RE_CANNOT.search(line) + if match: + self.add_error('FAILURE', match.group(1)) + record_count += 1 + continue + + # Check for connection events + match = RE_SERIAL_OPEN.search(line) + if match: + self.add_connection('OPEN', f'Serial{match.group(1)}', 'Serial connection opened') + record_count += 1 + continue + + match = RE_SERIAL_ACCEPT.search(line) + if match: + self.add_connection('LISTENING', match.group(1), f'Accepting RS232 strings on {match.group(1)}') + record_count += 1 + continue + + match = RE_SERIAL_CLOSE.search(line) + if match: + self.add_connection('CLOSE', f'Serial{match.group(1)}', 'Serial connection closed') + record_count += 1 + continue + + # Flush remaining batches + self.flush_measurements() + self.flush_tooldata() + self.flush_events() + self.flush_errors() + self.flush_connections() + + # Close any open part run + if 
self.current_partrun_id: + self.end_part_run() + + # Update session end + if self.current_session_id: + self.update_session_end(self.current_session_id, session_end or self.current_timestamp, record_count) + + print(f" Parsed {record_count} records") + return {'success': True, 'records': record_count, 'session_id': self.current_session_id} + + except Exception as e: + print(f" Error parsing file: {e}") + return {'success': False, 'error': str(e)} + + def parse_directory(self, directory: str, max_workers: int = 4) -> Dict[str, Any]: + """Parse all UDC log files in a directory using parallel processing""" + results = { + 'total_files': 0, + 'imported': 0, + 'skipped': 0, + 'errors': 0, + 'total_records': 0 + } + + # Find all log files + pattern = os.path.join(directory, 'UDC_Log_*.log') + files = glob.glob(pattern) + + # Also check subdirectories + for subdir in ['3204_BAD', '7606_BAD', 'Dual Spindle']: + subpath = os.path.join(directory, subdir) + if os.path.isdir(subpath): + files.extend(glob.glob(os.path.join(subpath, '**', 'UDC_Log_*.log'), recursive=True)) + + results['total_files'] = len(files) + print(f"Found {len(files)} log files") + + if max_workers <= 1: + # Sequential processing + for filepath in sorted(files): + result = self.parse_file(filepath) + + if result['success']: + results['imported'] += 1 + results['total_records'] += result.get('records', 0) + elif 'already imported' in result.get('error', '').lower(): + results['skipped'] += 1 + else: + results['errors'] += 1 + print(f" Error: {result.get('error')}") + else: + # Parallel processing + print(f"Using {max_workers} parallel workers...") + completed = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all files for processing + futures = { + executor.submit(parse_file_worker, filepath, self.db_config): filepath + for filepath in sorted(files) + } + + for future in as_completed(futures): + completed += 1 + filepath = futures[future] + + try: + result = future.result() + + if result['success']: + results['imported'] += 1 + results['total_records'] += result.get('records', 0) + elif 'already imported' in result.get('error', '').lower(): + results['skipped'] += 1 + else: + results['errors'] += 1 + print(f" Error in {os.path.basename(filepath)}: {result.get('error')}") + + except Exception as e: + results['errors'] += 1 + print(f" Worker error for {os.path.basename(filepath)}: {e}") + + # Progress update + if completed % 10 == 0 or completed == len(files): + print(f" Progress: {completed}/{len(files)} files processed...", end='\r') + + print() # New line after progress + + return results + + +def parse_file_worker(filepath: str, db_config: dict) -> Dict[str, Any]: + """Worker function for parallel file parsing. 
Creates its own DB connection.""" + parser = UDCParser(db_config) + if not parser.connect(): + return {'success': False, 'error': 'Failed to connect to database'} + + try: + result = parser.parse_file(filepath) + return result + finally: + parser.disconnect() + + +def main(): + parser = argparse.ArgumentParser(description='Parse UDC log files and import to database') + parser.add_argument('--dir', default=LOG_DIRECTORY, help='Directory containing log files') + parser.add_argument('--file', help='Parse a specific file instead of directory') + parser.add_argument('--workers', type=int, default=2, help='Number of parallel workers (default: 2, use 1 for sequential)') + parser.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host') + parser.add_argument('--port', type=int, default=DB_CONFIG['port'], help='MySQL port') + parser.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user') + parser.add_argument('--password', default=DB_CONFIG['password'], help='MySQL password') + parser.add_argument('--database', default=DB_CONFIG['database'], help='MySQL database') + + args = parser.parse_args() + + # Build config from args + db_config = { + 'host': args.host, + 'port': args.port, + 'user': args.user, + 'password': args.password, + 'database': args.database + } + + # Create parser and connect + udc_parser = UDCParser(db_config) + if not udc_parser.connect(): + sys.exit(1) + + try: + if args.file: + result = udc_parser.parse_file(args.file) + if result['success']: + print(f"\nSuccessfully imported {result.get('records', 0)} records") + else: + print(f"\nFailed: {result.get('error')}") + sys.exit(1) + else: + results = udc_parser.parse_directory(args.dir, max_workers=args.workers) + print(f"\n{'='*50}") + print(f"Import Summary:") + print(f" Total files found: {results['total_files']}") + print(f" Files imported: {results['imported']}") + print(f" Files skipped: {results['skipped']}") + print(f" Files with errors: {results['errors']}") + print(f" Total records: {results['total_records']}") + + finally: + udc_parser.disconnect() + + +if __name__ == '__main__': + main()
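+
+
+# ---------------------------------------------------------------------------
+# Example invocations (an illustrative sketch only; the file name, directory,
+# and connection values below are placeholders, not production settings):
+#
+#   Import every UDC_Log_*.log found under the default log directory,
+#   using 4 parallel workers:
+#       python udcparser.py --workers 4
+#
+#   Import a single log file into a local dev database:
+#       python udcparser.py --file UDC_Log_3101.log --host 127.0.0.1 \
+#           --user root --database shopdb
+#
+#   Re-scan a specific directory sequentially (simpler to debug):
+#       python udcparser.py --dir /path/to/LogFiles --workers 1
+# ---------------------------------------------------------------------------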
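+# A minimal post-import sanity check (a sketch, assuming the dev connection
+# settings in config.py; the query only touches columns written by this
+# parser: udcsessions.machinenumber and udcsessions.recordcount):
+#
+#   from config import DB_CONFIG
+#   import mysql.connector
+#
+#   conn = mysql.connector.connect(**DB_CONFIG)
+#   cur = conn.cursor()
+#   cur.execute(
+#       "SELECT machinenumber, COUNT(*), SUM(recordcount) "
+#       "FROM udcsessions GROUP BY machinenumber ORDER BY machinenumber"
+#   )
+#   for machine, sessions, records in cur.fetchall():
+#       print(f"Machine {machine}: {sessions} sessions, {records} records")
+#   cur.close()
+#   conn.close()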