Initial commit: UDC/CLM parser for ShopDB

Python parsers for Universal Data Collector (UDC) and Control Loop
Monitor (CLM) data from CNC machines.

- clmparser.py: Main parser for CLM JSON data files
  - Incremental imports using file hashes
  - Extracts parts, measurements, tool data, violations
  - Calculates cycle times and changeovers

- udcparser.py: Parser for raw UDC log files
  - Application events, errors, connections

- config.py: Database configuration (dev/prod)
- backfill_changeover.py: One-time migration utility

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
cproudlock
2025-12-16 07:54:54 -05:00
commit 5c707c3cd4
6 changed files with 2182 additions and 0 deletions

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""Backfill changeover values for udcparts.

One-time migration utility: for every machine, walk its part runs in
program-start order and store, on each run, the number of seconds between
the previous run's program end and this run's program start.
"""

# NOTE(review): credentials are hard-coded exactly as in the original script;
# consider reading them from the parser's config module instead.
DB_CONFIG = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'shopdb',
}

# Changeovers of 24 hours or more are treated as unreasonable and skipped.
MAX_CHANGEOVER_SECONDS = 86400


def compute_changeovers(parts, max_seconds=MAX_CHANGEOVER_SECONDS):
    """Return ``(changeover_seconds, partrunid)`` pairs for one machine.

    Args:
        parts: iterable of ``(partrunid, programstart, programend)`` tuples,
            already ordered by ``programstart``; the two time values are
            datetimes (``programend`` may be ``None``).
        max_seconds: exclusive upper bound for a changeover to be kept.

    Returns:
        A list shaped for ``cursor.executemany`` with the UPDATE statement
        used in ``main``. Only changeovers strictly between 0 and
        ``max_seconds`` are included.
    """
    updates = []
    prev_end = None
    for partrunid, programstart, programend in parts:
        if prev_end and programstart:
            changeover = int((programstart - prev_end).total_seconds())
            # Only set if reasonable (> 0 and < 24 hours by default)
            if 0 < changeover < max_seconds:
                updates.append((changeover, partrunid))
        # A run without an end time keeps the last known end as reference.
        if programend:
            prev_end = programend
    return updates


def main():
    """Compute and store changeovers for every machine found in udcsessions."""
    # Imported here so compute_changeovers() stays usable without the driver.
    import mysql.connector

    conn = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            # Get all machines
            cursor.execute(
                "SELECT DISTINCT machinenumber FROM udcsessions ORDER BY machinenumber"
            )
            machines = [row[0] for row in cursor.fetchall()]
            total_updated = 0
            for machine in machines:
                # Get all parts for this machine ordered by start time
                cursor.execute("""
                    SELECT p.partrunid, p.programstart, p.programend
                    FROM udcparts p
                    JOIN udcsessions s ON p.sessionid = s.sessionid
                    WHERE s.machinenumber = %s AND p.programstart IS NOT NULL
                    ORDER BY p.programstart ASC
                """, (machine,))
                updates = compute_changeovers(cursor.fetchall())
                # Batch update
                if updates:
                    cursor.executemany(
                        "UPDATE udcparts SET changeover = %s WHERE partrunid = %s",
                        updates,
                    )
                    conn.commit()
                    total_updated += len(updates)
                print(f"Machine {machine}: {len(updates)} changeovers set")
            print(f"\nTotal updated: {total_updated}")
        finally:
            # Release the cursor even when a query fails part-way through.
            cursor.close()
    finally:
        conn.close()


if __name__ == '__main__':
    main()

1221
parser/clmparser.py Normal file

File diff suppressed because it is too large Load Diff

36
parser/config.py Normal file
View File

@@ -0,0 +1,36 @@
"""
UDC Parser Configuration

Central settings for the UDC/CLM parsers: data locations, database
connection parameters and the batch insert size.
"""
import platform

# Detect OS
IS_WINDOWS = platform.system() == 'Windows'

# CLM Data paths
CLM_DATA_PATH_WINDOWS = r'S:\SPC\UDC\CLM_Data'
CLM_DATA_PATH_LINUX = '/home/camp/projects/UDC/CLM_Data'
CLM_DATA_PATH = CLM_DATA_PATH_WINDOWS if IS_WINDOWS else CLM_DATA_PATH_LINUX

# UDC log file directory. udcparser.py imports this name; without it the
# `from config import ...` there raises ImportError and the parser falls back
# to its built-in defaults for every setting.
# NOTE(review): only the Linux location is known (it matches udcparser.py's
# fallback value) - confirm the Windows/production path.
LOG_DIRECTORY = '/home/camp/projects/UDC/LogFiles'

# Database - Development (Docker on Linux)
DB_CONFIG_DEV = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'shopdb'
}

# Database - Production (update these values)
DB_CONFIG_PROD = {
    'host': 'PROD_MYSQL_HOST',  # TODO: Update with production host
    'port': 3306,
    'user': 'PROD_USER',  # TODO: Update with production user
    'password': 'PROD_PASSWORD',  # TODO: Update with production password
    'database': 'shopdb'
}

# Default config based on OS (Windows = prod, Linux = dev)
DB_CONFIG = DB_CONFIG_PROD if IS_WINDOWS else DB_CONFIG_DEV

# Batch insert size
BATCH_SIZE = 1000

1
parser/requirements.txt Normal file
View File

@@ -0,0 +1 @@
mysql-connector-python>=8.0.0

857
parser/udcparser.py Normal file
View File

@@ -0,0 +1,857 @@
#!/usr/bin/env python3
"""
UDC Log Parser
Parses UDC (Universal Data Collector) log files and imports data into ShopDB.
Usage:
python udcparser.py [--dir /path/to/logs] [--file specific_file.log]
"""
import argparse
import glob
import math
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Optional, Dict, List, Any

import mysql.connector
from mysql.connector import Error
# Import config.
# Each setting is resolved on its own: a single name missing from config.py
# (LOG_DIRECTORY, for instance) must not discard the whole module and
# silently replace every setting with the built-in defaults below.
try:
    import config as _config
except ImportError:
    _config = None

DB_CONFIG = getattr(_config, 'DB_CONFIG_DEV', {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'shopdb'
})
LOG_DIRECTORY = getattr(_config, 'LOG_DIRECTORY', '/home/camp/projects/UDC/LogFiles')
BATCH_SIZE = getattr(_config, 'BATCH_SIZE', 1000)
# =============================================================================
# Regex Patterns
# =============================================================================
# All patterns are compiled once at import time and applied per log line with
# .search(). Capture groups are listed in order in the comments below.
# Timestamp pattern: MM/DD/YYYY HH:MM:SS
# Group 1: the timestamp text, found anywhere in the line.
RE_TIMESTAMP = re.compile(r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
# Log start/end
# Anchored at the start of the line. Group 1: the timestamp text.
RE_START_LOG = re.compile(r'^Start Log : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
RE_END_LOG = re.compile(r'^End Log : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
# Headers loaded
# Groups 1-5: Part, Oper, Serial, ProgName, Job. Each is \S*, so a group may
# be the empty string.
RE_HEADERS = re.compile(r'Headers loaded \(Path 1\): Part=(\S*) Oper=(\S*) Serial=(\S*) ProgName=(\S*) Job=(\S*)')
# Badge number
# Group 1: the non-whitespace token following "BadgeNumber = ".
RE_BADGE = re.compile(r'BadgeNumber = (\S+)')
# Program start/end
# No capture groups; these only detect the marker text.
RE_START_PROGRAM = re.compile(r': Start of Program')
RE_END_PROGRAM = re.compile(r': End of Program')
# Data entry with timestamp
# Anchored at the start of the line. Group 1: the timestamp text.
RE_DATA_ENTRY = re.compile(r'^DATA ENTRY : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
# Process/Tool/Machine data
# PROCESSDATA and TOOLDATA share one layout. Groups 1-9: METHOD, DIMID,
# DESCRIPTION, SEQ (digits), MIN, MAX, ACTUAL, DEVIATION, OOT (single digit).
RE_PROCESSDATA = re.compile(
r'\+\+\.PROCESSDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--'
)
RE_TOOLDATA = re.compile(
r'\+\+\.TOOLDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--'
)
# MACHINEDATA skips whatever lies between DIMID and MIN with a lazy `.*?`, so
# it has only seven groups: METHOD, DIMID, MIN, MAX, ACTUAL, DEVIATION, OOT.
RE_MACHINEDATA = re.compile(
r'\+\+\.MACHINEDATA//METHOD//(\w+)//\s*DIMID//(\w+)//.*?MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--'
)
# Item crossing
# Groups 1-2: ITEMNO, DESCRIPTION.
RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//ITEMNO//(.+?)//DESCRIPTION//(.+?)//--')
# Manual request
# The tag may carry one optional trailing digit (MANUALREQUEST, MANUALREQUEST1, ...).
# Group 1: DESCRIPTION.
RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//(.+?)//\s*--')
# Message
# No capture groups; only detects the "++.MESSAGE//" tag.
RE_MESSAGE = re.compile(r'\+\+\.MESSAGE//')
# Start/End markers in data
# RE_START groups 1-2: PROGNAME, SNUMBNO. RE_END has no groups.
RE_START = re.compile(r'\+\+\.START//PROGNAME//(.+?)//SNUMBNO//(.+?)//--')
RE_END = re.compile(r'\+\+\.END//--')
# Error patterns
# RE_ERROR_MSG: line starts with "ERROR" (case-insensitive), optional
# "Message:" prefix; group 1 is the remaining text.
RE_ERROR_MSG = re.compile(r'^ERROR\s*:\s*(?:Message:\s*)?(.+)', re.IGNORECASE)
# RE_EXCEPTION: group 1 is the text after ": !Exception :".
RE_EXCEPTION = re.compile(r':\s*!Exception\s*:\s*(.+)')
# RE_CANNOT: group 1 is the failure phrase starting at
# cannot / could not / failed to / unable to (case-insensitive).
RE_CANNOT = re.compile(r':\s*((?:cannot|could not|failed to|unable to).+)', re.IGNORECASE)
# Connection patterns
# OPEN/CLOSE group 1: the serial connection number; ACCEPT and RAW group 1:
# the COM port name (e.g. "COM3").
RE_SERIAL_OPEN = re.compile(r':\s*Serial Connection (\d+) Open')
RE_SERIAL_ACCEPT = re.compile(r':\s*Accepting RS232 Strings on (COM\d+)')
RE_SERIAL_CLOSE = re.compile(r':\s*Serial Connection (\d+) Close')
RE_COM_RAW = re.compile(r':\s*\[(COM\d+)\] RAW LINE:')
def parse_timestamp(ts_str: str) -> Optional[datetime]:
    """Convert an ``MM/DD/YYYY HH:MM:SS`` string to a ``datetime``.

    Returns ``None`` when the text does not match that layout or when the
    argument is not a string.
    """
    try:
        parsed = datetime.strptime(ts_str, '%m/%d/%Y %H:%M:%S')
    except (ValueError, TypeError):
        return None
    else:
        return parsed
def parse_numeric(val_str: str) -> Optional[float]:
    """Parse a numeric log field into a finite ``float``.

    Surrounding whitespace and embedded spaces are removed first, so a value
    written as ``"- 0.25"`` parses as ``-0.25``.

    Returns:
        The parsed value, or ``None`` when the field is empty, is a lone
        ``-``, is not a number, or is not finite (``nan`` / ``inf``).
    """
    if not val_str:
        return None
    cleaned = val_str.strip().replace(' ', '')
    if cleaned in ('', '-'):
        return None
    try:
        value = float(cleaned)
    except ValueError:
        return None
    # float() accepts "nan", "inf" and "infinity"; such values are not real
    # measurements and must not reach the database insert batches.
    return value if math.isfinite(value) else None
def extract_machine_number(filename: str) -> Optional[str]:
    """Return the digits following ``UDC_Log_`` in *filename*.

    For ``UDC_Log_3110.log`` this is ``'3110'``; ``None`` is returned when
    the name contains no such marker.
    """
    found = re.search(r'UDC_Log_(\d+)', filename)
    return found.group(1) if found else None
class UDCParser:
    """Parser for UDC log files.

    Reads a log file line by line, tracks the current session, part run,
    timestamp, badge and header values, and writes the extracted rows to the
    ShopDB MySQL tables. Measurements, tool data, events, errors and
    connection events are buffered and inserted in batches of BATCH_SIZE.
    """

    # Sub-folders of the log directory that parse_directory() also scans
    # (recursively) in addition to the directory itself.
    EXTRA_LOG_SUBDIRS = ('3204_BAD', '7606_BAD', 'Dual Spindle')

    def __init__(self, db_config: dict):
        """Store the connection settings; the connection is opened by connect()."""
        self.db_config = db_config
        self.conn = None
        self.cursor = None
        self._reset_state()

    def _reset_state(self):
        """Reset all per-file parsing state, counters and pending batches."""
        # Current state
        self.current_session_id = None
        self.current_partrun_id = None
        self.current_timestamp = None
        self.current_badge = None
        self.current_headers = {}
        self.last_program_end = None
        self.pending_manual_request = None
        # Counters for current part run
        self.measurement_count = 0
        self.manual_count = 0
        self.probe_count = 0
        self.oot_count = 0
        # Batch storage
        self.measurements_batch = []
        self.tooldata_batch = []
        self.events_batch = []
        self.errors_batch = []
        self.connections_batch = []
        # Track current machine number for errors/connections outside sessions
        self.current_machine_number = None

    def connect(self):
        """Connect to the MySQL database; return True on success, False on error."""
        try:
            self.conn = mysql.connector.connect(**self.db_config)
            # Rows are fetched as dicts (end_part_run reads row['programstart']).
            self.cursor = self.conn.cursor(dictionary=True)
            print(f"Connected to MySQL at {self.db_config['host']}")
            return True
        except Error as e:
            print(f"Error connecting to MySQL: {e}")
            return False

    def disconnect(self):
        """Close the cursor and connection; safe to call more than once."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.conn:
            self.conn.close()
            self.conn = None

    def _abort_transaction(self):
        """Drop buffered rows and roll back whatever has not been committed.

        Used when parsing a file fails so that statements executed since the
        last commit are not committed later by work on another file.
        """
        self.measurements_batch = []
        self.tooldata_batch = []
        self.events_batch = []
        self.errors_batch = []
        self.connections_batch = []
        if self.conn is None:
            return
        try:
            self.conn.rollback()
        except Exception as rollback_error:
            # Best effort: the original parse error is what gets reported.
            print(f" Rollback failed: {rollback_error}")

    def is_file_imported(self, filename: str) -> bool:
        """Return True if a session row already exists for this log file name."""
        basename = os.path.basename(filename)
        self.cursor.execute(
            "SELECT sessionid FROM udcsessions WHERE logfilename = %s",
            (basename,)
        )
        return self.cursor.fetchone() is not None

    def create_session(self, filename: str, machine_number: str, start_time: datetime) -> int:
        """Insert a session row for the file, commit, and return its id."""
        basename = os.path.basename(filename)
        self.cursor.execute(
            """INSERT INTO udcsessions (machinenumber, logfilename, sessionstart, dateadded)
            VALUES (%s, %s, %s, NOW())""",
            (machine_number, basename, start_time)
        )
        self.conn.commit()
        return self.cursor.lastrowid

    def update_session_end(self, session_id: int, end_time: datetime, record_count: int):
        """Store the session end time and record count, then commit."""
        self.cursor.execute(
            """UPDATE udcsessions SET sessionend = %s, recordcount = %s WHERE sessionid = %s""",
            (end_time, record_count, session_id)
        )
        self.conn.commit()

    def create_part_run(self) -> int:
        """Insert a part run starting at the current timestamp; return its id.

        The changeover is the number of seconds since the previous program
        end seen in this file (None when there was none). The per-run
        counters are reset.
        """
        changeover = None
        if self.last_program_end and self.current_timestamp:
            changeover = int((self.current_timestamp - self.last_program_end).total_seconds())
        self.cursor.execute(
            """INSERT INTO udcparts
            (sessionid, partnumber, opernumber, serialnumber, programname, jobnumber,
            badgenumber, programstart, changeover, dateadded)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())""",
            (
                self.current_session_id,
                self.current_headers.get('part'),
                self.current_headers.get('oper'),
                self.current_headers.get('serial'),
                self.current_headers.get('progname'),
                self.current_headers.get('job'),
                self.current_badge,
                self.current_timestamp,
                changeover
            )
        )
        self.conn.commit()
        # Reset counters
        self.measurement_count = 0
        self.manual_count = 0
        self.probe_count = 0
        self.oot_count = 0
        return self.cursor.lastrowid

    def end_part_run(self):
        """Finalize the current part run; does nothing when no run is open.

        Writes the end time, cycle time (seconds since the stored program
        start) and the per-run counters, then remembers the end time for the
        next changeover calculation.
        """
        if not self.current_partrun_id:
            return
        # Calculate cycle time
        cycletime = None
        if self.current_timestamp:
            self.cursor.execute(
                "SELECT programstart FROM udcparts WHERE partrunid = %s",
                (self.current_partrun_id,)
            )
            row = self.cursor.fetchone()
            if row and row['programstart']:
                cycletime = int((self.current_timestamp - row['programstart']).total_seconds())
        # Update part run with end time and counts
        self.cursor.execute(
            """UPDATE udcparts SET
            programend = %s, cycletime = %s,
            measurementcount = %s, manualcount = %s, probecount = %s, ootcount = %s
            WHERE partrunid = %s""",
            (
                self.current_timestamp, cycletime,
                self.measurement_count, self.manual_count, self.probe_count, self.oot_count,
                self.current_partrun_id
            )
        )
        self.conn.commit()
        self.last_program_end = self.current_timestamp
        self.current_partrun_id = None

    def add_measurement(self, event_type: str, method: str, dimid: str, description: Optional[str],
                        seq: int, minval: Optional[float], maxval: Optional[float],
                        actualval: Optional[float], deviation: Optional[float], oot: int):
        """Buffer a measurement row and update the per-run counters.

        A MANUAL measurement also completes a pending manual request. The
        batch is flushed once it holds BATCH_SIZE rows.
        """
        self.measurements_batch.append((
            self.current_partrun_id,
            self.current_session_id,
            self.current_timestamp,
            event_type,
            method,
            dimid,
            description[:255] if description else None,
            seq,
            minval,
            maxval,
            actualval,
            deviation,
            oot
        ))
        # Update counters
        self.measurement_count += 1
        method_name = method.upper() if method else None
        if method_name == 'MANUAL':
            self.manual_count += 1
        elif method_name == 'PROBE':
            self.probe_count += 1
        if oot:
            self.oot_count += 1
        # Check for pending manual request
        if self.pending_manual_request and method_name == 'MANUAL':
            self.complete_manual_request()
        # Flush batch if needed
        if len(self.measurements_batch) >= BATCH_SIZE:
            self.flush_measurements()

    def add_tooldata(self, method: str, dimid: str, description: Optional[str],
                     toolnumber: int, minval: Optional[float], maxval: Optional[float],
                     actualval: Optional[float], deviation: Optional[float], oot: int):
        """Buffer a tool data row; flush once the batch holds BATCH_SIZE rows."""
        self.tooldata_batch.append((
            self.current_partrun_id,
            self.current_session_id,
            self.current_timestamp,
            method,
            dimid,
            description[:255] if description else None,
            toolnumber,
            minval,
            maxval,
            actualval,
            deviation,
            oot
        ))
        # Flush batch if needed
        if len(self.tooldata_batch) >= BATCH_SIZE:
            self.flush_tooldata()

    def add_event(self, event_type: str, item_number: Optional[str] = None,
                  description: Optional[str] = None):
        """Buffer an event row; flush once the batch holds BATCH_SIZE rows."""
        self.events_batch.append((
            self.current_partrun_id,
            self.current_session_id,
            self.current_timestamp,
            event_type,
            item_number,
            description
        ))
        if len(self.events_batch) >= BATCH_SIZE:
            self.flush_events()

    def start_manual_request(self, description: str):
        """Remember a manual request until the answering MANUAL measurement arrives."""
        self.pending_manual_request = {
            'time': self.current_timestamp,
            'description': description,
            'partrunid': self.current_partrun_id
        }

    def complete_manual_request(self):
        """Insert the pending manual request with its response time, then clear it.

        The insert is not committed here; it is committed by the next commit
        issued on the connection.
        """
        pending = self.pending_manual_request
        if not pending:
            return
        response_seconds = None
        if pending['time'] and self.current_timestamp:
            response_seconds = int((self.current_timestamp - pending['time']).total_seconds())
        self.cursor.execute(
            """INSERT INTO udcmanualrequests
            (partrunid, requesttime, responsetime, responseseconds, description)
            VALUES (%s, %s, %s, %s, %s)""",
            (
                pending['partrunid'],
                pending['time'],
                self.current_timestamp,
                response_seconds,
                pending['description'][:255] if pending['description'] else None
            )
        )
        self.pending_manual_request = None

    def _flush_batch(self, batch_attr: str, sql: str):
        """Insert and commit the rows buffered in ``self.<batch_attr>``, then empty it."""
        batch = getattr(self, batch_attr)
        if not batch:
            return
        self.cursor.executemany(sql, batch)
        self.conn.commit()
        setattr(self, batch_attr, [])

    def flush_measurements(self):
        """Flush measurement batch to database"""
        self._flush_batch(
            'measurements_batch',
            """INSERT INTO udcmeasurements
            (partrunid, sessionid, eventtime, eventtype, method, dimid, description,
            seqnumber, minval, maxval, actualval, deviation, oot)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        )

    def flush_tooldata(self):
        """Flush tool data batch to database"""
        self._flush_batch(
            'tooldata_batch',
            """INSERT INTO udctooldata
            (partrunid, sessionid, eventtime, method, dimid, description,
            toolnumber, minval, maxval, actualval, deviation, oot)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        )

    def flush_events(self):
        """Flush event batch to database"""
        self._flush_batch(
            'events_batch',
            """INSERT INTO udcevents
            (partrunid, sessionid, eventtime, eventtype, itemnumber, description)
            VALUES (%s, %s, %s, %s, %s, %s)"""
        )

    def add_error(self, error_type: str, error_message: str, source_method: Optional[str] = None):
        """Buffer an error row (fields truncated to 50/500/100 characters)."""
        self.errors_batch.append((
            self.current_session_id,
            self.current_machine_number,
            self.current_timestamp,
            error_type[:50] if error_type else 'UNKNOWN',
            error_message[:500] if error_message else None,
            source_method[:100] if source_method else None
        ))
        if len(self.errors_batch) >= BATCH_SIZE:
            self.flush_errors()

    def add_connection(self, event_type: str, comport: Optional[str] = None,
                       details: Optional[str] = None):
        """Buffer a connection event row (fields truncated to 20/10/255 characters)."""
        self.connections_batch.append((
            self.current_session_id,
            self.current_machine_number,
            self.current_timestamp,
            event_type[:20] if event_type else 'UNKNOWN',
            comport[:10] if comport else None,
            details[:255] if details else None
        ))
        if len(self.connections_batch) >= BATCH_SIZE:
            self.flush_connections()

    def flush_errors(self):
        """Flush error batch to database"""
        self._flush_batch(
            'errors_batch',
            """INSERT INTO udcerrors
            (sessionid, machinenumber, eventtime, errortype, errormessage, sourcemethod)
            VALUES (%s, %s, %s, %s, %s, %s)"""
        )

    def flush_connections(self):
        """Flush connection batch to database"""
        self._flush_batch(
            'connections_batch',
            """INSERT INTO udcconnections
            (sessionid, machinenumber, eventtime, eventtype, comport, details)
            VALUES (%s, %s, %s, %s, %s, %s)"""
        )

    # ------------------------------------------------------------------
    # Line handlers used by parse_file(). The order in which they are tried
    # is significant: the first matching pattern consumes the line.
    # ------------------------------------------------------------------

    def _update_context(self, line: str) -> bool:
        """Track timestamp, headers and badge; return True if the line is consumed."""
        # DATA ENTRY lines only carry a timestamp.
        match = RE_DATA_ENTRY.search(line)
        if match:
            ts = parse_timestamp(match.group(1))
            if ts:
                self.current_timestamp = ts
            return True
        # Any other timestamp updates the clock but the line is still
        # examined by the remaining patterns.
        match = RE_TIMESTAMP.search(line)
        if match and ': ' in line:
            ts = parse_timestamp(match.group(1))
            if ts:
                self.current_timestamp = ts
        match = RE_HEADERS.search(line)
        if match:
            part, oper, serial, progname, job = match.groups()
            # Header fields may be empty strings; store those as None.
            self.current_headers = {
                'part': part or None,
                'oper': oper or None,
                'serial': serial or None,
                'progname': progname or None,
                'job': job or None
            }
            return True
        match = RE_BADGE.search(line)
        if match:
            self.current_badge = match.group(1)
            return True
        return False

    def _handle_program_marker(self, line: str) -> bool:
        """Open or close a part run on program start/end markers."""
        if RE_START_PROGRAM.search(line) or RE_START.search(line):
            # A start without a preceding end closes the run still open.
            if self.current_partrun_id:
                self.end_part_run()
            self.current_partrun_id = self.create_part_run()
            return True
        if RE_END_PROGRAM.search(line) or RE_END.search(line):
            self.end_part_run()
            return True
        return False

    def _handle_measurement(self, line: str) -> bool:
        """Record PROCESSDATA, TOOLDATA or MACHINEDATA lines."""
        match = RE_PROCESSDATA.search(line)
        if match:
            method, dimid, description, seq, minval, maxval, actual, deviation, oot = match.groups()
            self.add_measurement(
                'PROCESSDATA', method, dimid, description, int(seq),
                parse_numeric(minval), parse_numeric(maxval),
                parse_numeric(actual), parse_numeric(deviation), int(oot)
            )
            return True
        match = RE_TOOLDATA.search(line)
        if match:
            # For tool data the SEQ field is stored as the tool number.
            method, dimid, description, seq, minval, maxval, actual, deviation, oot = match.groups()
            self.add_tooldata(
                method, dimid, description, int(seq),
                parse_numeric(minval), parse_numeric(maxval),
                parse_numeric(actual), parse_numeric(deviation), int(oot)
            )
            return True
        # MACHINEDATA (simplified pattern): no description, sequence fixed at 0.
        match = RE_MACHINEDATA.search(line)
        if match:
            method, dimid, minval, maxval, actual, deviation, oot = match.groups()
            self.add_measurement(
                'MACHINEDATA', method, dimid, None, 0,
                parse_numeric(minval), parse_numeric(maxval),
                parse_numeric(actual), parse_numeric(deviation), int(oot)
            )
            return True
        return False

    def _handle_event(self, line: str) -> bool:
        """Record item crossings, manual requests and messages."""
        match = RE_ITEM_CROSSING.search(line)
        if match:
            self.add_event('ITEM-CROSSING', match.group(1), match.group(2))
            return True
        match = RE_MANUAL_REQUEST.search(line)
        if match:
            self.start_manual_request(match.group(1))
            self.add_event('MANUALREQUEST', None, match.group(1))
            return True
        if RE_MESSAGE.search(line):
            self.add_event('MESSAGE', None, line[:500])
            return True
        return False

    def _handle_error(self, line: str) -> bool:
        """Record ERROR, exception and failure lines."""
        for pattern, error_type in (
            (RE_ERROR_MSG, 'ERROR'),
            (RE_EXCEPTION, 'EXCEPTION'),
            (RE_CANNOT, 'FAILURE'),
        ):
            match = pattern.search(line)
            if match:
                self.add_error(error_type, match.group(1))
                return True
        return False

    def _handle_connection(self, line: str) -> bool:
        """Record serial connection open/listen/close lines."""
        match = RE_SERIAL_OPEN.search(line)
        if match:
            self.add_connection('OPEN', f'Serial{match.group(1)}', 'Serial connection opened')
            return True
        match = RE_SERIAL_ACCEPT.search(line)
        if match:
            self.add_connection('LISTENING', match.group(1), f'Accepting RS232 strings on {match.group(1)}')
            return True
        match = RE_SERIAL_CLOSE.search(line)
        if match:
            self.add_connection('CLOSE', f'Serial{match.group(1)}', 'Serial connection closed')
            return True
        return False

    def _process_line(self, line: str) -> int:
        """Handle one non-empty log line; return how many records it produced (0 or 1)."""
        if self._update_context(line):
            return 0
        handlers = (
            self._handle_program_marker,
            self._handle_measurement,
            self._handle_event,
            self._handle_error,
            self._handle_connection,
        )
        for handler in handlers:
            if handler(line):
                return 1
        return 0

    def parse_file(self, filepath: str) -> Dict[str, Any]:
        """Parse a single UDC log file and import its contents.

        Returns:
            ``{'success': True, 'records': n, 'session_id': id}`` on success,
            otherwise ``{'success': False, 'error': message}`` (also used for
            files that were already imported or whose name carries no
            machine number).
        """
        filename = os.path.basename(filepath)
        machine_number = extract_machine_number(filename)
        if not machine_number:
            return {'success': False, 'error': f'Could not extract machine number from {filename}'}
        # Check if already imported
        if self.is_file_imported(filepath):
            return {'success': False, 'error': f'File already imported: {filename}'}
        print(f"Parsing {filename} (machine {machine_number})...")
        self._reset_state()
        self.current_machine_number = machine_number
        record_count = 0
        session_start = None
        session_end = None
        try:
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                for raw_line in f:
                    line = raw_line.rstrip()
                    # Skip empty lines
                    if not line:
                        continue
                    # Log start: only the first one in a file opens the session.
                    match = RE_START_LOG.search(line)
                    if match:
                        ts = parse_timestamp(match.group(1))
                        if ts and not session_start:
                            session_start = ts
                            self.current_timestamp = ts
                            self.current_session_id = self.create_session(filepath, machine_number, ts)
                        continue
                    # Log end: the last one seen becomes the session end.
                    match = RE_END_LOG.search(line)
                    if match:
                        ts = parse_timestamp(match.group(1))
                        if ts:
                            session_end = ts
                            self.current_timestamp = ts
                        continue
                    record_count += self._process_line(line)
            # Flush remaining batches
            self.flush_measurements()
            self.flush_tooldata()
            self.flush_events()
            self.flush_errors()
            self.flush_connections()
            # Close any open part run
            if self.current_partrun_id:
                self.end_part_run()
            # Update session end
            if self.current_session_id:
                self.update_session_end(self.current_session_id, session_end or self.current_timestamp, record_count)
            print(f" Parsed {record_count} records")
            return {'success': True, 'records': record_count, 'session_id': self.current_session_id}
        except Exception as e:
            print(f" Error parsing file: {e}")
            # Do not let half-executed statements be committed by whatever
            # runs next on this connection.
            # NOTE(review): rows committed before the failure (the session
            # row, flushed batches) remain, so the file will be reported as
            # already imported on the next run.
            self._abort_transaction()
            return {'success': False, 'error': str(e)}

    @staticmethod
    def _tally_result(results: Dict[str, Any], result: Dict[str, Any], error_label: str):
        """Fold one parse_file() result into the directory summary counters."""
        if result['success']:
            results['imported'] += 1
            results['total_records'] += result.get('records', 0)
        elif 'already imported' in result.get('error', '').lower():
            results['skipped'] += 1
        else:
            results['errors'] += 1
            print(f"{error_label}: {result.get('error')}")

    def parse_directory(self, directory: str, max_workers: int = 4) -> Dict[str, Any]:
        """Parse all UDC log files in a directory.

        Args:
            directory: folder holding ``UDC_Log_*.log`` files; the folders
                named in EXTRA_LOG_SUBDIRS are searched recursively as well.
            max_workers: number of worker threads; ``1`` or less processes
                the files sequentially on this parser's own connection,
                otherwise each file is handled by parse_file_worker.

        Returns:
            Summary dict with ``total_files``, ``imported``, ``skipped``,
            ``errors`` and ``total_records``.
        """
        results = {
            'total_files': 0,
            'imported': 0,
            'skipped': 0,
            'errors': 0,
            'total_records': 0
        }
        # Find all log files
        files = glob.glob(os.path.join(directory, 'UDC_Log_*.log'))
        # Also check subdirectories
        for subdir in self.EXTRA_LOG_SUBDIRS:
            subpath = os.path.join(directory, subdir)
            if os.path.isdir(subpath):
                files.extend(glob.glob(os.path.join(subpath, '**', 'UDC_Log_*.log'), recursive=True))
        results['total_files'] = len(files)
        print(f"Found {len(files)} log files")
        if max_workers <= 1:
            # Sequential processing
            for filepath in sorted(files):
                self._tally_result(results, self.parse_file(filepath), " Error")
            return results
        # Parallel processing
        print(f"Using {max_workers} parallel workers...")
        completed = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all files for processing
            futures = {
                executor.submit(parse_file_worker, filepath, self.db_config): filepath
                for filepath in sorted(files)
            }
            for future in as_completed(futures):
                completed += 1
                name = os.path.basename(futures[future])
                try:
                    self._tally_result(results, future.result(), f" Error in {name}")
                except Exception as e:
                    results['errors'] += 1
                    print(f" Worker error for {name}: {e}")
                # Progress update
                if completed % 10 == 0 or completed == len(files):
                    print(f" Progress: {completed}/{len(files)} files processed...", end='\r')
        print()  # New line after progress
        return results
def parse_file_worker(filepath: str, db_config: dict) -> Dict[str, Any]:
    """Parse one file on a dedicated parser instance with its own DB connection.

    Intended as the per-file task for parallel imports. Returns the
    ``parse_file`` result, or a failure dict when the connection cannot be
    opened. The connection is always closed afterwards.
    """
    worker = UDCParser(db_config)
    if worker.connect():
        try:
            return worker.parse_file(filepath)
        finally:
            worker.disconnect()
    return {'success': False, 'error': 'Failed to connect to database'}
def main():
    """Command-line entry point: import one log file or a whole directory."""
    cli = argparse.ArgumentParser(description='Parse UDC log files and import to database')
    cli.add_argument('--dir', default=LOG_DIRECTORY, help='Directory containing log files')
    cli.add_argument('--file', help='Parse a specific file instead of directory')
    cli.add_argument('--workers', type=int, default=2, help='Number of parallel workers (default: 2, use 1 for sequential)')
    cli.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host')
    cli.add_argument('--port', type=int, default=DB_CONFIG['port'], help='MySQL port')
    cli.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user')
    cli.add_argument('--password', default=DB_CONFIG['password'], help='MySQL password')
    cli.add_argument('--database', default=DB_CONFIG['database'], help='MySQL database')
    args = cli.parse_args()

    # Connection settings come from the command line (defaults from DB_CONFIG).
    db_config = {
        key: getattr(args, key)
        for key in ('host', 'port', 'user', 'password', 'database')
    }

    udc_parser = UDCParser(db_config)
    if not udc_parser.connect():
        sys.exit(1)

    try:
        if not args.file:
            # Directory mode: import everything and print a summary.
            results = udc_parser.parse_directory(args.dir, max_workers=args.workers)
            print(f"\n{'='*50}")
            print("Import Summary:")
            print(f" Total files found: {results['total_files']}")
            print(f" Files imported: {results['imported']}")
            print(f" Files skipped: {results['skipped']}")
            print(f" Files with errors: {results['errors']}")
            print(f" Total records: {results['total_records']}")
        else:
            # Single-file mode: a failed import ends the process with status 1.
            result = udc_parser.parse_file(args.file)
            if not result['success']:
                print(f"\nFailed: {result.get('error')}")
                sys.exit(1)
            print(f"\nSuccessfully imported {result.get('records', 0)} records")
    finally:
        udc_parser.disconnect()


if __name__ == '__main__':
    main()