Flask backend with Vue 3 frontend for shop floor machine management. Includes database schema export for MySQL shopdb_flask database. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
582 lines
20 KiB
Python
582 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Import data from legacy MySQL ShopDB to new Flask ShopDB.
|
|
|
|
Usage:
|
|
cd /home/camp/projects/shopdb-flask
|
|
source venv/bin/activate
|
|
python scripts/import_from_mysql.py
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import pymysql
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
# Load environment variables
|
|
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env'))
|
|
|
|
# MySQL connection settings
|
|
MYSQL_CONFIG = {
|
|
'host': '127.0.0.1',
|
|
'port': 3306,
|
|
'user': 'root',
|
|
'password': 'rootpassword',
|
|
'database': 'shopdb',
|
|
'charset': 'utf8mb4',
|
|
'cursorclass': pymysql.cursors.DictCursor
|
|
}
|
|
|
|
|
|
def get_mysql_connection():
|
|
"""Get MySQL connection."""
|
|
return pymysql.connect(**MYSQL_CONFIG)
|
|
|
|
|
|
def import_vendors(mysql_conn, db, Vendor):
|
|
"""Import vendors from MySQL."""
|
|
print("Importing vendors...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM vendors WHERE isactive = 1")
|
|
vendors = cursor.fetchall()
|
|
|
|
count = 0
|
|
for v in vendors:
|
|
existing = Vendor.query.filter_by(vendor=v['vendor']).first()
|
|
if not existing:
|
|
vendor = Vendor(
|
|
vendor=v['vendor'],
|
|
isactive=True
|
|
)
|
|
db.session.add(vendor)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} vendors")
|
|
return count
|
|
|
|
|
|
def import_machinetypes(mysql_conn, db, MachineType):
|
|
"""Import machine types from MySQL with category mapping."""
|
|
print("Importing machine types...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM machinetypes WHERE isactive = 1")
|
|
types = cursor.fetchall()
|
|
|
|
# Category mapping based on machinetype name
|
|
pc_types = ['PC']
|
|
network_types = ['Access Point', 'IDF', 'Switch', 'Server', 'Camera']
|
|
printer_types = ['Printer']
|
|
|
|
count = 0
|
|
for t in types:
|
|
existing = MachineType.query.filter_by(machinetype=t['machinetype']).first()
|
|
if not existing:
|
|
# Determine category
|
|
if t['machinetype'] in pc_types:
|
|
category = 'PC'
|
|
elif t['machinetype'] in network_types:
|
|
category = 'Network'
|
|
elif t['machinetype'] in printer_types:
|
|
category = 'Printer'
|
|
else:
|
|
category = 'Equipment'
|
|
|
|
mt = MachineType(
|
|
machinetype=t['machinetype'],
|
|
category=category,
|
|
description=t.get('machinedescription'),
|
|
isactive=True
|
|
)
|
|
db.session.add(mt)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} machine types")
|
|
return count
|
|
|
|
|
|
def import_pctypes(mysql_conn, db, PCType):
|
|
"""Import PC types from MySQL."""
|
|
print("Importing PC types...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM pctype WHERE isactive = '1'")
|
|
types = cursor.fetchall()
|
|
|
|
count = 0
|
|
for t in types:
|
|
existing = PCType.query.filter_by(pctype=t['typename']).first()
|
|
if not existing:
|
|
pctype = PCType(
|
|
pctype=t['typename'],
|
|
description=t.get('description'),
|
|
isactive=True
|
|
)
|
|
db.session.add(pctype)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} PC types")
|
|
return count
|
|
|
|
|
|
def import_businessunits(mysql_conn, db, BusinessUnit):
|
|
"""Import business units from MySQL."""
|
|
print("Importing business units...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM businessunits WHERE isactive = 1")
|
|
units = cursor.fetchall()
|
|
|
|
count = 0
|
|
for bu in units:
|
|
existing = BusinessUnit.query.filter_by(businessunit=bu['businessunit']).first()
|
|
if not existing:
|
|
unit = BusinessUnit(
|
|
businessunit=bu['businessunit'],
|
|
isactive=True
|
|
)
|
|
db.session.add(unit)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} business units")
|
|
return count
|
|
|
|
|
|
def import_statuses(mysql_conn, db, MachineStatus):
|
|
"""Import machine statuses from MySQL."""
|
|
print("Importing machine statuses...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM machinestatus WHERE isactive = 1")
|
|
statuses = cursor.fetchall()
|
|
|
|
count = 0
|
|
for s in statuses:
|
|
existing = MachineStatus.query.filter_by(status=s['machinestatus']).first()
|
|
if not existing:
|
|
status = MachineStatus(
|
|
status=s['machinestatus'],
|
|
description=s.get('statusdescription'),
|
|
isactive=True
|
|
)
|
|
db.session.add(status)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} statuses")
|
|
return count
|
|
|
|
|
|
def import_operatingsystems(mysql_conn, db, OperatingSystem):
|
|
"""Import operating systems from MySQL."""
|
|
print("Importing operating systems...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM operatingsystems")
|
|
os_list = cursor.fetchall()
|
|
|
|
count = 0
|
|
for os_item in os_list:
|
|
os_name = os_item.get('operatingsystem') or os_item.get('osname')
|
|
if not os_name:
|
|
continue
|
|
|
|
existing = OperatingSystem.query.filter_by(osname=os_name).first()
|
|
if not existing:
|
|
os_obj = OperatingSystem(
|
|
osname=os_name,
|
|
osversion=os_item.get('osversion'),
|
|
isactive=True
|
|
)
|
|
db.session.add(os_obj)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} operating systems")
|
|
return count
|
|
|
|
|
|
def import_models(mysql_conn, db, Model, Vendor, MachineType):
|
|
"""Import models from MySQL."""
|
|
print("Importing models...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("""
|
|
SELECT m.*, v.vendor as vendor_name, mt.machinetype as type_name
|
|
FROM models m
|
|
LEFT JOIN vendors v ON m.vendorid = v.vendorid
|
|
LEFT JOIN machinetypes mt ON m.machinetypeid = mt.machinetypeid
|
|
WHERE m.isactive = 1
|
|
""")
|
|
models = cursor.fetchall()
|
|
|
|
count = 0
|
|
for m in models:
|
|
existing = Model.query.filter_by(modelnumber=m['modelnumber']).first()
|
|
if not existing:
|
|
# Find vendor and machinetype in new db
|
|
vendor = Vendor.query.filter_by(vendor=m['vendor_name']).first() if m['vendor_name'] else None
|
|
machinetype = MachineType.query.filter_by(machinetype=m['type_name']).first() if m['type_name'] else None
|
|
|
|
model = Model(
|
|
modelnumber=m['modelnumber'],
|
|
vendorid=vendor.vendorid if vendor else None,
|
|
machinetypeid=machinetype.machinetypeid if machinetype else None,
|
|
notes=m.get('notes'),
|
|
isactive=True
|
|
)
|
|
db.session.add(model)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} models")
|
|
return count
|
|
|
|
|
|
def import_relationshiptypes(mysql_conn, db, RelationshipType):
|
|
"""Import relationship types from MySQL."""
|
|
print("Importing relationship/connection types...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("SELECT * FROM relationshiptypes WHERE isactive = 1")
|
|
types = cursor.fetchall()
|
|
|
|
count = 0
|
|
for rt in types:
|
|
existing = RelationshipType.query.filter_by(relationshiptype=rt['relationshiptype']).first()
|
|
if not existing:
|
|
rel_type = RelationshipType(
|
|
relationshiptype=rt['relationshiptype'],
|
|
description=rt.get('description'),
|
|
isactive=True
|
|
)
|
|
db.session.add(rel_type)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} relationship types")
|
|
return count
|
|
|
|
|
|
def import_machines(mysql_conn, db, Machine, MachineType, MachineStatus,
|
|
Vendor, Model, BusinessUnit, OperatingSystem, Location,
|
|
Communication, CommunicationType, PCType):
|
|
"""Import machines (Equipment and PCs) from MySQL."""
|
|
print("Importing machines...")
|
|
cursor = mysql_conn.cursor()
|
|
|
|
# Get machines with related data
|
|
cursor.execute("""
|
|
SELECT m.*,
|
|
mt.machinetype as type_name,
|
|
ms.machinestatus as status_name,
|
|
v.vendor as vendor_name,
|
|
mdl.modelnumber as model_name,
|
|
bu.businessunit as bu_name,
|
|
os.operatingsystem as os_name,
|
|
pt.typename as pctype_name
|
|
FROM machines m
|
|
LEFT JOIN machinetypes mt ON m.machinetypeid = mt.machinetypeid
|
|
LEFT JOIN machinestatus ms ON m.machinestatusid = ms.machinestatusid
|
|
LEFT JOIN models mdl ON m.modelnumberid = mdl.modelnumberid
|
|
LEFT JOIN vendors v ON mdl.vendorid = v.vendorid
|
|
LEFT JOIN businessunits bu ON m.businessunitid = bu.businessunitid
|
|
LEFT JOIN operatingsystems os ON m.osid = os.osid
|
|
LEFT JOIN pctype pt ON m.pctypeid = pt.pctypeid
|
|
WHERE m.isactive = 1
|
|
""")
|
|
machines = cursor.fetchall()
|
|
|
|
# Get or create IP communication type
|
|
ip_comtype = CommunicationType.query.filter_by(comtype='IP').first()
|
|
if not ip_comtype:
|
|
ip_comtype = CommunicationType(comtype='IP', description='IP Network')
|
|
db.session.add(ip_comtype)
|
|
db.session.flush()
|
|
|
|
# Build lookup maps
|
|
type_map = {t.machinetype: t for t in MachineType.query.all()}
|
|
status_map = {s.status: s for s in MachineStatus.query.all()}
|
|
vendor_map = {v.vendor: v for v in Vendor.query.all()}
|
|
model_map = {m.modelnumber: m for m in Model.query.all()}
|
|
bu_map = {b.businessunit: b for b in BusinessUnit.query.all()}
|
|
os_map = {o.osname: o for o in OperatingSystem.query.all()}
|
|
pctype_map = {p.pctype: p for p in PCType.query.all()}
|
|
|
|
# Track old->new ID mapping for relationships
|
|
machine_id_map = {}
|
|
|
|
count = 0
|
|
comm_count = 0
|
|
skipped = 0
|
|
for m in machines:
|
|
# Skip machines without a machinenumber
|
|
if not m.get('machinenumber'):
|
|
skipped += 1
|
|
continue
|
|
|
|
# Check if already exists
|
|
existing = Machine.query.filter_by(machinenumber=m['machinenumber']).first()
|
|
if existing:
|
|
machine_id_map[m['machineid']] = existing.machineid
|
|
continue
|
|
|
|
# Get related objects
|
|
machinetype = type_map.get(m['type_name'])
|
|
status = status_map.get(m['status_name'])
|
|
vendor = vendor_map.get(m['vendor_name'])
|
|
model = model_map.get(m['model_name'])
|
|
bu = bu_map.get(m['bu_name'])
|
|
os_obj = os_map.get(m['os_name'])
|
|
pctype = pctype_map.get(m['pctype_name'])
|
|
|
|
machine = Machine(
|
|
machinenumber=m['machinenumber'],
|
|
alias=m.get('alias'),
|
|
hostname=m.get('hostname'),
|
|
serialnumber=m.get('serialnumber'),
|
|
machinetypeid=machinetype.machinetypeid if machinetype else None,
|
|
pctypeid=pctype.pctypeid if pctype else None,
|
|
statusid=status.statusid if status else None,
|
|
vendorid=vendor.vendorid if vendor else None,
|
|
modelnumberid=model.modelnumberid if model else None,
|
|
businessunitid=bu.businessunitid if bu else None,
|
|
osid=os_obj.osid if os_obj else None,
|
|
mapleft=m.get('mapleft'),
|
|
maptop=m.get('maptop'),
|
|
isvnc=bool(m.get('isvnc')),
|
|
iswinrm=bool(m.get('iswinrm')),
|
|
islocationonly=bool(m.get('islocationonly')),
|
|
loggedinuser=m.get('loggedinuser'),
|
|
notes=m.get('machinenotes'),
|
|
isactive=True
|
|
)
|
|
db.session.add(machine)
|
|
db.session.flush() # Get the new ID
|
|
|
|
machine_id_map[m['machineid']] = machine.machineid
|
|
count += 1
|
|
|
|
# Import IP addresses
|
|
if m.get('ipaddress1'):
|
|
comm = Communication(
|
|
machineid=machine.machineid,
|
|
comtypeid=ip_comtype.comtypeid,
|
|
ipaddress=m['ipaddress1'],
|
|
isprimary=True
|
|
)
|
|
db.session.add(comm)
|
|
comm_count += 1
|
|
|
|
if m.get('ipaddress2'):
|
|
comm = Communication(
|
|
machineid=machine.machineid,
|
|
comtypeid=ip_comtype.comtypeid,
|
|
ipaddress=m['ipaddress2'],
|
|
isprimary=False
|
|
)
|
|
db.session.add(comm)
|
|
comm_count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} machines with {comm_count} IP addresses (skipped {skipped} invalid)")
|
|
return machine_id_map
|
|
|
|
|
|
def import_relationships(mysql_conn, db, MachineRelationship, RelationshipType, machine_id_map):
|
|
"""Import machine relationships from MySQL."""
|
|
print("Importing machine relationships...")
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("""
|
|
SELECT mr.*, rt.relationshiptype
|
|
FROM machinerelationships mr
|
|
JOIN relationshiptypes rt ON mr.relationshiptypeid = rt.relationshiptypeid
|
|
WHERE mr.isactive = 1
|
|
""")
|
|
relationships = cursor.fetchall()
|
|
|
|
# Build relationship type map
|
|
type_map = {t.relationshiptype: t for t in RelationshipType.query.all()}
|
|
|
|
count = 0
|
|
skipped = 0
|
|
for r in relationships:
|
|
# Map old IDs to new IDs
|
|
parent_id = machine_id_map.get(r['machineid'])
|
|
child_id = machine_id_map.get(r['related_machineid'])
|
|
|
|
if not parent_id or not child_id:
|
|
skipped += 1
|
|
continue
|
|
|
|
rel_type = type_map.get(r['relationshiptype'])
|
|
if not rel_type:
|
|
skipped += 1
|
|
continue
|
|
|
|
# Check if already exists
|
|
existing = MachineRelationship.query.filter_by(
|
|
parentmachineid=parent_id,
|
|
childmachineid=child_id,
|
|
relationshiptypeid=rel_type.relationshiptypeid
|
|
).first()
|
|
|
|
if not existing:
|
|
relationship = MachineRelationship(
|
|
parentmachineid=parent_id,
|
|
childmachineid=child_id,
|
|
relationshiptypeid=rel_type.relationshiptypeid,
|
|
notes=r.get('relationship_notes')
|
|
)
|
|
db.session.add(relationship)
|
|
count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} relationships (skipped {skipped})")
|
|
return count
|
|
|
|
|
|
def import_printers(mysql_conn, db, Machine, MachineType, Model, Vendor,
|
|
Communication, CommunicationType):
|
|
"""Import printers from MySQL."""
|
|
print("Importing printers...")
|
|
|
|
# First, ensure we have a Printer machine type
|
|
printer_type = MachineType.query.filter_by(machinetype='Printer').first()
|
|
if not printer_type:
|
|
printer_type = MachineType(machinetype='Printer', category='Printer')
|
|
db.session.add(printer_type)
|
|
db.session.flush()
|
|
|
|
# Get or create IP communication type
|
|
ip_comtype = CommunicationType.query.filter_by(comtype='IP').first()
|
|
if not ip_comtype:
|
|
ip_comtype = CommunicationType(comtype='IP', description='IP Network')
|
|
db.session.add(ip_comtype)
|
|
db.session.flush()
|
|
|
|
cursor = mysql_conn.cursor()
|
|
cursor.execute("""
|
|
SELECT p.*, m.modelnumber, v.vendor as vendor_name
|
|
FROM printers p
|
|
LEFT JOIN models m ON p.modelid = m.modelnumberid
|
|
LEFT JOIN vendors v ON m.vendorid = v.vendorid
|
|
WHERE p.isactive = 1
|
|
""")
|
|
printers = cursor.fetchall()
|
|
|
|
# Build lookup maps
|
|
model_map = {m.modelnumber: m for m in Model.query.all()}
|
|
vendor_map = {v.vendor: v for v in Vendor.query.all()}
|
|
|
|
count = 0
|
|
comm_count = 0
|
|
for p in printers:
|
|
# Use windows name as machine number
|
|
machine_number = p.get('printerwindowsname') or f"Printer_{p['printerid']}"
|
|
|
|
existing = Machine.query.filter_by(machinenumber=machine_number).first()
|
|
if existing:
|
|
continue
|
|
|
|
model = model_map.get(p.get('modelnumber'))
|
|
vendor = vendor_map.get(p.get('vendor_name'))
|
|
|
|
machine = Machine(
|
|
machinenumber=machine_number,
|
|
alias=p.get('printercsfname'),
|
|
hostname=p.get('fqdn'),
|
|
serialnumber=p.get('serialnumber'),
|
|
machinetypeid=printer_type.machinetypeid,
|
|
vendorid=vendor.vendorid if vendor else None,
|
|
modelnumberid=model.modelnumberid if model else None,
|
|
mapleft=p.get('mapleft'),
|
|
maptop=p.get('maptop'),
|
|
notes=p.get('printernotes'),
|
|
isactive=True
|
|
)
|
|
db.session.add(machine)
|
|
db.session.flush()
|
|
count += 1
|
|
|
|
# Import IP address
|
|
if p.get('ipaddress'):
|
|
comm = Communication(
|
|
machineid=machine.machineid,
|
|
comtypeid=ip_comtype.comtypeid,
|
|
ipaddress=p['ipaddress'],
|
|
isprimary=True
|
|
)
|
|
db.session.add(comm)
|
|
comm_count += 1
|
|
|
|
db.session.commit()
|
|
print(f" Imported {count} printers with {comm_count} IP addresses")
|
|
return count
|
|
|
|
|
|
def main():
|
|
"""Main import function."""
|
|
print("=" * 60)
|
|
print("ShopDB MySQL to Flask Migration")
|
|
print("=" * 60)
|
|
|
|
# Initialize Flask app
|
|
from shopdb import create_app
|
|
from shopdb.extensions import db
|
|
from shopdb.core.models import (
|
|
Machine, MachineType, MachineStatus, Vendor, Model,
|
|
BusinessUnit, OperatingSystem, Location, PCType
|
|
)
|
|
from shopdb.core.models.communication import Communication, CommunicationType
|
|
from shopdb.core.models.relationship import MachineRelationship, RelationshipType
|
|
|
|
app = create_app()
|
|
|
|
with app.app_context():
|
|
# Connect to MySQL
|
|
print("\nConnecting to MySQL...")
|
|
mysql_conn = get_mysql_connection()
|
|
print(" Connected!")
|
|
|
|
try:
|
|
# Import reference data
|
|
print("\n--- Reference Data ---")
|
|
import_vendors(mysql_conn, db, Vendor)
|
|
import_machinetypes(mysql_conn, db, MachineType)
|
|
import_pctypes(mysql_conn, db, PCType)
|
|
import_businessunits(mysql_conn, db, BusinessUnit)
|
|
import_statuses(mysql_conn, db, MachineStatus)
|
|
import_operatingsystems(mysql_conn, db, OperatingSystem)
|
|
import_models(mysql_conn, db, Model, Vendor, MachineType)
|
|
import_relationshiptypes(mysql_conn, db, RelationshipType)
|
|
|
|
# Import machines
|
|
print("\n--- Machines ---")
|
|
machine_id_map = import_machines(
|
|
mysql_conn, db, Machine, MachineType, MachineStatus,
|
|
Vendor, Model, BusinessUnit, OperatingSystem, Location,
|
|
Communication, CommunicationType, PCType
|
|
)
|
|
|
|
# Import relationships
|
|
print("\n--- Relationships ---")
|
|
import_relationships(mysql_conn, db, MachineRelationship, RelationshipType, machine_id_map)
|
|
|
|
# Import printers
|
|
print("\n--- Printers ---")
|
|
import_printers(mysql_conn, db, Machine, MachineType, Model, Vendor,
|
|
Communication, CommunicationType)
|
|
|
|
print("\n" + "=" * 60)
|
|
print("Import complete!")
|
|
print("=" * 60)
|
|
|
|
finally:
|
|
mysql_conn.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|