""" Migrate USB checkout data from legacy database. This script migrates USB device and checkout data from the VBScript database to the new USB plugin schema. Usage: python -m scripts.migration.migrate_usb --source """ import argparse import logging from datetime import datetime from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def get_device_type_mapping(target_session): """Get mapping of type names to IDs in target database.""" result = target_session.execute(text( "SELECT usbdevicetypeid, typename FROM usbdevicetypes" )) return {row.typename.lower(): row.usbdevicetypeid for row in result} def run_migration(source_conn_str, target_conn_str, dry_run=False): """ Run USB device migration. Args: source_conn_str: Source database connection string target_conn_str: Target database connection string dry_run: If True, don't commit changes """ source_engine = create_engine(source_conn_str) target_engine = create_engine(target_conn_str) SourceSession = sessionmaker(bind=source_engine) TargetSession = sessionmaker(bind=target_engine) source_session = SourceSession() target_session = TargetSession() try: # Get type mappings type_mapping = get_device_type_mapping(target_session) default_type_id = type_mapping.get('flash drive', 1) # Migrate USB devices # Adjust table/column names based on actual legacy schema logger.info("Migrating USB devices...") try: devices = source_session.execute(text(""" SELECT * FROM usbdevices """)) device_id_map = {} # Map old IDs to new IDs for device in devices: device_dict = dict(device._mapping) # Determine device type type_name = (device_dict.get('typename') or 'flash drive').lower() type_id = type_mapping.get(type_name, default_type_id) result = target_session.execute(text(""" INSERT INTO usbdevices ( serialnumber, label, assetnumber, usbdevicetypeid, capacitygb, vendorid, productid, manufacturer, productname, ischeckedout, currentuserid, currentusername, storagelocation, notes, isactive, createddate ) VALUES ( :serialnumber, :label, :assetnumber, :usbdevicetypeid, :capacitygb, :vendorid, :productid, :manufacturer, :productname, :ischeckedout, :currentuserid, :currentusername, :storagelocation, :notes, :isactive, :createddate ) """), { 'serialnumber': device_dict.get('serialnumber', f"UNKNOWN_{device_dict.get('usbdeviceid', 0)}"), 'label': device_dict.get('label'), 'assetnumber': device_dict.get('assetnumber'), 'usbdevicetypeid': type_id, 'capacitygb': device_dict.get('capacitygb'), 'vendorid': device_dict.get('vendorid'), 'productid': device_dict.get('productid'), 'manufacturer': device_dict.get('manufacturer'), 'productname': device_dict.get('productname'), 'ischeckedout': device_dict.get('ischeckedout', False), 'currentuserid': device_dict.get('currentuserid'), 'currentusername': device_dict.get('currentusername'), 'storagelocation': device_dict.get('storagelocation'), 'notes': device_dict.get('notes'), 'isactive': device_dict.get('isactive', True), 'createddate': device_dict.get('createddate', datetime.utcnow()), }) # Get the new ID new_id = target_session.execute(text("SELECT LAST_INSERT_ID()")).scalar() device_id_map[device_dict.get('usbdeviceid')] = new_id logger.info(f"Migrated {len(device_id_map)} USB devices") except Exception as e: logger.warning(f"Could not migrate USB devices: {e}") device_id_map = {} # Migrate checkout history logger.info("Migrating USB checkout history...") try: checkouts = source_session.execute(text(""" SELECT * FROM usbcheckouts """)) checkout_count = 0 for checkout in checkouts: checkout_dict = dict(checkout._mapping) old_device_id = checkout_dict.get('usbdeviceid') new_device_id = device_id_map.get(old_device_id) if not new_device_id: logger.warning(f"Skipping checkout - device ID {old_device_id} not found in mapping") continue target_session.execute(text(""" INSERT INTO usbcheckouts ( usbdeviceid, userid, username, checkoutdate, checkindate, expectedreturndate, purpose, notes, checkedoutby, checkedinby, isactive, createddate ) VALUES ( :usbdeviceid, :userid, :username, :checkoutdate, :checkindate, :expectedreturndate, :purpose, :notes, :checkedoutby, :checkedinby, :isactive, :createddate ) """), { 'usbdeviceid': new_device_id, 'userid': checkout_dict.get('userid', 'unknown'), 'username': checkout_dict.get('username'), 'checkoutdate': checkout_dict.get('checkoutdate', datetime.utcnow()), 'checkindate': checkout_dict.get('checkindate'), 'expectedreturndate': checkout_dict.get('expectedreturndate'), 'purpose': checkout_dict.get('purpose'), 'notes': checkout_dict.get('notes'), 'checkedoutby': checkout_dict.get('checkedoutby'), 'checkedinby': checkout_dict.get('checkedinby'), 'isactive': True, 'createddate': checkout_dict.get('createddate', datetime.utcnow()), }) checkout_count += 1 logger.info(f"Migrated {checkout_count} checkout records") except Exception as e: logger.warning(f"Could not migrate USB checkouts: {e}") if dry_run: logger.info("Dry run - rolling back changes") target_session.rollback() else: target_session.commit() logger.info("USB migration complete") finally: source_session.close() target_session.close() def main(): parser = argparse.ArgumentParser(description='Migrate USB devices and checkouts') parser.add_argument('--source', required=True, help='Source database connection string') parser.add_argument('--target', help='Target database connection string') parser.add_argument('--dry-run', action='store_true', help='Dry run without committing') args = parser.parse_args() target = args.target if not target: import os import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) from shopdb import create_app app = create_app() target = app.config['SQLALCHEMY_DATABASE_URI'] run_migration(args.source, target, args.dry_run) if __name__ == '__main__': main()