Flask backend with Vue 3 frontend for shop floor machine management. Includes database schema export for MySQL shopdb_flask database. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
341 lines
11 KiB
Python
341 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Import applications data from original ShopDB MySQL database."""
|
|
|
|
import os
|
|
import sys
|
|
import pymysql
|
|
from dotenv import load_dotenv
|
|
|
|
# Add project to path. The checkout location can be overridden with the
# SHOPDB_PROJECT_ROOT environment variable; the default is the original path.
PROJECT_ROOT = os.environ.get('SHOPDB_PROJECT_ROOT', '/home/camp/projects/shopdb-flask')
sys.path.insert(0, PROJECT_ROOT)

# Load environment variables from the project's .env file
load_dotenv(os.path.join(PROJECT_ROOT, '.env'))

# Source MySQL connection (original shopdb).
# Each setting can be supplied through the environment (or the .env file loaded
# above) so credentials do not have to live in source control. The fallbacks
# are the values this script has always used.
MYSQL_CONFIG = {
    'host': os.environ.get('SHOPDB_SOURCE_HOST', '127.0.0.1'),
    'port': int(os.environ.get('SHOPDB_SOURCE_PORT', '3306')),
    'user': os.environ.get('SHOPDB_SOURCE_USER', 'root'),
    'password': os.environ.get('SHOPDB_SOURCE_PASSWORD', 'rootpassword'),
    'database': os.environ.get('SHOPDB_SOURCE_DATABASE', 'shopdb'),
    'charset': 'utf8mb4',
    # Rows come back as dicts keyed by column name; the import functions
    # below index rows by column name.
    'cursorclass': pymysql.cursors.DictCursor,
}
|
|
|
|
|
|
def get_mysql_connection():
    """Open and return a new connection to the source MySQL database.

    Connection parameters are taken from the module-level ``MYSQL_CONFIG``.
    The caller is responsible for closing the returned connection.
    """
    connection = pymysql.connect(**MYSQL_CONFIG)
    return connection
|
|
|
|
|
|
def to_bool(val):
    """Convert a MySQL bit field value to a Python bool.

    Args:
        val: Value read from the source row. ``None`` is treated as false.
            Byte strings are true only when at least one byte is non-zero,
            so empty and multi-byte all-zero values are false. Anything
            else is converted with ``bool()``.

    Returns:
        The truth value of ``val`` as a ``bool``.
    """
    if val is None:
        return False
    if isinstance(val, (bytes, bytearray)):
        # Iterating bytes yields ints; any() is True iff some byte != 0.
        return any(val)
    return bool(val)
|
|
|
|
|
|
def import_appowners(mysql_conn, db, AppOwner):
    """Import active app owners from the source MySQL database.

    Source rows whose ``appownerid`` is already present in the target are
    left untouched; all others are inserted with ``isactive=True``.

    Args:
        mysql_conn: Open connection to the source database. Its cursors must
            return rows as dict-like objects keyed by column name.
        db: Database handle whose ``session`` receives the new rows and is
            committed once at the end.
        AppOwner: Target model class; queried via ``AppOwner.query``.

    Returns:
        Number of app owners added by this call.
    """
    print("Importing app owners...")
    # The context manager closes the cursor even if the query raises.
    with mysql_conn.cursor() as cursor:
        cursor.execute("SELECT * FROM appowners WHERE isactive = 1")
        owners = cursor.fetchall()

    count = 0
    for o in owners:
        existing = AppOwner.query.filter_by(appownerid=o['appownerid']).first()
        if existing:
            continue
        db.session.add(AppOwner(
            appownerid=o['appownerid'],
            appowner=o['appowner'],
            sso=o.get('sso'),
            isactive=True,
        ))
        count += 1

    db.session.commit()
    print(f" Imported {count} app owners")
    return count
|
|
|
|
|
|
def import_supportteams(mysql_conn, db, SupportTeam):
    """Import active support teams from the source MySQL database.

    Source rows whose ID is already present in the target are left
    untouched; all others are inserted with ``isactive=True``.

    Args:
        mysql_conn: Open connection to the source database. Its cursors must
            return rows as dict-like objects keyed by column name.
        db: Database handle whose ``session`` receives the new rows and is
            committed once at the end.
        SupportTeam: Target model class; queried via ``SupportTeam.query``.

    Returns:
        Number of support teams added by this call.
    """
    print("Importing support teams...")
    # The context manager closes the cursor even if the query raises.
    with mysql_conn.cursor() as cursor:
        # Note: source table has typo 'supporteamid' instead of 'supportteamid';
        # the alias exposes it under the correctly spelled name.
        cursor.execute(
            "SELECT supporteamid as supportteamid, teamname, teamurl, "
            "appownerid, isactive FROM supportteams WHERE isactive = 1"
        )
        teams = cursor.fetchall()

    count = 0
    for t in teams:
        existing = SupportTeam.query.filter_by(supportteamid=t['supportteamid']).first()
        if existing:
            continue
        db.session.add(SupportTeam(
            supportteamid=t['supportteamid'],
            teamname=t['teamname'],
            teamurl=t.get('teamurl'),
            appownerid=t.get('appownerid'),
            isactive=True,
        ))
        count += 1

    db.session.commit()
    print(f" Imported {count} support teams")
    return count
|
|
|
|
|
|
def import_applications(mysql_conn, db, Application):
    """Import active applications from the source MySQL database.

    Source rows are de-duplicated by ``appname`` (the first occurrence wins),
    and names that already exist in the target are left untouched. New rows
    keep the source ``appid`` and are inserted with ``isactive=True``.

    Args:
        mysql_conn: Open connection to the source database. Its cursors must
            return rows as dict-like objects keyed by column name.
        db: Database handle whose ``session`` receives the new rows and is
            committed once at the end.
        Application: Target model class; queried via ``Application.query``.

    Returns:
        Number of applications added by this call.
    """
    print("Importing applications...")
    # The context manager closes the cursor even if the query raises.
    with mysql_conn.cursor() as cursor:
        # The joined team ID needs its own alias: ``a.*`` already contains a
        # ``supportteamid`` column, and a dict cursor cannot hold two columns
        # under the same key, so reusing the name would leave the raw
        # (unvalidated) application column in ``a['supportteamid']``.
        # The LEFT JOIN yields NULL when the referenced team does not exist.
        cursor.execute("""
            SELECT a.*, st.supporteamid AS resolved_supportteamid
            FROM applications a
            LEFT JOIN supportteams st ON a.supportteamid = st.supporteamid
            WHERE a.isactive = 1
        """)
        apps = cursor.fetchall()

    count = 0
    skipped_dupes = 0
    seen_names = set()
    for a in apps:
        # Skip duplicate appnames (source has some)
        if a['appname'] in seen_names:
            skipped_dupes += 1
            continue
        seen_names.add(a['appname'])

        existing = Application.query.filter_by(appname=a['appname']).first()
        if existing:
            continue

        db.session.add(Application(
            appid=a['appid'],
            appname=a['appname'],
            appdescription=a.get('appdescription'),
            supportteamid=a.get('resolved_supportteamid'),
            isinstallable=to_bool(a.get('isinstallable')),
            applicationnotes=a.get('applicationnotes'),
            installpath=a.get('installpath'),
            applicationlink=a.get('applicationlink'),
            documentationpath=a.get('documentationpath'),
            ishidden=to_bool(a.get('ishidden')),
            isprinter=to_bool(a.get('isprinter')),
            islicenced=to_bool(a.get('islicenced')),
            image=a.get('image'),
            isactive=True,
        ))
        count += 1

    db.session.commit()
    print(f" Imported {count} applications (skipped {skipped_dupes} duplicates)")
    return count
|
|
|
|
|
|
def import_appversions(mysql_conn, db, AppVersion, Application):
    """Import active application versions from the source MySQL database.

    Each source version is matched to a target application by ``appname``.
    Versions whose application is not present in the target are skipped, and
    versions that already exist for the same (appid, version) pair are left
    untouched.

    Args:
        mysql_conn: Open connection to the source database. Its cursors must
            return rows as dict-like objects keyed by column name.
        db: Database handle whose ``session`` receives the new rows and is
            committed once at the end.
        AppVersion: Target model class for versions.
        Application: Target model class used to resolve app names to IDs.

    Returns:
        Number of versions added by this call.
    """
    print("Importing app versions...")
    # The context manager closes the cursor even if the query raises.
    with mysql_conn.cursor() as cursor:
        cursor.execute("""
            SELECT av.*, a.appname
            FROM appversions av
            JOIN applications a ON av.appid = a.appid
            WHERE av.isactive = 1
        """)
        versions = cursor.fetchall()

    # Build app lookup by name (since we may have different IDs)
    app_map = {a.appname: a.appid for a in Application.query.all()}

    count = 0
    skipped = 0
    for v in versions:
        # Look up app by name
        new_appid = app_map.get(v['appname'])
        if new_appid is None:
            skipped += 1
            continue

        existing = AppVersion.query.filter_by(
            appid=new_appid,
            version=v['version'],
        ).first()
        if existing:
            continue

        db.session.add(AppVersion(
            appversionid=v['appversionid'],
            appid=new_appid,
            version=v['version'],
            releasedate=v.get('releasedate'),
            notes=v.get('notes'),
            isactive=True,
        ))
        count += 1

    db.session.commit()
    print(f" Imported {count} app versions (skipped {skipped})")
    return count
|
|
|
|
|
|
def import_installedapps(mysql_conn, db, InstalledApp, Machine, Application, AppVersion):
    """Import installed apps (PC-application relationships) from MySQL.

    Source rows are mapped onto target records by natural keys: machines by
    ``machinenumber``, applications by ``appname`` and versions by
    (appid, version). Rows whose machine or application cannot be resolved
    are skipped. An existing (machineid, appid) link is only touched to fill
    in a version it does not have yet.

    Args:
        mysql_conn: Open connection to the source database. Its cursors must
            return rows as dict-like objects keyed by column name.
        db: Database handle whose ``session`` receives the changes and is
            committed once at the end.
        InstalledApp: Target model class for the machine/application link.
        Machine: Target model class used to resolve machine numbers.
        Application: Target model class used to resolve app names.
        AppVersion: Target model class used to resolve version strings.

    Returns:
        Number of links added plus the number of existing links that
        received a version.
    """
    print("Importing installed apps...")
    # The context manager closes the cursor even if the query raises.
    with mysql_conn.cursor() as cursor:
        cursor.execute("""
            SELECT ia.*, m.machinenumber, a.appname, av.version as version_str
            FROM installedapps ia
            JOIN machines m ON ia.machineid = m.machineid
            JOIN applications a ON ia.appid = a.appid
            LEFT JOIN appversions av ON ia.appversionid = av.appversionid
            WHERE ia.isactive = 1
        """)
        installed = cursor.fetchall()

    # Build lookup for machines by machinenumber (since IDs may differ)
    machine_map = {m.machinenumber: m.machineid for m in Machine.query.all()}
    # Build app lookup by name
    app_map = {a.appname: a.appid for a in Application.query.all()}
    # Build version lookup by (appid, version)
    version_map = {(v.appid, v.version): v.appversionid for v in AppVersion.query.all()}

    count = 0
    skipped = 0
    for i in installed:
        # Look up machine by machinenumber
        new_machineid = machine_map.get(i['machinenumber'])
        if new_machineid is None:
            skipped += 1
            continue

        # Look up app by name
        new_appid = app_map.get(i['appname'])
        if new_appid is None:
            skipped += 1
            continue

        # Look up version if exists (stays None when the row has no version
        # or the version is unknown in the target)
        new_versionid = None
        if i.get('version_str'):
            new_versionid = version_map.get((new_appid, i['version_str']))

        # Check if already exists
        existing = InstalledApp.query.filter_by(
            machineid=new_machineid,
            appid=new_appid,
        ).first()

        if not existing:
            db.session.add(InstalledApp(
                machineid=new_machineid,
                appid=new_appid,
                appversionid=new_versionid,
                isactive=True,
            ))
            count += 1
        elif new_versionid and not existing.appversionid:
            # Update existing with version if we now have it
            existing.appversionid = new_versionid
            count += 1

    db.session.commit()
    print(f" Imported {count} installed apps (skipped {skipped})")
    return count
|
|
|
|
|
|
def import_knowledgebase(mysql_conn, db, KnowledgeBase, Application):
    """Import active knowledge base articles from the source MySQL database.

    Articles whose ``linkid`` already exists in the target are counted as
    skipped. For new articles the application is resolved by ``appname``;
    the article is stored without an application when there is no name or
    the name is unknown in the target.

    Args:
        mysql_conn: Open connection to the source database. Its cursors must
            return rows as dict-like objects keyed by column name.
        db: Database handle whose ``session`` receives the new rows and is
            committed once at the end.
        KnowledgeBase: Target model class for articles.
        Application: Target model class used to resolve app names to IDs.

    Returns:
        Number of articles added by this call.
    """
    print("Importing knowledge base articles...")
    # The context manager closes the cursor even if the query raises.
    with mysql_conn.cursor() as cursor:
        cursor.execute("""
            SELECT kb.*, a.appname
            FROM knowledgebase kb
            LEFT JOIN applications a ON kb.appid = a.appid
            WHERE kb.isactive = 1
        """)
        articles = cursor.fetchall()

    # Build app lookup by name
    app_map = {a.appname: a.appid for a in Application.query.all()}

    count = 0
    skipped = 0
    for article in articles:
        existing = KnowledgeBase.query.filter_by(linkid=article['linkid']).first()
        if existing:
            skipped += 1
            continue

        # Look up app by name if exists
        new_appid = None
        if article.get('appname'):
            new_appid = app_map.get(article['appname'])

        db.session.add(KnowledgeBase(
            linkid=article['linkid'],
            appid=new_appid,
            shortdescription=article.get('shortdescription'),
            linkurl=article.get('linkurl'),
            keywords=article.get('keywords'),
            # A missing or NULL click counter starts at zero.
            clicks=article.get('clicks') or 0,
            isactive=True,
        ))
        count += 1

    db.session.commit()
    print(f" Imported {count} knowledge base articles (skipped {skipped} existing)")
    return count
|
|
|
|
|
|
def main():
    """Run every import step in order against the Flask application's database.

    Creates any missing target tables, opens one connection to the source
    MySQL database, runs the individual import functions, and always closes
    the source connection afterwards.
    """
    banner = "=" * 60
    print(banner)
    print("ShopDB Applications Import")
    print(banner)

    # Project modules are imported at call time rather than at module load.
    from shopdb import create_app
    from shopdb.extensions import db
    from shopdb.core.models import (
        Application,
        AppOwner,
        SupportTeam,
        AppVersion,
        InstalledApp,
        Machine,
        KnowledgeBase,
    )

    flask_app = create_app()

    with flask_app.app_context():
        # Create any missing tables
        print("\nCreating tables if needed...")
        db.create_all()

        print("\nConnecting to MySQL...")
        source_conn = get_mysql_connection()
        print(" Connected!")

        try:
            print("\n--- Application Data ---")
            import_appowners(source_conn, db, AppOwner)
            import_supportteams(source_conn, db, SupportTeam)
            import_applications(source_conn, db, Application)
            import_appversions(source_conn, db, AppVersion, Application)

            print("\n--- PC-Application Relationships ---")
            import_installedapps(
                source_conn, db, InstalledApp, Machine, Application, AppVersion
            )

            print("\n--- Knowledge Base ---")
            import_knowledgebase(source_conn, db, KnowledgeBase, Application)

            print("\n" + banner)
            print("Import complete!")
            print(banner)
        finally:
            # Release the source connection whether or not an import step failed.
            source_conn.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|