Files
udc-parser/parser/udcparser.py
cproudlock 0f78f76410 Fix sessionid null error when resuming incremental parsing
When parsing resumes from an offset (mid-file), the "Start Log" line
was already processed in a previous run, leaving current_session_id
as None. Added get_or_create_session() to look up or create a session
when resuming from a non-zero offset.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-18 10:31:22 -05:00

998 lines
39 KiB
Python

#!/usr/bin/env python3
"""
UDC Log Parser
Parses UDC (Universal Data Collector) log files and imports data into ShopDB.
Usage:
python udcparser.py [--dir /path/to/logs] [--file specific_file.log]
"""
import re
import os
import sys
import glob
import argparse
from datetime import datetime
from typing import Optional, Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import mysql.connector
from mysql.connector import Error
from mysql.connector.pooling import MySQLConnectionPool
# Import config
try:
from config import DB_CONFIG, UDC_LOG_PATH as LOG_DIRECTORY, BATCH_SIZE
except ImportError:
import platform
IS_WINDOWS = platform.system() == 'Windows'
DB_CONFIG = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'rootpassword',
'database': 'shopdb'
}
LOG_DIRECTORY = r'S:\SPC\UDC\LogFiles' if IS_WINDOWS else '/home/camp/projects/UDC/LogFiles'
BATCH_SIZE = 1000
# Global connection pool (initialized in main or parse_directory)
_connection_pool = None
def init_connection_pool(db_config: dict, pool_size: int = 5) -> MySQLConnectionPool:
"""Initialize the global connection pool"""
global _connection_pool
if _connection_pool is None:
_connection_pool = MySQLConnectionPool(
pool_name="udc_pool",
pool_size=pool_size,
pool_reset_session=True,
**db_config
)
print(f"Connection pool created (size={pool_size})")
return _connection_pool
def get_connection_from_pool():
"""Get a connection from the pool"""
global _connection_pool
if _connection_pool is None:
raise RuntimeError("Connection pool not initialized. Call init_connection_pool() first.")
return _connection_pool.get_connection()
# =============================================================================
# Regex Patterns
# =============================================================================
# Timestamp pattern: MM/DD/YYYY HH:MM:SS
RE_TIMESTAMP = re.compile(r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
# Log start/end
RE_START_LOG = re.compile(r'^Start Log : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
RE_END_LOG = re.compile(r'^End Log : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
# Headers loaded
RE_HEADERS = re.compile(r'Headers loaded \(Path 1\): Part=(\S*) Oper=(\S*) Serial=(\S*) ProgName=(\S*) Job=(\S*)')
# Badge number
RE_BADGE = re.compile(r'BadgeNumber = (\S+)')
# Program start/end
RE_START_PROGRAM = re.compile(r': Start of Program')
RE_END_PROGRAM = re.compile(r': End of Program')
# Data entry with timestamp
RE_DATA_ENTRY = re.compile(r'^DATA ENTRY : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
# Process/Tool/Machine data
# Note: Log format has spaces after // like "// DIMID//" so we need \s* after each //
RE_PROCESSDATA = re.compile(
r'\+\+\.PROCESSDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//\s*DESCRIPTION//\s*(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--'
)
RE_TOOLDATA = re.compile(
r'\+\+\.TOOLDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//\s*DESCRIPTION//\s*(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--'
)
RE_MACHINEDATA = re.compile(
r'\+\+\.MACHINEDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//.*?MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--'
)
# Item crossing
RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//\s*ITEMNO//\s*(.+?)//\s*DESCRIPTION//\s*(.+?)//\s*--')
# Manual request
RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//\s*(.+?)//\s*--')
# Message
RE_MESSAGE = re.compile(r'\+\+\.MESSAGE//')
# Start/End markers in data
RE_START = re.compile(r'\+\+\.START//\s*PROGNAME//\s*(.+?)//\s*SNUMBNO//\s*(.+?)//\s*--')
RE_END = re.compile(r'\+\+\.END//\s*--')
# Error patterns
RE_ERROR_MSG = re.compile(r'^ERROR\s*:\s*(?:Message:\s*)?(.+)', re.IGNORECASE)
RE_EXCEPTION = re.compile(r':\s*!Exception\s*:\s*(.+)')
RE_CANNOT = re.compile(r':\s*((?:cannot|could not|failed to|unable to).+)', re.IGNORECASE)
# Connection patterns
RE_SERIAL_OPEN = re.compile(r':\s*Serial Connection (\d+) Open')
RE_SERIAL_ACCEPT = re.compile(r':\s*Accepting RS232 Strings on (COM\d+)')
RE_SERIAL_CLOSE = re.compile(r':\s*Serial Connection (\d+) Close')
RE_COM_RAW = re.compile(r':\s*\[(COM\d+)\] RAW LINE:')
def parse_timestamp(ts_str: str) -> Optional[datetime]:
"""Parse MM/DD/YYYY HH:MM:SS to datetime"""
try:
return datetime.strptime(ts_str, '%m/%d/%Y %H:%M:%S')
except (ValueError, TypeError):
return None
def parse_numeric(val_str: str) -> Optional[float]:
"""Parse numeric value, handling spaces and leading minus signs"""
if not val_str:
return None
try:
cleaned = val_str.strip().replace(' ', '')
if cleaned == '' or cleaned == '-':
return None
return float(cleaned)
except (ValueError, TypeError):
return None
def extract_machine_number(filename: str) -> Optional[str]:
"""Extract machine number from filename like UDC_Log_3110.log"""
match = re.search(r'UDC_Log_(\d+)', filename)
if match:
return match.group(1)
return None
class UDCParser:
"""Parser for UDC log files"""
def __init__(self, db_config: dict = None, use_pool: bool = False):
self.db_config = db_config
self.use_pool = use_pool
self.conn = None
self.cursor = None
self._owns_connection = True # Track if we should close the connection
# Current state
self.current_session_id = None
self.current_partrun_id = None
self.current_timestamp = None
self.current_badge = None
self.current_headers = {}
self.last_program_end = None
self.pending_manual_request = None
# Counters for current part run
self.measurement_count = 0
self.manual_count = 0
self.probe_count = 0
self.oot_count = 0
# Batch storage
self.measurements_batch = []
self.tooldata_batch = []
self.events_batch = []
self.errors_batch = []
self.connections_batch = []
# Track current machine number for errors/connections outside sessions
self.current_machine_number = None
def connect(self):
"""Connect to MySQL database (or get connection from pool)"""
try:
if self.use_pool:
self.conn = get_connection_from_pool()
self._owns_connection = True
else:
self.conn = mysql.connector.connect(**self.db_config)
print(f"Connected to MySQL at {self.db_config['host']}")
self.cursor = self.conn.cursor(dictionary=True)
return True
except Error as e:
print(f"Error connecting to MySQL: {e}")
return False
def disconnect(self):
"""Disconnect from database (returns connection to pool if pooled)"""
if self.cursor:
self.cursor.close()
self.cursor = None
if self.conn and self._owns_connection:
self.conn.close() # For pooled connections, this returns to pool
self.conn = None
def is_file_imported(self, filename: str) -> bool:
"""Check if a log file has already been imported (DEPRECATED - use get_file_state instead)"""
basename = os.path.basename(filename)
self.cursor.execute(
"SELECT sessionid FROM udcsessions WHERE logfilename = %s",
(basename,)
)
return self.cursor.fetchone() is not None
def get_file_state(self, filepath: str) -> Dict[str, Any]:
"""Get the last known state of a log file (offset, size)"""
self.cursor.execute(
"SELECT fileid, lastoffset, filesize FROM udcfilestate WHERE filepath = %s",
(filepath,)
)
row = self.cursor.fetchone()
if row:
return {'fileid': row['fileid'], 'offset': row['lastoffset'], 'size': row['filesize']}
return {'fileid': None, 'offset': 0, 'size': 0}
def get_or_create_session(self, filepath: str, machine_number: str) -> int:
"""Get existing session for this file or create a new one for incremental parsing"""
basename = os.path.basename(filepath)
# Look for existing session for this file
self.cursor.execute(
"SELECT sessionid FROM udcsessions WHERE logfilename = %s ORDER BY sessionid DESC LIMIT 1",
(basename,)
)
row = self.cursor.fetchone()
if row:
return row['sessionid']
# No existing session, create one with current timestamp
return self.create_session(filepath, machine_number, datetime.now())
def update_file_state(self, filepath: str, machine_number: str, offset: int, filesize: int):
"""Update the file state after parsing"""
self.cursor.execute(
"""INSERT INTO udcfilestate (filepath, machinenumber, lastoffset, lastparsed, filesize)
VALUES (%s, %s, %s, NOW(), %s)
ON DUPLICATE KEY UPDATE lastoffset = %s, lastparsed = NOW(), filesize = %s""",
(filepath, machine_number, offset, filesize, offset, filesize)
)
self.conn.commit()
def create_session(self, filename: str, machine_number: str, start_time: datetime) -> int:
"""Create a new session record"""
basename = os.path.basename(filename)
self.cursor.execute(
"""INSERT INTO udcsessions (machinenumber, logfilename, sessionstart, dateadded)
VALUES (%s, %s, %s, NOW())""",
(machine_number, basename, start_time)
)
self.conn.commit()
return self.cursor.lastrowid
def update_session_end(self, session_id: int, end_time: datetime, record_count: int):
"""Update session with end time and record count"""
self.cursor.execute(
"""UPDATE udcsessions SET sessionend = %s, recordcount = %s WHERE sessionid = %s""",
(end_time, record_count, session_id)
)
self.conn.commit()
def create_part_run(self) -> int:
"""Create a new part run record"""
changeover = None
if self.last_program_end and self.current_timestamp:
changeover = int((self.current_timestamp - self.last_program_end).total_seconds())
self.cursor.execute(
"""INSERT INTO udcparts
(sessionid, partnumber, opernumber, serialnumber, programname, jobnumber,
badgenumber, programstart, changeover, dateadded)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())""",
(
self.current_session_id,
self.current_headers.get('part'),
self.current_headers.get('oper'),
self.current_headers.get('serial'),
self.current_headers.get('progname'),
self.current_headers.get('job'),
self.current_badge,
self.current_timestamp,
changeover
)
)
self.conn.commit()
# Reset counters
self.measurement_count = 0
self.manual_count = 0
self.probe_count = 0
self.oot_count = 0
return self.cursor.lastrowid
def end_part_run(self):
"""Finalize the current part run"""
if not self.current_partrun_id:
return
# Calculate cycle time
cycletime = None
if self.current_timestamp:
self.cursor.execute(
"SELECT programstart FROM udcparts WHERE partrunid = %s",
(self.current_partrun_id,)
)
row = self.cursor.fetchone()
if row and row['programstart']:
cycletime = int((self.current_timestamp - row['programstart']).total_seconds())
# Update part run with end time and counts
self.cursor.execute(
"""UPDATE udcparts SET
programend = %s, cycletime = %s,
measurementcount = %s, manualcount = %s, probecount = %s, ootcount = %s
WHERE partrunid = %s""",
(
self.current_timestamp, cycletime,
self.measurement_count, self.manual_count, self.probe_count, self.oot_count,
self.current_partrun_id
)
)
self.conn.commit()
self.last_program_end = self.current_timestamp
self.current_partrun_id = None
def add_measurement(self, event_type: str, method: str, dimid: str, description: str,
seq: int, minval: float, maxval: float, actualval: float,
deviation: float, oot: int):
"""Add a measurement record"""
self.measurements_batch.append((
self.current_partrun_id,
self.current_session_id,
self.current_timestamp,
event_type,
method,
dimid,
description[:255] if description else None,
seq,
minval,
maxval,
actualval,
deviation,
oot
))
# Update counters
self.measurement_count += 1
if method and method.upper() == 'MANUAL':
self.manual_count += 1
elif method and method.upper() == 'PROBE':
self.probe_count += 1
if oot:
self.oot_count += 1
# Check for pending manual request
if self.pending_manual_request and method and method.upper() == 'MANUAL':
self.complete_manual_request()
# Flush batch if needed
if len(self.measurements_batch) >= BATCH_SIZE:
self.flush_measurements()
def add_tooldata(self, method: str, dimid: str, description: str,
toolnumber: int, minval: float, maxval: float, actualval: float,
deviation: float, oot: int):
"""Add a tool data record"""
self.tooldata_batch.append((
self.current_partrun_id,
self.current_session_id,
self.current_timestamp,
method,
dimid,
description[:255] if description else None,
toolnumber,
minval,
maxval,
actualval,
deviation,
oot
))
# Flush batch if needed
if len(self.tooldata_batch) >= BATCH_SIZE:
self.flush_tooldata()
def add_event(self, event_type: str, item_number: str = None, description: str = None):
"""Add an event record"""
self.events_batch.append((
self.current_partrun_id,
self.current_session_id,
self.current_timestamp,
event_type,
item_number,
description
))
if len(self.events_batch) >= BATCH_SIZE:
self.flush_events()
def start_manual_request(self, description: str):
"""Record start of a manual request"""
self.pending_manual_request = {
'time': self.current_timestamp,
'description': description,
'partrunid': self.current_partrun_id
}
def complete_manual_request(self):
"""Complete a manual request with the response measurement"""
if not self.pending_manual_request:
return
response_seconds = None
if self.pending_manual_request['time'] and self.current_timestamp:
response_seconds = int((self.current_timestamp - self.pending_manual_request['time']).total_seconds())
self.cursor.execute(
"""INSERT INTO udcmanualrequests
(partrunid, requesttime, responsetime, responseseconds, description)
VALUES (%s, %s, %s, %s, %s)""",
(
self.pending_manual_request['partrunid'],
self.pending_manual_request['time'],
self.current_timestamp,
response_seconds,
self.pending_manual_request['description'][:255] if self.pending_manual_request['description'] else None
)
)
self.pending_manual_request = None
def flush_measurements(self):
"""Flush measurement batch to database"""
if not self.measurements_batch:
return
self.cursor.executemany(
"""INSERT INTO udcmeasurements
(partrunid, sessionid, eventtime, eventtype, method, dimid, description,
seqnumber, minval, maxval, actualval, deviation, oot)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
self.measurements_batch
)
self.conn.commit()
self.measurements_batch = []
def flush_tooldata(self):
"""Flush tool data batch to database"""
if not self.tooldata_batch:
return
self.cursor.executemany(
"""INSERT INTO udctooldata
(partrunid, sessionid, eventtime, method, dimid, description,
toolnumber, minval, maxval, actualval, deviation, oot)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
self.tooldata_batch
)
self.conn.commit()
self.tooldata_batch = []
def flush_events(self):
"""Flush event batch to database"""
if not self.events_batch:
return
self.cursor.executemany(
"""INSERT INTO udcevents
(partrunid, sessionid, eventtime, eventtype, itemnumber, description)
VALUES (%s, %s, %s, %s, %s, %s)""",
self.events_batch
)
self.conn.commit()
self.events_batch = []
def add_error(self, error_type: str, error_message: str, source_method: str = None):
"""Add an error record"""
self.errors_batch.append((
self.current_session_id,
self.current_machine_number,
self.current_timestamp,
error_type[:50] if error_type else 'UNKNOWN',
error_message[:500] if error_message else None,
source_method[:100] if source_method else None
))
if len(self.errors_batch) >= BATCH_SIZE:
self.flush_errors()
def add_connection(self, event_type: str, comport: str = None, details: str = None):
"""Add a connection event record"""
self.connections_batch.append((
self.current_session_id,
self.current_machine_number,
self.current_timestamp,
event_type[:20] if event_type else 'UNKNOWN',
comport[:10] if comport else None,
details[:255] if details else None
))
if len(self.connections_batch) >= BATCH_SIZE:
self.flush_connections()
def flush_errors(self):
"""Flush error batch to database"""
if not self.errors_batch:
return
self.cursor.executemany(
"""INSERT INTO udcerrors
(sessionid, machinenumber, eventtime, errortype, errormessage, sourcemethod)
VALUES (%s, %s, %s, %s, %s, %s)""",
self.errors_batch
)
self.conn.commit()
self.errors_batch = []
def flush_connections(self):
"""Flush connection batch to database"""
if not self.connections_batch:
return
self.cursor.executemany(
"""INSERT INTO udcconnections
(sessionid, machinenumber, eventtime, eventtype, comport, details)
VALUES (%s, %s, %s, %s, %s, %s)""",
self.connections_batch
)
self.conn.commit()
self.connections_batch = []
def parse_file(self, filepath: str, force_full: bool = False) -> Dict[str, Any]:
"""Parse a single UDC log file incrementally from last known position"""
filename = os.path.basename(filepath)
machine_number = extract_machine_number(filename)
if not machine_number:
return {'success': False, 'error': f'Could not extract machine number from {filename}'}
# Get current file size
try:
current_size = os.path.getsize(filepath)
except OSError as e:
return {'success': False, 'error': f'Cannot access file: {e}'}
# Get last known state
file_state = self.get_file_state(filepath)
start_offset = 0 if force_full else file_state['offset']
# Check if file has new content
if not force_full and current_size <= file_state['size'] and file_state['offset'] > 0:
return {'success': False, 'error': f'No new content in {filename}', 'skipped': True}
# If file is smaller than last offset, it was likely rotated/truncated - start fresh
if current_size < start_offset:
print(f" File {filename} appears truncated, parsing from beginning...")
start_offset = 0
if start_offset > 0:
print(f"Parsing {filename} (machine {machine_number}) from offset {start_offset}...")
else:
print(f"Parsing {filename} (machine {machine_number})...")
# Reset state
self.current_session_id = None
# If resuming from offset, get or create session immediately
# (we may not encounter a "Start Log" line when resuming mid-file)
if start_offset > 0:
self.current_session_id = self.get_or_create_session(filepath, machine_number)
self.current_partrun_id = None
self.current_timestamp = None
self.current_badge = None
self.current_headers = {}
self.last_program_end = None
self.pending_manual_request = None
self.measurements_batch = []
self.tooldata_batch = []
self.events_batch = []
self.errors_batch = []
self.connections_batch = []
self.current_machine_number = machine_number
record_count = 0
session_start = None
session_end = None
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
# Seek to last known position
if start_offset > 0:
f.seek(start_offset)
# If we're in the middle of a line, skip to next complete line
if start_offset > 0:
f.readline() # Skip partial line
for line in f:
line = line.rstrip()
# Skip empty lines
if not line:
continue
# Check for log start
match = RE_START_LOG.search(line)
if match:
ts = parse_timestamp(match.group(1))
if ts and not session_start:
session_start = ts
self.current_timestamp = ts
self.current_session_id = self.create_session(filepath, machine_number, ts)
continue
# Check for log end
match = RE_END_LOG.search(line)
if match:
ts = parse_timestamp(match.group(1))
if ts:
session_end = ts
self.current_timestamp = ts
continue
# Check for DATA ENTRY timestamp
match = RE_DATA_ENTRY.search(line)
if match:
ts = parse_timestamp(match.group(1))
if ts:
self.current_timestamp = ts
continue
# Check for any other timestamp in line
match = RE_TIMESTAMP.search(line)
if match and ': ' in line:
ts = parse_timestamp(match.group(1))
if ts:
self.current_timestamp = ts
# Check for headers
match = RE_HEADERS.search(line)
if match:
self.current_headers = {
'part': match.group(1) if match.group(1) else None,
'oper': match.group(2) if match.group(2) else None,
'serial': match.group(3) if match.group(3) else None,
'progname': match.group(4) if match.group(4) else None,
'job': match.group(5) if match.group(5) else None
}
continue
# Check for badge
match = RE_BADGE.search(line)
if match:
self.current_badge = match.group(1)
continue
# Check for program start
if RE_START_PROGRAM.search(line) or RE_START.search(line):
if self.current_partrun_id:
self.end_part_run()
self.current_partrun_id = self.create_part_run()
record_count += 1
continue
# Check for program end
if RE_END_PROGRAM.search(line) or RE_END.search(line):
self.end_part_run()
record_count += 1
continue
# Check for PROCESSDATA
match = RE_PROCESSDATA.search(line)
if match:
self.add_measurement(
'PROCESSDATA',
match.group(1), # method
match.group(2), # dimid
match.group(3), # description
int(match.group(4)) if match.group(4) else 0, # seq
parse_numeric(match.group(5)), # min
parse_numeric(match.group(6)), # max
parse_numeric(match.group(7)), # actual
parse_numeric(match.group(8)), # deviation
int(match.group(9)) if match.group(9) else 0 # oot
)
record_count += 1
continue
# Check for TOOLDATA
match = RE_TOOLDATA.search(line)
if match:
self.add_tooldata(
match.group(1), # method
match.group(2), # dimid
match.group(3), # description
int(match.group(4)) if match.group(4) else 0, # toolnumber (SEQ)
parse_numeric(match.group(5)), # min
parse_numeric(match.group(6)), # max
parse_numeric(match.group(7)), # actual
parse_numeric(match.group(8)), # deviation
int(match.group(9)) if match.group(9) else 0 # oot
)
record_count += 1
continue
# Check for MACHINEDATA (simplified pattern)
match = RE_MACHINEDATA.search(line)
if match:
self.add_measurement(
'MACHINEDATA',
match.group(1), # method
match.group(2), # dimid
None, # description
0, # seq
parse_numeric(match.group(3)),
parse_numeric(match.group(4)),
parse_numeric(match.group(5)),
parse_numeric(match.group(6)),
int(match.group(7)) if match.group(7) else 0
)
record_count += 1
continue
# Check for item crossing
match = RE_ITEM_CROSSING.search(line)
if match:
self.add_event('ITEM-CROSSING', match.group(1), match.group(2))
record_count += 1
continue
# Check for manual request
match = RE_MANUAL_REQUEST.search(line)
if match:
self.start_manual_request(match.group(1))
self.add_event('MANUALREQUEST', None, match.group(1))
record_count += 1
continue
# Check for message
if RE_MESSAGE.search(line):
self.add_event('MESSAGE', None, line[:500])
record_count += 1
continue
# Check for errors
match = RE_ERROR_MSG.search(line)
if match:
self.add_error('ERROR', match.group(1))
record_count += 1
continue
match = RE_EXCEPTION.search(line)
if match:
self.add_error('EXCEPTION', match.group(1))
record_count += 1
continue
match = RE_CANNOT.search(line)
if match:
self.add_error('FAILURE', match.group(1))
record_count += 1
continue
# Check for connection events
match = RE_SERIAL_OPEN.search(line)
if match:
self.add_connection('OPEN', f'Serial{match.group(1)}', 'Serial connection opened')
record_count += 1
continue
match = RE_SERIAL_ACCEPT.search(line)
if match:
self.add_connection('LISTENING', match.group(1), f'Accepting RS232 strings on {match.group(1)}')
record_count += 1
continue
match = RE_SERIAL_CLOSE.search(line)
if match:
self.add_connection('CLOSE', f'Serial{match.group(1)}', 'Serial connection closed')
record_count += 1
continue
# Flush remaining batches
self.flush_measurements()
self.flush_tooldata()
self.flush_events()
self.flush_errors()
self.flush_connections()
# Close any open part run
if self.current_partrun_id:
self.end_part_run()
# Update session end
if self.current_session_id:
self.update_session_end(self.current_session_id, session_end or self.current_timestamp, record_count)
# Update file state with current position
self.update_file_state(filepath, machine_number, current_size, current_size)
print(f" Parsed {record_count} records")
return {'success': True, 'records': record_count, 'session_id': self.current_session_id, 'new_content': True}
except Exception as e:
print(f" Error parsing file: {e}")
return {'success': False, 'error': str(e)}
def parse_directory(self, directory: str, max_workers: int = 4, force_full: bool = False) -> Dict[str, Any]:
"""Parse all UDC log files in a directory using parallel processing"""
results = {
'total_files': 0,
'imported': 0,
'skipped': 0,
'errors': 0,
'total_records': 0
}
# Find all log files
pattern = os.path.join(directory, 'UDC_Log_*.log')
files = glob.glob(pattern)
# Also check subdirectories
for subdir in ['3204_BAD', '7606_BAD', 'Dual Spindle']:
subpath = os.path.join(directory, subdir)
if os.path.isdir(subpath):
files.extend(glob.glob(os.path.join(subpath, '**', 'UDC_Log_*.log'), recursive=True))
results['total_files'] = len(files)
print(f"Found {len(files)} log files")
if max_workers <= 1:
# Sequential processing
for filepath in sorted(files):
result = self.parse_file(filepath, force_full=force_full)
if result['success']:
results['imported'] += 1
results['total_records'] += result.get('records', 0)
elif result.get('skipped') or 'no new content' in result.get('error', '').lower():
results['skipped'] += 1
elif 'already imported' in result.get('error', '').lower():
results['skipped'] += 1
else:
results['errors'] += 1
print(f" Error: {result.get('error')}")
else:
# Parallel processing with connection pool
# Pool size = workers + 1 buffer for safety
pool_size = min(max_workers + 1, 10)
init_connection_pool(self.db_config, pool_size=pool_size)
print(f"Using {max_workers} parallel workers with connection pool...")
completed = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all files for processing (use pool, not individual connections)
futures = {
executor.submit(parse_file_worker, filepath, None, True, 3, force_full): filepath
for filepath in sorted(files)
}
for future in as_completed(futures):
completed += 1
filepath = futures[future]
try:
result = future.result()
if result['success']:
results['imported'] += 1
results['total_records'] += result.get('records', 0)
elif result.get('skipped') or 'no new content' in result.get('error', '').lower():
results['skipped'] += 1
elif 'already imported' in result.get('error', '').lower():
results['skipped'] += 1
else:
results['errors'] += 1
print(f" Error in {os.path.basename(filepath)}: {result.get('error')}")
except Exception as e:
results['errors'] += 1
print(f" Worker error for {os.path.basename(filepath)}: {e}")
# Progress update
if completed % 10 == 0 or completed == len(files):
print(f" Progress: {completed}/{len(files)} files processed...", end='\r')
print() # New line after progress
return results
def parse_file_worker(filepath: str, db_config: dict = None, use_pool: bool = True, max_retries: int = 3, force_full: bool = False) -> Dict[str, Any]:
"""Worker function for parallel file parsing. Uses connection pool by default with retry logic."""
last_error = None
for attempt in range(max_retries):
parser = UDCParser(db_config=db_config, use_pool=use_pool)
try:
if not parser.connect():
last_error = 'Failed to connect to database'
continue
# Verify connection is alive before using
try:
parser.conn.ping(reconnect=True, attempts=2, delay=1)
except Exception:
parser.disconnect()
last_error = 'Connection ping failed'
continue
result = parser.parse_file(filepath, force_full=force_full)
return result
except Error as e:
last_error = str(e)
# Check if it's a connection error worth retrying
if e.errno in (2006, 2013, 2055): # Connection lost errors
parser.disconnect()
continue
else:
return {'success': False, 'error': str(e)}
finally:
parser.disconnect()
return {'success': False, 'error': f'Failed after {max_retries} attempts: {last_error}'}
def main():
parser = argparse.ArgumentParser(description='Parse UDC log files and import to database')
parser.add_argument('--dir', default=LOG_DIRECTORY, help='Directory containing log files')
parser.add_argument('--file', help='Parse a specific file instead of directory')
parser.add_argument('--force', action='store_true', help='Force full re-parse, ignoring previous position')
parser.add_argument('--workers', type=int, default=1, help='Number of parallel workers (default: 1 sequential, increase only if server supports it)')
parser.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host')
parser.add_argument('--port', type=int, default=DB_CONFIG['port'], help='MySQL port')
parser.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user')
parser.add_argument('--password', default=DB_CONFIG['password'], help='MySQL password')
parser.add_argument('--database', default=DB_CONFIG['database'], help='MySQL database')
args = parser.parse_args()
# Build config from args
db_config = {
'host': args.host,
'port': args.port,
'user': args.user,
'password': args.password,
'database': args.database
}
# Create parser and connect
udc_parser = UDCParser(db_config)
if not udc_parser.connect():
sys.exit(1)
try:
if args.file:
result = udc_parser.parse_file(args.file, force_full=args.force)
if result['success']:
print(f"\nSuccessfully imported {result.get('records', 0)} records")
elif result.get('skipped'):
print(f"\nNo new content to parse")
else:
print(f"\nFailed: {result.get('error')}")
sys.exit(1)
else:
results = udc_parser.parse_directory(args.dir, max_workers=args.workers, force_full=args.force)
print(f"\n{'='*50}")
print(f"Import Summary:")
print(f" Total files found: {results['total_files']}")
print(f" Files imported: {results['imported']}")
print(f" Files skipped: {results['skipped']}")
print(f" Files with errors: {results['errors']}")
print(f" Total records: {results['total_records']}")
finally:
udc_parser.disconnect()
if __name__ == '__main__':
main()