diff --git a/parser/config.py b/parser/config.py
index f92c837..b8e714f 100644
--- a/parser/config.py
+++ b/parser/config.py
@@ -11,6 +11,11 @@
 CLM_DATA_PATH_WINDOWS = r'S:\SPC\UDC\CLM_Data'
 CLM_DATA_PATH_LINUX = '/home/camp/projects/UDC/CLM_Data'
 CLM_DATA_PATH = CLM_DATA_PATH_WINDOWS if IS_WINDOWS else CLM_DATA_PATH_LINUX
+# UDC Log paths
+UDC_LOG_PATH_WINDOWS = r'S:\SPC\UDC\LogFiles'
+UDC_LOG_PATH_LINUX = '/home/camp/projects/UDC/LogFiles'
+UDC_LOG_PATH = UDC_LOG_PATH_WINDOWS if IS_WINDOWS else UDC_LOG_PATH_LINUX
+
 # Database - Development (Docker on Linux)
 DB_CONFIG_DEV = {
     'host': '127.0.0.1',
diff --git a/parser/udcparser.py b/parser/udcparser.py
index c6aec51..b16e7ec 100644
--- a/parser/udcparser.py
+++ b/parser/udcparser.py
@@ -17,11 +17,14 @@
 from typing import Optional, Dict, List, Any
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import mysql.connector
 from mysql.connector import Error
+from mysql.connector.pooling import MySQLConnectionPool
 
 # Import config
 try:
-    from config import DB_CONFIG_DEV as DB_CONFIG, LOG_DIRECTORY, BATCH_SIZE
+    from config import DB_CONFIG, UDC_LOG_PATH as LOG_DIRECTORY, BATCH_SIZE
 except ImportError:
+    import platform
+    IS_WINDOWS = platform.system() == 'Windows'
     DB_CONFIG = {
         'host': '127.0.0.1',
@@ -29,9 +32,35 @@ except ImportError:
         'password': 'rootpassword',
         'database': 'shopdb'
     }
-    LOG_DIRECTORY = '/home/camp/projects/UDC/LogFiles'
+    LOG_DIRECTORY = r'S:\SPC\UDC\LogFiles' if IS_WINDOWS else '/home/camp/projects/UDC/LogFiles'
     BATCH_SIZE = 1000
 
+# Global connection pool (initialized in main or parse_directory)
+_connection_pool = None
+
+
+def init_connection_pool(db_config: dict, pool_size: int = 5) -> MySQLConnectionPool:
+    """Initialize the global connection pool"""
+    global _connection_pool
+    if _connection_pool is None:
+        _connection_pool = MySQLConnectionPool(
+            pool_name="udc_pool",
+            pool_size=pool_size,
+            pool_reset_session=True,
+            **db_config
+        )
+        print(f"Connection pool created (size={pool_size})")
+    return _connection_pool
+
+
+def get_connection_from_pool():
+    """Get a connection from the pool"""
+    global _connection_pool
+    if _connection_pool is None:
+        raise RuntimeError("Connection pool not initialized. Call init_connection_pool() first.")
+    return _connection_pool.get_connection()
+
+
 # =============================================================================
 # Regex Patterns
 # =============================================================================
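
Reviewer note: the two helpers above are the whole pool surface the rest of the module uses. A minimal sketch of the intended lifecycle (the db_config values mirror the fallback DB_CONFIG above; everything else is illustrative):

    from udcparser import init_connection_pool, get_connection_from_pool

    db_config = {'host': '127.0.0.1', 'port': 3306, 'user': 'root',
                 'password': 'rootpassword', 'database': 'shopdb'}

    init_connection_pool(db_config, pool_size=5)  # idempotent: reuses an existing pool
    conn = get_connection_from_pool()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT 1")
        cursor.fetchone()
    finally:
        conn.close()  # for a pooled connection this returns it to the pool
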
Call init_connection_pool() first.") + return _connection_pool.get_connection() + + # ============================================================================= # Regex Patterns # ============================================================================= @@ -57,28 +86,29 @@ RE_END_PROGRAM = re.compile(r': End of Program') RE_DATA_ENTRY = re.compile(r'^DATA ENTRY : (\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})') # Process/Tool/Machine data +# Note: Log format has spaces after // like "// DIMID//" so we need \s* after each // RE_PROCESSDATA = re.compile( - r'\+\+\.PROCESSDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--' + r'\+\+\.PROCESSDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//\s*DESCRIPTION//\s*(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--' ) RE_TOOLDATA = re.compile( - r'\+\+\.TOOLDATA//METHOD//(\w+)//\s*DIMID//(\w+)//\s*DESCRIPTION//(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--' + r'\+\+\.TOOLDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//\s*DESCRIPTION//\s*(.+?)//\s*SEQ//\s*(\d+)//\s*MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--' ) RE_MACHINEDATA = re.compile( - r'\+\+\.MACHINEDATA//METHOD//(\w+)//\s*DIMID//(\w+)//.*?MIN//(.+?)//\s*MAX//(.+?)//\s*ACTUAL//(.+?)//\s*DEVIATION//(.+?)//\s*OOT//\s*(\d)//\s*--' + r'\+\+\.MACHINEDATA//\s*METHOD//\s*(\w+)//\s*DIMID//\s*(\w+)//.*?MIN//\s*(.+?)//\s*MAX//\s*(.+?)//\s*ACTUAL//\s*(.+?)//\s*DEVIATION//\s*(.+?)//\s*OOT//\s*(\d)//\s*--' ) # Item crossing -RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//ITEMNO//(.+?)//DESCRIPTION//(.+?)//--') +RE_ITEM_CROSSING = re.compile(r'\+\+\.ITEM-CROSSING//\s*ITEMNO//\s*(.+?)//\s*DESCRIPTION//\s*(.+?)//\s*--') # Manual request -RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//(.+?)//\s*--') +RE_MANUAL_REQUEST = re.compile(r'\+\+\.MANUALREQUEST\d?//\s*DESCRIPTION//\s*(.+?)//\s*--') # Message RE_MESSAGE = re.compile(r'\+\+\.MESSAGE//') # Start/End markers in data -RE_START = re.compile(r'\+\+\.START//PROGNAME//(.+?)//SNUMBNO//(.+?)//--') -RE_END = re.compile(r'\+\+\.END//--') +RE_START = re.compile(r'\+\+\.START//\s*PROGNAME//\s*(.+?)//\s*SNUMBNO//\s*(.+?)//\s*--') +RE_END = re.compile(r'\+\+\.END//\s*--') # Error patterns RE_ERROR_MSG = re.compile(r'^ERROR\s*:\s*(?:Message:\s*)?(.+)', re.IGNORECASE) @@ -124,10 +154,12 @@ def extract_machine_number(filename: str) -> Optional[str]: class UDCParser: """Parser for UDC log files""" - def __init__(self, db_config: dict): + def __init__(self, db_config: dict = None, use_pool: bool = False): self.db_config = db_config + self.use_pool = use_pool self.conn = None self.cursor = None + self._owns_connection = True # Track if we should close the connection # Current state self.current_session_id = None @@ -155,25 +187,31 @@ class UDCParser: self.current_machine_number = None def connect(self): - """Connect to MySQL database""" + """Connect to MySQL database (or get connection from pool)""" try: - self.conn = mysql.connector.connect(**self.db_config) + if self.use_pool: + self.conn = get_connection_from_pool() + self._owns_connection = True + else: + self.conn = mysql.connector.connect(**self.db_config) + print(f"Connected to MySQL at {self.db_config['host']}") self.cursor = self.conn.cursor(dictionary=True) - 
print(f"Connected to MySQL at {self.db_config['host']}") return True except Error as e: print(f"Error connecting to MySQL: {e}") return False def disconnect(self): - """Disconnect from database""" + """Disconnect from database (returns connection to pool if pooled)""" if self.cursor: self.cursor.close() - if self.conn: - self.conn.close() + self.cursor = None + if self.conn and self._owns_connection: + self.conn.close() # For pooled connections, this returns to pool + self.conn = None def is_file_imported(self, filename: str) -> bool: - """Check if a log file has already been imported""" + """Check if a log file has already been imported (DEPRECATED - use get_file_state instead)""" basename = os.path.basename(filename) self.cursor.execute( "SELECT sessionid FROM udcsessions WHERE logfilename = %s", @@ -181,6 +219,27 @@ class UDCParser: ) return self.cursor.fetchone() is not None + def get_file_state(self, filepath: str) -> Dict[str, Any]: + """Get the last known state of a log file (offset, size)""" + self.cursor.execute( + "SELECT fileid, lastoffset, filesize FROM udcfilestate WHERE filepath = %s", + (filepath,) + ) + row = self.cursor.fetchone() + if row: + return {'fileid': row['fileid'], 'offset': row['lastoffset'], 'size': row['filesize']} + return {'fileid': None, 'offset': 0, 'size': 0} + + def update_file_state(self, filepath: str, machine_number: str, offset: int, filesize: int): + """Update the file state after parsing""" + self.cursor.execute( + """INSERT INTO udcfilestate (filepath, machinenumber, lastoffset, lastparsed, filesize) + VALUES (%s, %s, %s, NOW(), %s) + ON DUPLICATE KEY UPDATE lastoffset = %s, lastparsed = NOW(), filesize = %s""", + (filepath, machine_number, offset, filesize, offset, filesize) + ) + self.conn.commit() + def create_session(self, filename: str, machine_number: str, start_time: datetime) -> int: """Create a new session record""" basename = os.path.basename(filename) @@ -472,19 +531,37 @@ class UDCParser: self.conn.commit() self.connections_batch = [] - def parse_file(self, filepath: str) -> Dict[str, Any]: - """Parse a single UDC log file""" + def parse_file(self, filepath: str, force_full: bool = False) -> Dict[str, Any]: + """Parse a single UDC log file incrementally from last known position""" filename = os.path.basename(filepath) machine_number = extract_machine_number(filename) if not machine_number: return {'success': False, 'error': f'Could not extract machine number from {filename}'} - # Check if already imported - if self.is_file_imported(filepath): - return {'success': False, 'error': f'File already imported: {filename}'} + # Get current file size + try: + current_size = os.path.getsize(filepath) + except OSError as e: + return {'success': False, 'error': f'Cannot access file: {e}'} - print(f"Parsing {filename} (machine {machine_number})...") + # Get last known state + file_state = self.get_file_state(filepath) + start_offset = 0 if force_full else file_state['offset'] + + # Check if file has new content + if not force_full and current_size <= file_state['size'] and file_state['offset'] > 0: + return {'success': False, 'error': f'No new content in {filename}', 'skipped': True} + + # If file is smaller than last offset, it was likely rotated/truncated - start fresh + if current_size < start_offset: + print(f" File {filename} appears truncated, parsing from beginning...") + start_offset = 0 + + if start_offset > 0: + print(f"Parsing {filename} (machine {machine_number}) from offset {start_offset}...") + else: + print(f"Parsing {filename} 
(machine {machine_number})...") # Reset state self.current_session_id = None @@ -507,7 +584,14 @@ class UDCParser: try: with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: - for line_num, line in enumerate(f, 1): + # Seek to last known position + if start_offset > 0: + f.seek(start_offset) + # If we're in the middle of a line, skip to next complete line + if start_offset > 0: + f.readline() # Skip partial line + + for line in f: line = line.rstrip() # Skip empty lines @@ -707,14 +791,17 @@ class UDCParser: if self.current_session_id: self.update_session_end(self.current_session_id, session_end or self.current_timestamp, record_count) + # Update file state with current position + self.update_file_state(filepath, machine_number, current_size, current_size) + print(f" Parsed {record_count} records") - return {'success': True, 'records': record_count, 'session_id': self.current_session_id} + return {'success': True, 'records': record_count, 'session_id': self.current_session_id, 'new_content': True} except Exception as e: print(f" Error parsing file: {e}") return {'success': False, 'error': str(e)} - def parse_directory(self, directory: str, max_workers: int = 4) -> Dict[str, Any]: + def parse_directory(self, directory: str, max_workers: int = 4, force_full: bool = False) -> Dict[str, Any]: """Parse all UDC log files in a directory using parallel processing""" results = { 'total_files': 0, @@ -740,25 +827,31 @@ class UDCParser: if max_workers <= 1: # Sequential processing for filepath in sorted(files): - result = self.parse_file(filepath) + result = self.parse_file(filepath, force_full=force_full) if result['success']: results['imported'] += 1 results['total_records'] += result.get('records', 0) + elif result.get('skipped') or 'no new content' in result.get('error', '').lower(): + results['skipped'] += 1 elif 'already imported' in result.get('error', '').lower(): results['skipped'] += 1 else: results['errors'] += 1 print(f" Error: {result.get('error')}") else: - # Parallel processing - print(f"Using {max_workers} parallel workers...") + # Parallel processing with connection pool + # Pool size = workers + 1 buffer for safety + pool_size = min(max_workers + 1, 10) + init_connection_pool(self.db_config, pool_size=pool_size) + + print(f"Using {max_workers} parallel workers with connection pool...") completed = 0 with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit all files for processing + # Submit all files for processing (use pool, not individual connections) futures = { - executor.submit(parse_file_worker, filepath, self.db_config): filepath + executor.submit(parse_file_worker, filepath, None, True, 3, force_full): filepath for filepath in sorted(files) } @@ -772,6 +865,8 @@ class UDCParser: if result['success']: results['imported'] += 1 results['total_records'] += result.get('records', 0) + elif result.get('skipped') or 'no new content' in result.get('error', '').lower(): + results['skipped'] += 1 elif 'already imported' in result.get('error', '').lower(): results['skipped'] += 1 else: @@ -791,24 +886,48 @@ class UDCParser: return results -def parse_file_worker(filepath: str, db_config: dict) -> Dict[str, Any]: - """Worker function for parallel file parsing. 
@@ -791,24 +886,48 @@
     return results
 
 
-def parse_file_worker(filepath: str, db_config: dict) -> Dict[str, Any]:
-    """Worker function for parallel file parsing. Creates its own DB connection."""
-    parser = UDCParser(db_config)
-    if not parser.connect():
-        return {'success': False, 'error': 'Failed to connect to database'}
+def parse_file_worker(filepath: str, db_config: dict = None, use_pool: bool = True, max_retries: int = 3, force_full: bool = False) -> Dict[str, Any]:
+    """Worker function for parallel file parsing. Uses connection pool by default with retry logic."""
+    last_error = None
 
-    try:
-        result = parser.parse_file(filepath)
-        return result
-    finally:
-        parser.disconnect()
+    for attempt in range(max_retries):
+        parser = UDCParser(db_config=db_config, use_pool=use_pool)
+        try:
+            if not parser.connect():
+                last_error = 'Failed to connect to database'
+                continue
+
+            # Verify connection is alive before using
+            try:
+                parser.conn.ping(reconnect=True, attempts=2, delay=1)
+            except Exception:
+                parser.disconnect()
+                last_error = 'Connection ping failed'
+                continue
+
+            result = parser.parse_file(filepath, force_full=force_full)
+            return result
+
+        except Error as e:
+            last_error = str(e)
+            # Check if it's a connection error worth retrying
+            if e.errno in (2006, 2013, 2055):  # Connection lost errors
+                parser.disconnect()
+                continue
+            else:
+                return {'success': False, 'error': str(e)}
+        finally:
+            parser.disconnect()
+
+    return {'success': False, 'error': f'Failed after {max_retries} attempts: {last_error}'}
 
 
 def main():
     parser = argparse.ArgumentParser(description='Parse UDC log files and import to database')
     parser.add_argument('--dir', default=LOG_DIRECTORY, help='Directory containing log files')
     parser.add_argument('--file', help='Parse a specific file instead of directory')
-    parser.add_argument('--workers', type=int, default=2, help='Number of parallel workers (default: 2, use 1 for sequential)')
+    parser.add_argument('--force', action='store_true', help='Force full re-parse, ignoring previous position')
+    parser.add_argument('--workers', type=int, default=1, help='Number of parallel workers (default: 1 sequential, increase only if server supports it)')
     parser.add_argument('--host', default=DB_CONFIG['host'], help='MySQL host')
     parser.add_argument('--port', type=int, default=DB_CONFIG['port'], help='MySQL port')
     parser.add_argument('--user', default=DB_CONFIG['user'], help='MySQL user')
@@ -833,14 +952,16 @@
     try:
         if args.file:
-            result = udc_parser.parse_file(args.file)
+            result = udc_parser.parse_file(args.file, force_full=args.force)
             if result['success']:
                 print(f"\nSuccessfully imported {result.get('records', 0)} records")
+            elif result.get('skipped'):
+                print(f"\nNo new content to parse")
             else:
                 print(f"\nFailed: {result.get('error')}")
                 sys.exit(1)
         else:
-            results = udc_parser.parse_directory(args.dir, max_workers=args.workers)
+            results = udc_parser.parse_directory(args.dir, max_workers=args.workers, force_full=args.force)
             print(f"\n{'='*50}")
             print(f"Import Summary:")
             print(f"  Total files found: {results['total_files']}")
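
With these changes, typical invocations look like the following (run from the parser/ directory; the file path in the last example is illustrative):

    # Incremental parse of the default log directory, sequential by default
    python udcparser.py

    # Parallel parse using the connection pool
    python udcparser.py --dir /home/camp/projects/UDC/LogFiles --workers 4

    # Force a full re-parse of a single file, ignoring the saved offset
    python udcparser.py --file /home/camp/projects/UDC/LogFiles/M001_20240101.log --force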