Fix NinjaOne API integration and patch parsing

- Use us2.ninjarmm.com instance (not app.ninjarmm.com)
- Handle results wrapper in API responses
- Parse impact field for severity (critical/high/medium/low)
- Fetch all patches, not just pending

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
cproudlock
2025-12-18 12:52:10 -05:00
parent 1e75818310
commit d28dd504f2
2 changed files with 46 additions and 39 deletions

View File

@@ -85,21 +85,27 @@ class NinjaOneAPI:
result = self._request('GET', f'/device/{device_id}/software')
return result if result else []
def get_os_patches(self, status: str = 'pending') -> List[Dict]:
def get_os_patches(self, status: str = None) -> List[Dict]:
"""
Get OS patches report
status: 'pending', 'failed', 'rejected', 'installed'
status: 'pending', 'failed', 'rejected', 'installed' (None = all)
"""
result = self._request('GET', f'/queries/os-patches', {'status': status})
return result if result else []
params = {'status': status} if status else {}
result = self._request('GET', '/queries/os-patches', params)
if result and isinstance(result, dict) and 'results' in result:
return result['results']
return result if isinstance(result, list) else []
def get_software_patches(self, status: str = 'pending') -> List[Dict]:
def get_software_patches(self, status: str = None) -> List[Dict]:
"""
Get software/third-party patches report
status: 'pending', 'failed', 'rejected', 'approved'
status: 'pending', 'failed', 'rejected', 'approved' (None = all)
"""
result = self._request('GET', f'/queries/software-patches', {'status': status})
return result if result else []
params = {'status': status} if status else {}
result = self._request('GET', '/queries/software-patches', params)
if result and isinstance(result, dict) and 'results' in result:
return result['results']
return result if isinstance(result, list) else []
def get_vulnerability_scan_groups(self) -> List[Dict]:
"""Get vulnerability scan groups"""
@@ -130,15 +136,15 @@ class CVEAnalyzer:
self.devices = self.api.get_devices()
print(f" Found {len(self.devices)} devices")
# Get pending OS patches
print("Fetching pending OS patches...")
os_patches = self.api.get_os_patches('pending')
print(f" Found {len(os_patches)} pending OS patches")
# Get all OS patches
print("Fetching OS patches...")
os_patches = self.api.get_os_patches()
print(f" Found {len(os_patches)} OS patches")
# Get pending software patches
print("Fetching pending software patches...")
sw_patches = self.api.get_software_patches('pending')
print(f" Found {len(sw_patches)} pending software patches")
# Get all software patches
print("Fetching software patches...")
sw_patches = self.api.get_software_patches()
print(f" Found {len(sw_patches)} software patches")
return {
'organizations': self.organizations,
@@ -148,27 +154,29 @@ class CVEAnalyzer:
}
def analyze_by_severity(self, patches: List[Dict]) -> Dict[str, List]:
"""Group patches by CVSS severity"""
"""Group patches by impact/severity"""
severity_groups = {
'critical': [], # CVSS 9.0-10.0
'high': [], # CVSS 7.0-8.9
'medium': [], # CVSS 4.0-6.9
'low': [], # CVSS 0.1-3.9
'unknown': [] # No CVSS score
'critical': [],
'high': [],
'medium': [],
'low': [],
'unknown': []
}
for patch in patches:
cvss = patch.get('cvssScore') or patch.get('cvss_score')
if cvss is None:
severity_groups['unknown'].append(patch)
elif cvss >= 9.0:
if not isinstance(patch, dict):
continue
impact = patch.get('impact', 'unknown').lower()
if impact == 'critical':
severity_groups['critical'].append(patch)
elif cvss >= 7.0:
elif impact == 'high':
severity_groups['high'].append(patch)
elif cvss >= 4.0:
elif impact == 'medium':
severity_groups['medium'].append(patch)
else:
elif impact == 'low':
severity_groups['low'].append(patch)
else:
severity_groups['unknown'].append(patch)
return severity_groups
@@ -193,8 +201,8 @@ class CVEAnalyzer:
# Summary
report.append(f"\nORGANIZATIONS: {len(data['organizations'])}")
report.append(f"TOTAL DEVICES: {len(data['devices'])}")
report.append(f"PENDING OS PATCHES: {len(data['os_patches'])}")
report.append(f"PENDING SOFTWARE PATCHES: {len(data['software_patches'])}")
report.append(f"OS PATCHES: {len(data['os_patches'])}")
report.append(f"SOFTWARE PATCHES: {len(data['software_patches'])}")
# Analyze OS patches by severity
if data['os_patches']:
@@ -218,16 +226,15 @@ class CVEAnalyzer:
report.append("=" * 60)
all_patches = data['os_patches'] + data['software_patches']
critical = [p for p in all_patches if (p.get('cvssScore') or p.get('cvss_score') or 0) >= 9.0]
critical = [p for p in all_patches if isinstance(p, dict) and p.get('impact', '').lower() == 'critical']
if critical:
for patch in critical[:20]: # Top 20
cve = patch.get('cve', patch.get('cveId', 'N/A'))
name = patch.get('name', patch.get('patchName', 'Unknown'))
cvss = patch.get('cvssScore', patch.get('cvss_score', 'N/A'))
devices = patch.get('deviceCount', patch.get('affectedDevices', 'N/A'))
report.append(f" - {cve} (CVSS: {cvss}) - {name}")
report.append(f" Affected devices: {devices}")
title = patch.get('title', 'Unknown')
status = patch.get('status', 'N/A')
device_id = patch.get('deviceId', 'N/A')
report.append(f" - {title}")
report.append(f" Status: {status}, Device ID: {device_id}")
else:
report.append(" No critical vulnerabilities found")