- ninjaone.py: API client and CVE analyzer - Queries organizations, devices, OS/software patches - Prioritizes CVEs by CVSS severity - Generates remediation reports 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
265 lines
9.4 KiB
Python
265 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
NinjaOne CVE Resolution Tool

Queries the NinjaOne API to aggregate CVE/vulnerability data across
organizations and prioritize remediation efforts.
|
|
"""
|
|
|
|
import requests
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
import json
|
|
|
|
# Load credentials from the local, user-created config module.
try:
    from config import NINJAONE_CONFIG
except ImportError:
    # SystemExit with a string argument writes the message to stderr and
    # terminates with exit status 1. Unlike the bare ``exit()`` helper it
    # does not depend on the ``site`` module being loaded.
    raise SystemExit(
        "Error: Copy config.example.py to config.py and add your credentials"
    )
|
|
|
|
|
|
class NinjaOneAPI:
    """NinjaOne REST API client for CVE management.

    Handles the OAuth2 client-credentials login and exposes a few read-only
    ``/api/v2`` endpoints. Every public ``get_*`` method returns a list and
    falls back to an empty list when the request fails, so callers can
    iterate the result without checking for ``None``.
    """

    # Seconds to wait on any HTTP call. ``requests`` has no default timeout,
    # so without this an unresponsive server would block the tool forever.
    DEFAULT_TIMEOUT = 30

    # Seconds subtracted from the advertised token lifetime so the token is
    # refreshed shortly before the server would start rejecting it.
    TOKEN_EXPIRY_MARGIN = 30

    def __init__(self):
        self.config = NINJAONE_CONFIG
        self.base_url = self.config['base_url']
        self.access_token = None
        # POSIX timestamp after which the token must be refreshed, or None
        # when no lifetime is known (the token is then used until a request
        # fails).
        self.token_expires = None

    def authenticate(self) -> bool:
        """Authenticate using the OAuth2 client credentials flow.

        Stores the access token (and its expiry, when the response carries
        ``expires_in``) on the instance.

        Returns:
            True on success, False when the HTTP call fails or the response
            does not contain a usable token.
        """
        # Built outside the ``try`` so a missing config key surfaces as a
        # KeyError instead of being reported as a bad server response.
        payload = {
            'grant_type': 'client_credentials',
            'client_id': self.config['client_id'],
            'client_secret': self.config['client_secret'],
            'scope': self.config['scope'],
        }
        try:
            response = requests.post(
                self.config['token_url'],
                data=payload,
                timeout=self.DEFAULT_TIMEOUT,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data['access_token']
        except requests.exceptions.RequestException as e:
            print(f"Authentication failed: {e}")
            return False
        except (KeyError, TypeError, ValueError) as e:
            # Body was not JSON, not an object, or had no 'access_token'.
            print(f"Authentication failed: unexpected token response ({e!r})")
            return False

        try:
            lifetime = float(token_data.get('expires_in'))
        except (TypeError, ValueError):
            # Lifetime missing or not numeric: expiry stays unknown.
            self.token_expires = None
        else:
            self.token_expires = (
                datetime.now().timestamp() + lifetime - self.TOKEN_EXPIRY_MARGIN
            )

        print("Successfully authenticated with NinjaOne API")
        return True

    def _token_is_valid(self) -> bool:
        """Return True when a token is held and is not known to be expired."""
        if not self.access_token:
            return False
        if self.token_expires is None:
            return True
        return datetime.now().timestamp() < self.token_expires

    @staticmethod
    def _extract_results(payload: Any) -> List[Dict]:
        """Normalize an API payload to a list of records.

        A list is returned unchanged. A dict carrying a list under
        ``'results'`` yields that list. Anything else (``None``, an error
        object, ...) yields ``[]``.

        NOTE(review): the ``/queries/*`` report endpoints are believed to
        wrap their rows as ``{'cursor': ..., 'results': [...]}`` - confirm
        against the NinjaOne API reference. Only the first page is read
        here; cursor pagination is not followed.
        """
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            rows = payload.get('results')
            if isinstance(rows, list):
                return rows
        return []

    def _request(self, method: str, endpoint: str,
                 params: Optional[Dict] = None) -> Optional[Dict]:
        """Make an authenticated API request.

        Args:
            method: HTTP verb, e.g. ``'GET'``.
            endpoint: Path appended to ``{base_url}/api/v2``.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON body, or None when authentication fails, the
            request fails, or the body is not valid JSON.
        """
        # (Re-)authenticate when there is no token or it has expired.
        if not self._token_is_valid() and not self.authenticate():
            return None

        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json',
        }
        url = f"{self.base_url}/api/v2{endpoint}"

        try:
            response = requests.request(
                method,
                url,
                headers=headers,
                params=params,
                timeout=self.DEFAULT_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return None
        except ValueError as e:
            # Older ``requests`` versions raise a plain ValueError subclass
            # for a non-JSON body.
            print(f"API request failed: invalid JSON in response ({e})")
            return None

    def get_organizations(self) -> List[Dict]:
        """Get all organizations."""
        return self._extract_results(self._request('GET', '/organizations'))

    def get_devices(self, org_id: Optional[int] = None) -> List[Dict]:
        """Get all devices, optionally filtered by organization.

        Args:
            org_id: Organization id to filter on; ``None`` means no filter.
        """
        params = {}
        # ``is not None`` so that an id of 0 is still applied as a filter.
        if org_id is not None:
            # NOTE(review): confirm 'org' is the query parameter that
            # /devices-detailed expects for organization filtering.
            params['org'] = org_id
        result = self._request('GET', '/devices-detailed', params=params)
        return self._extract_results(result)

    def get_device_software(self, device_id: int) -> List[Dict]:
        """Get software installed on a device."""
        result = self._request('GET', f'/device/{device_id}/software')
        return self._extract_results(result)

    def get_os_patches(self, status: str = 'pending') -> List[Dict]:
        """
        Get OS patches report
        status: 'pending', 'failed', 'rejected', 'installed'
        """
        result = self._request('GET', '/queries/os-patches', {'status': status})
        return self._extract_results(result)

    def get_software_patches(self, status: str = 'pending') -> List[Dict]:
        """
        Get software/third-party patches report
        status: 'pending', 'failed', 'rejected', 'approved'
        """
        result = self._request(
            'GET', '/queries/software-patches', {'status': status}
        )
        return self._extract_results(result)

    def get_vulnerability_scan_groups(self) -> List[Dict]:
        """Get vulnerability scan groups."""
        result = self._request('GET', '/vulnerability/scan-groups')
        return self._extract_results(result)
|
|
|
|
|
|
class CVEAnalyzer:
    """Analyze and prioritize CVE remediation across organizations.

    Patch records are plain dicts. The field names read here
    (``cvssScore``/``cvss_score``, ``organizationName``, ``cve``/``cveId``,
    ``name``/``patchName``, ``deviceCount``/``affectedDevices``) are the ones
    this tool looks for; NOTE(review): confirm them against the actual
    payloads returned by the API.
    """

    # Keys under which a patch record may carry its CVSS score, in lookup order.
    _CVSS_KEYS = ('cvssScore', 'cvss_score')

    # Lower CVSS bound of each named band, highest first; anything scored
    # below the last bound is 'low'.
    _SEVERITY_FLOORS = (('critical', 9.0), ('high', 7.0), ('medium', 4.0))

    # How many critical items the report lists.
    _MAX_CRITICAL_LISTED = 20

    def __init__(self, api: "NinjaOneAPI"):
        self.api = api
        self.organizations = []
        self.devices = []
        self.vulnerabilities = []

    def collect_data(self):
        """Collect all vulnerability-related data from NinjaOne.

        Returns:
            Dict with keys ``organizations``, ``devices``, ``os_patches`` and
            ``software_patches``, each holding the list returned by the API
            client. Organizations and devices are also stored on ``self``.
        """
        print("\n=== Collecting Data from NinjaOne ===\n")

        print("Fetching organizations...")
        self.organizations = self.api.get_organizations()
        print(f" Found {len(self.organizations)} organizations")

        print("Fetching devices...")
        self.devices = self.api.get_devices()
        print(f" Found {len(self.devices)} devices")

        print("Fetching pending OS patches...")
        os_patches = self.api.get_os_patches('pending')
        print(f" Found {len(os_patches)} pending OS patches")

        print("Fetching pending software patches...")
        sw_patches = self.api.get_software_patches('pending')
        print(f" Found {len(sw_patches)} pending software patches")

        return {
            'organizations': self.organizations,
            'devices': self.devices,
            'os_patches': os_patches,
            'software_patches': sw_patches,
        }

    @classmethod
    def _raw_cvss(cls, patch: Dict) -> Any:
        """Return the first non-empty CVSS value on *patch*, or None.

        Zero and empty values count as "no score".
        """
        for key in cls._CVSS_KEYS:
            value = patch.get(key)
            if value:
                return value
        return None

    @classmethod
    def _cvss_score(cls, patch: Dict) -> Optional[float]:
        """Return the CVSS score of *patch* as a float, or None if unusable.

        Numeric strings such as ``"9.8"`` are accepted; anything that cannot
        be converted to a number (or converts to NaN) is treated as unscored
        rather than raising during comparison.
        """
        raw = cls._raw_cvss(patch)
        if raw is None:
            return None
        try:
            score = float(raw)
        except (TypeError, ValueError):
            return None
        # NaN is the only float that differs from itself.
        return None if score != score else score

    def analyze_by_severity(self, patches: List[Dict]) -> Dict[str, List]:
        """Group patches by CVSS severity.

        Returns:
            Dict with the keys ``critical`` (>= 9.0), ``high`` (>= 7.0),
            ``medium`` (>= 4.0), ``low`` (any lower score) and ``unknown``
            (no usable score), in that order.
        """
        severity_groups = {
            'critical': [],  # CVSS 9.0-10.0
            'high': [],      # CVSS 7.0-8.9
            'medium': [],    # CVSS 4.0-6.9
            'low': [],       # CVSS 0.1-3.9
            'unknown': [],   # No CVSS score
        }

        for patch in patches:
            score = self._cvss_score(patch)
            if score is None:
                severity_groups['unknown'].append(patch)
                continue
            for level, floor in self._SEVERITY_FLOORS:
                if score >= floor:
                    severity_groups[level].append(patch)
                    break
            else:
                # Scored, but below every named floor.
                severity_groups['low'].append(patch)

        return severity_groups

    def analyze_by_organization(self, patches: List[Dict]) -> Dict[str, List]:
        """Group patches by organization name ('Unknown' when absent)."""
        org_groups = {}
        for patch in patches:
            org_name = patch.get('organizationName', 'Unknown')
            org_groups.setdefault(org_name, []).append(patch)
        return org_groups

    def _severity_section(self, heading: str, patches: List[Dict]) -> List[str]:
        """Build the per-severity count lines for one patch category."""
        if not patches:
            return []
        lines = [heading]
        for level, members in self.analyze_by_severity(patches).items():
            if members:
                lines.append(f" {level.upper()}: {len(members)}")
        return lines

    def generate_report(self, data: Dict) -> str:
        """Generate CVE remediation priority report.

        Args:
            data: Dict shaped like the return value of ``collect_data``.

        Returns:
            The report as a single newline-joined string: a summary, severity
            counts per patch category, and the highest-scored critical items
            (CVSS >= 9.0), worst first.
        """
        rule = "=" * 60
        os_patches = data['os_patches']
        sw_patches = data['software_patches']

        report = [
            rule,
            "CVE REMEDIATION PRIORITY REPORT",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            rule,
            # Summary
            f"\nORGANIZATIONS: {len(data['organizations'])}",
            f"TOTAL DEVICES: {len(data['devices'])}",
            f"PENDING OS PATCHES: {len(os_patches)}",
            f"PENDING SOFTWARE PATCHES: {len(sw_patches)}",
        ]

        report.extend(
            self._severity_section("\n--- OS PATCHES BY SEVERITY ---", os_patches)
        )
        report.extend(
            self._severity_section(
                "\n--- SOFTWARE PATCHES BY SEVERITY ---", sw_patches
            )
        )

        # Critical items requiring immediate attention
        report.append("\n" + rule)
        report.append("CRITICAL ITEMS REQUIRING IMMEDIATE ATTENTION")
        report.append(rule)

        scored = [
            (self._cvss_score(patch), patch)
            for patch in list(os_patches) + list(sw_patches)
        ]
        critical = [
            (score, patch)
            for score, patch in scored
            if score is not None and score >= 9.0
        ]
        # Worst first, so the truncated list really is the top of the range.
        # The sort is stable, so equal scores keep their input order.
        critical.sort(key=lambda pair: pair[0], reverse=True)

        if critical:
            for _, patch in critical[:self._MAX_CRITICAL_LISTED]:
                cve = patch.get('cve', patch.get('cveId', 'N/A'))
                name = patch.get('name', patch.get('patchName', 'Unknown'))
                cvss = self._raw_cvss(patch)
                devices = patch.get(
                    'deviceCount', patch.get('affectedDevices', 'N/A')
                )
                report.append(f" - {cve} (CVSS: {cvss}) - {name}")
                report.append(f" Affected devices: {devices}")
        else:
            report.append(" No critical vulnerabilities found")

        return "\n".join(report)
|
|
|
|
|
|
def main():
    """Main entry point.

    Authenticates against NinjaOne, collects organization/device/patch data,
    prints the remediation report and writes it to a timestamped
    ``cve_report_<YYYYMMDD_HHMMSS>.txt`` file in the current directory.
    Returns early (without a report) when authentication fails.
    """
    print("NinjaOne CVE Resolution Tool")
    print("-" * 40)

    # Initialize API client
    api = NinjaOneAPI()
    if not api.authenticate():
        return

    # Create analyzer and collect data
    analyzer = CVEAnalyzer(api)
    data = analyzer.collect_data()

    # Generate report
    report = analyzer.generate_report(data)
    print(report)

    # Save report
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"cve_report_{timestamp}.txt"
    # Explicit UTF-8: patch and CVE names come from the API and may contain
    # characters the platform's default encoding cannot represent.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(report)
    print(f"\nReport saved to: {filename}")
|
|
|
|
|
|
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|