Add connection pooling and fix regex patterns for log format

- Add UDC_LOG_PATH configuration for Windows/Linux
- Implement MySQL connection pooling for better performance
- Fix regex patterns to handle spaces after // delimiters in log files
- Update parser to support pooled connections

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
cproudlock
2025-12-17 13:49:19 -05:00
parent 149a223ce2
commit 5bb142e33c
2 changed files with 170 additions and 44 deletions

View File

@@ -11,6 +11,11 @@ CLM_DATA_PATH_WINDOWS = r'S:\SPC\UDC\CLM_Data'
 CLM_DATA_PATH_LINUX = '/home/camp/projects/UDC/CLM_Data'
 CLM_DATA_PATH = CLM_DATA_PATH_WINDOWS if IS_WINDOWS else CLM_DATA_PATH_LINUX
+# UDC Log paths
+UDC_LOG_PATH_WINDOWS = r'S:\SPC\UDC\LogFiles'
+UDC_LOG_PATH_LINUX = '/home/camp/projects/UDC/LogFiles'
+UDC_LOG_PATH = UDC_LOG_PATH_WINDOWS if IS_WINDOWS else UDC_LOG_PATH_LINUX
 # Database - Development (Docker on Linux)
 DB_CONFIG_DEV = {
     'host': '127.0.0.1',

View File

@@ -17,11 +17,14 @@ from typing import Optional, Dict, List, Any
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import mysql.connector
 from mysql.connector import Error
+from mysql.connector.pooling import MySQLConnectionPool
 # Import config
 try:
-    from config import DB_CONFIG_DEV as DB_CONFIG, LOG_DIRECTORY, BATCH_SIZE
+    from config import DB_CONFIG, UDC_LOG_PATH as LOG_DIRECTORY, BATCH_SIZE
 except ImportError:
+    import platform
+    IS_WINDOWS = platform.system() == 'Windows'
     DB_CONFIG = {
         'host': '127.0.0.1',
         'port': 3306,
@@ -29,9 +32,35 @@ except ImportError:
         'password': 'rootpassword',
         'database': 'shopdb'
     }
-    LOG_DIRECTORY = '/home/camp/projects/UDC/LogFiles'
+    LOG_DIRECTORY = r'S:\SPC\UDC\LogFiles' if IS_WINDOWS else '/home/camp/projects/UDC/LogFiles'
     BATCH_SIZE = 1000
+
+# Global connection pool (initialized in main or parse_directory)
+_connection_pool = None
+
+def init_connection_pool(db_config: dict, pool_size: int = 5) -> MySQLConnectionPool:
+    """Initialize the global connection pool"""
+    global _connection_pool
+    if _connection_pool is None:
+        _connection_pool = MySQLConnectionPool(
+            pool_name="udc_pool",
+            pool_size=pool_size,
+            pool_reset_session=True,
+            **db_config
+        )
+        print(f"Connection pool created (size={pool_size})")
+    return _connection_pool
+
+def get_connection_from_pool():
+    """Get a connection from the pool"""
+    global _connection_pool
+    if _connection_pool is None:
+        raise RuntimeError("Connection pool not initialized. Call init_connection_pool() first.")
+    return _connection_pool.get_connection()
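
These two helpers give the whole process one shared pool: init_connection_pool() is called once (parse_directory does this before spawning workers), and each worker then borrows and returns connections. A minimal usage sketch, assuming the fallback DB_CONFIG shown in this diff:

    # Sketch only: borrow a connection from the shared pool and return it.
    pool = init_connection_pool(DB_CONFIG, pool_size=5)
    conn = get_connection_from_pool()
    try:
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT 1 AS ok")
        print(cur.fetchone())   # {'ok': 1}
        cur.close()
    finally:
        conn.close()  # for a pooled connection this returns it to the pool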
 # =============================================================================
 # Regex Patterns
 # =============================================================================
@@ -57,28 +86,29 @@ RE_END_PROGRAM = re.compile(r': End of Program')
 RE_DATA_ENTRY = re.compile(r'^DATA ENTRY : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})')
 # Process/Tool/Machine data
+# Note: Log format has spaces after // like "// DIMID//" so we need \s* after each //
 RE_PROCESSDATA = re.compile(
-    r'\+\+\.PROCESSDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--'
+    r'\+\+\.PROCESSDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//\s*DESCRIPTION//\s*(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--'
 )
 RE_TOOLDATA = re.compile(
-    r'\+\+\.TOOLDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--'
+    r'\+\+\.TOOLDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//\s*DESCRIPTION//\s*(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--'
 )
 RE_MACHINEDATA = re.compile(
-    r'\+\+\.MACHINEDATA//METHOD//(\w+)//\s*DIMID//(\w+)//.*?MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--'
+    r'\+\+\.MACHINEDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//.*?MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--'
 )
 # Item crossing
-RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//ITEMNO//(.+?)//DESCRIPTION//(.+?)//--')
+RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//\s*ITEMNO//\s*(.+?)//\s*DESCRIPTION//\s*(.+?)//\s*--')
 # Manual request
-RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//(.+?)//\s*--')
+RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//\s*(.+?)//\s*--')
 # Message
 RE_MESSAGE = re.compile(r'\+\+\.MESSAGE//')
 # Start/End markers in data
-RE_START = re.compile(r'\+\+\.START//PROGNAME//(.+?)//SNUMBNO//(.+?)//--')
-RE_END = re.compile(r'\+\+\.END//--')
+RE_START = re.compile(r'\+\+\.START//\s*PROGNAME//\s*(.+?)//\s*SNUMBNO//\s*(.+?)//\s*--')
+RE_END = re.compile(r'\+\+\.END//\s*--')
 # Error patterns
 RE_ERROR_MSG = re.compile(r'^ERROR\s*:\s*(?:Message:\s*)?(.+)', re.IGNORECASE)
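
To see what the \s* fix buys, here is a match against a sample line in the padded format the note above describes (the line itself is invented for the demo; only the delimiter spacing matters):

    # Sketch only: hypothetical padded log line matched by the new pattern.
    line = ('++.PROCESSDATA// METHOD// MEASURE// DIMID// DIM01// '
            'DESCRIPTION// Bore diameter// SEQ// 12// MIN// 9.995// MAX// 10.005// '
            'ACTUAL// 10.001// DEVIATION// 0.001// OOT// 0// --')
    m = RE_PROCESSDATA.search(line)
    print(m.groups())
    # ('MEASURE', 'DIM01', 'Bore diameter', '12', '9.995', '10.005', '10.001', '0.001', '0')

The old pattern required each value to butt directly against its // delimiter, so a line like this returned None and the record was silently dropped.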
@@ -124,10 +154,12 @@ def extract_machine_number(filename: str) -> Optional[str]:
 class UDCParser:
     """Parser for UDC log files"""
-    def __init__(self, db_config: dict):
+    def __init__(self, db_config: dict = None, use_pool: bool = False):
         self.db_config = db_config
+        self.use_pool = use_pool
         self.conn = None
         self.cursor = None
+        self._owns_connection = True  # Track if we should close the connection
         # Current state
         self.current_session_id = None
@@ -155,25 +187,31 @@ class UDCParser:
         self.current_machine_number = None
     def connect(self):
-        """Connect to MySQL database"""
+        """Connect to MySQL database (or get connection from pool)"""
         try:
-            self.conn = mysql.connector.connect(**self.db_config)
+            if self.use_pool:
+                self.conn = get_connection_from_pool()
+                self._owns_connection = True
+            else:
+                self.conn = mysql.connector.connect(**self.db_config)
+                print(f"Connected to MySQL at {self.db_config['host']}")
             self.cursor = self.conn.cursor(dictionary=True)
-            print(f"Connected to MySQL at {self.db_config['host']}")
             return True
         except Error as e:
             print(f"Error connecting to MySQL: {e}")
             return False
     def disconnect(self):
-        """Disconnect from database"""
+        """Disconnect from database (returns connection to pool if pooled)"""
         if self.cursor:
             self.cursor.close()
-        if self.conn:
-            self.conn.close()
+            self.cursor = None
+        if self.conn and self._owns_connection:
+            self.conn.close()  # For pooled connections, this returns to pool
+            self.conn = None
     def is_file_imported(self, filename: str) -> bool:
-        """Check if a log file has already been imported"""
+        """Check if a log file has already been imported (DEPRECATED - use get_file_state instead)"""
         basename = os.path.basename(filename)
         self.cursor.execute(
             "SELECT sessionid FROM udcsessions WHERE logfilename = %s",
@@ -181,6 +219,27 @@ class UDCParser:
         )
         return self.cursor.fetchone() is not None
+    def get_file_state(self, filepath: str) -> Dict[str, Any]:
+        """Get the last known state of a log file (offset, size)"""
+        self.cursor.execute(
+            "SELECT fileid, lastoffset, filesize FROM udcfilestate WHERE filepath = %s",
+            (filepath,)
+        )
+        row = self.cursor.fetchone()
+        if row:
+            return {'fileid': row['fileid'], 'offset': row['lastoffset'], 'size': row['filesize']}
+        return {'fileid': None, 'offset': 0, 'size': 0}
+    def update_file_state(self, filepath: str, machine_number: str, offset: int, filesize: int):
+        """Update the file state after parsing"""
+        self.cursor.execute(
+            """INSERT INTO udcfilestate (filepath, machinenumber, lastoffset, lastparsed, filesize)
+               VALUES (%s, %s, %s, NOW(), %s)
+               ON DUPLICATE KEY UPDATE lastoffset = %s, lastparsed = NOW(), filesize = %s""",
+            (filepath, machine_number, offset, filesize, offset, filesize)
+        )
+        self.conn.commit()
     def create_session(self, filename: str, machine_number: str, start_time: datetime) -> int:
         """Create a new session record"""
         basename = os.path.basename(filename)
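
update_file_state relies on ON DUPLICATE KEY UPDATE, which only acts as an upsert if filepath carries a unique key. The udcfilestate table itself is not part of this commit; a plausible shape consistent with the columns referenced above, as an assumption sketch:

    # Sketch only: assumed DDL for udcfilestate (table is not shown in this commit;
    # column names follow the queries above, types and sizes are guesses).
    DDL_UDCFILESTATE = """
    CREATE TABLE IF NOT EXISTS udcfilestate (
        fileid        INT AUTO_INCREMENT PRIMARY KEY,
        filepath      VARCHAR(512) NOT NULL UNIQUE,  -- unique key drives the upsert
        machinenumber VARCHAR(32),
        lastoffset    BIGINT NOT NULL DEFAULT 0,
        lastparsed    DATETIME NULL,
        filesize      BIGINT NOT NULL DEFAULT 0
    )
    """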
@@ -472,19 +531,37 @@ class UDCParser:
             self.conn.commit()
             self.connections_batch = []
-    def parse_file(self, filepath: str) -> Dict[str, Any]:
-        """Parse a single UDC log file"""
+    def parse_file(self, filepath: str, force_full: bool = False) -> Dict[str, Any]:
+        """Parse a single UDC log file incrementally from last known position"""
         filename = os.path.basename(filepath)
         machine_number = extract_machine_number(filename)
         if not machine_number:
             return {'success': False, 'error': f'Could not extract machine number from {filename}'}
-        # Check if already imported
-        if self.is_file_imported(filepath):
-            return {'success': False, 'error': f'File already imported: {filename}'}
-        print(f"Parsing {filename} (machine {machine_number})...")
+        # Get current file size
+        try:
+            current_size = os.path.getsize(filepath)
+        except OSError as e:
+            return {'success': False, 'error': f'Cannot access file: {e}'}
+        # Get last known state
+        file_state = self.get_file_state(filepath)
+        start_offset = 0 if force_full else file_state['offset']
+        # Check if file has new content
+        if not force_full and current_size <= file_state['size'] and file_state['offset'] > 0:
+            return {'success': False, 'error': f'No new content in {filename}', 'skipped': True}
+        # If file is smaller than last offset, it was likely rotated/truncated - start fresh
+        if current_size < start_offset:
+            print(f"  File {filename} appears truncated, parsing from beginning...")
+            start_offset = 0
+        if start_offset > 0:
+            print(f"Parsing {filename} (machine {machine_number}) from offset {start_offset}...")
+        else:
+            print(f"Parsing {filename} (machine {machine_number})...")
         # Reset state
         self.current_session_id = None
@@ -507,7 +584,14 @@ class UDCParser:
         try:
             with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
-                for line_num, line in enumerate(f, 1):
+                # Seek to last known position
+                if start_offset > 0:
+                    f.seek(start_offset)
+                    # If we're in the middle of a line, skip to next complete line
+                    f.readline()  # Skip partial line
+                for line in f:
                     line = line.rstrip()
                     # Skip empty lines
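
The readline() after seek() is the subtle part: the saved offset can fall mid-line if the log was still being appended to at save time, so the tail of that partial line is discarded and iteration resumes at the next complete record. A self-contained sketch of the behavior (file name and contents are hypothetical):

    # Sketch only: seek into the middle of a line, discard the remainder, resume cleanly.
    with open('demo.log', 'w') as f:
        f.write('alpha\nbravo\ncharlie\n')
    with open('demo.log', 'r') as f:
        f.seek(8)        # lands inside 'bravo'
        f.readline()     # discards 'avo\n', the tail of the partial line
        for line in f:
            print(line.rstrip())   # prints only: charlie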
@@ -707,14 +791,17 @@ class UDCParser:
             if self.current_session_id:
                 self.update_session_end(self.current_session_id, session_end or self.current_timestamp, record_count)
+            # Update file state with current position
+            self.update_file_state(filepath, machine_number, current_size, current_size)
             print(f"  Parsed {record_count} records")
-            return {'success': True, 'records': record_count, 'session_id': self.current_session_id}
+            return {'success': True, 'records': record_count, 'session_id': self.current_session_id, 'new_content': True}
         except Exception as e:
             print(f"  Error parsing file: {e}")
             return {'success': False, 'error': str(e)}
-    def parse_directory(self, directory: str, max_workers: int = 4) -> Dict[str, Any]:
+    def parse_directory(self, directory: str, max_workers: int = 4, force_full: bool = False) -> Dict[str, Any]:
         """Parse all UDC log files in a directory using parallel processing"""
         results = {
             'total_files': 0,
@@ -740,25 +827,31 @@ class UDCParser:
         if max_workers <= 1:
             # Sequential processing
             for filepath in sorted(files):
-                result = self.parse_file(filepath)
+                result = self.parse_file(filepath, force_full=force_full)
                 if result['success']:
                     results['imported'] += 1
                     results['total_records'] += result.get('records', 0)
+                elif result.get('skipped') or 'no new content' in result.get('error', '').lower():
+                    results['skipped'] += 1
                 elif 'already imported' in result.get('error', '').lower():
                     results['skipped'] += 1
                 else:
                     results['errors'] += 1
                     print(f"  Error: {result.get('error')}")
         else:
-            # Parallel processing
-            print(f"Using {max_workers} parallel workers...")
+            # Parallel processing with connection pool
+            # Pool size = workers + 1 buffer for safety
+            pool_size = min(max_workers + 1, 10)
+            init_connection_pool(self.db_config, pool_size=pool_size)
+            print(f"Using {max_workers} parallel workers with connection pool...")
             completed = 0
             with ThreadPoolExecutor(max_workers=max_workers) as executor:
-                # Submit all files for processing
+                # Submit all files for processing (use pool, not individual connections)
                 futures = {
-                    executor.submit(parse_file_worker, filepath, self.db_config): filepath
+                    executor.submit(parse_file_worker, filepath, None, True, 3, force_full): filepath
                     for filepath in sorted(files)
                 }
@@ -772,6 +865,8 @@ class UDCParser:
                     if result['success']:
                         results['imported'] += 1
                         results['total_records'] += result.get('records', 0)
+                    elif result.get('skipped') or 'no new content' in result.get('error', '').lower():
+                        results['skipped'] += 1
                     elif 'already imported' in result.get('error', '').lower():
                         results['skipped'] += 1
                     else:
@@ -791,24 +886,48 @@ class UDCParser:
         return results
-def parse_file_worker(filepath: str, db_config: dict) -> Dict[str, Any]:
-    """Worker function for parallel file parsing. Creates its own DB connection."""
-    parser = UDCParser(db_config)
-    if not parser.connect():
-        return {'success': False, 'error': 'Failed to connect to database'}
-    try:
-        result = parser.parse_file(filepath)
-        return result
-    finally:
-        parser.disconnect()
+def parse_file_worker(filepath: str, db_config: dict = None, use_pool: bool = True, max_retries: int = 3, force_full: bool = False) -> Dict[str, Any]:
+    """Worker function for parallel file parsing. Uses connection pool by default with retry logic."""
+    last_error = None
+    for attempt in range(max_retries):
+        parser = UDCParser(db_config=db_config, use_pool=use_pool)
+        try:
+            if not parser.connect():
+                last_error = 'Failed to connect to database'
+                continue
+            # Verify connection is alive before using
+            try:
+                parser.conn.ping(reconnect=True, attempts=2, delay=1)
+            except Exception:
+                parser.disconnect()
+                last_error = 'Connection ping failed'
+                continue
+            result = parser.parse_file(filepath, force_full=force_full)
+            return result
+        except Error as e:
+            last_error = str(e)
+            # Check if it's a connection error worth retrying
+            if e.errno in (2006, 2013, 2055):  # Connection lost errors
+                parser.disconnect()
+                continue
+            else:
+                return {'success': False, 'error': str(e)}
+        finally:
+            parser.disconnect()
+    return {'success': False, 'error': f'Failed after {max_retries} attempts: {last_error}'}
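
parse_directory wires this worker through ThreadPoolExecutor, but it can also be called directly once the pool exists. A sketch, with a hypothetical file path:

    # Sketch only: direct call; parse_directory submits the same call per file.
    init_connection_pool(DB_CONFIG, pool_size=3)
    result = parse_file_worker('/home/camp/projects/UDC/LogFiles/machine01.log',
                               use_pool=True, max_retries=3)
    print(result.get('records', 0) if result['success'] else result.get('error'))

Note that the positional submit in parse_directory, (filepath, None, True, 3, force_full), maps onto (filepath, db_config, use_pool, max_retries, force_full).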
 def main():
     parser = argparse.ArgumentParser(description='Parse UDC log files and import to database')
     parser.add_argument('--dir', default=LOG_DIRECTORY, help='Directory containing log files')
     parser.add_argument('--file', help='Parse a specific file instead of directory')
-    parser.add_argument('--workers', type=int, default=2, help='Number of parallel workers (default: 2, use 1 for sequential)')
+    parser.add_argument('--force', action='store_true', help='Force full re-parse, ignoring previous position')
+    parser.add_argument('--workers', type=int, default=1, help='Number of parallel workers (default: 1 sequential, increase only if server supports it)')
     parser.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host')
     parser.add_argument('--port', type=int, default=DB_CONFIG['port'], help='MySQL port')
     parser.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user')
@@ -833,14 +952,16 @@ def main():
     try:
         if args.file:
-            result = udc_parser.parse_file(args.file)
+            result = udc_parser.parse_file(args.file, force_full=args.force)
             if result['success']:
                 print(f"\nSuccessfully imported {result.get('records', 0)} records")
+            elif result.get('skipped'):
+                print(f"\nNo new content to parse")
             else:
                 print(f"\nFailed: {result.get('error')}")
                 sys.exit(1)
         else:
-            results = udc_parser.parse_directory(args.dir, max_workers=args.workers)
+            results = udc_parser.parse_directory(args.dir, max_workers=args.workers, force_full=args.force)
             print(f"\n{'='*50}")
             print(f"Import Summary:")
             print(f"  Total files found: {results['total_files']}")