Files
cve/ninjaone.py
cproudlock d28dd504f2 Fix NinjaOne API integration and patch parsing
- Use us2.ninjarmm.com instance (not app.ninjarmm.com)
- Handle results wrapper in API responses
- Parse impact field for severity (critical/high/medium/low)
- Fetch all patches, not just pending

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-18 12:52:10 -05:00

272 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
NinjaOne CVE Resolution Tool
Queries NinjaOne API to aggregate CVE/vulnerability data across organizations
and prioritize remediation efforts.
"""
import requests
from typing import Dict, List, Any, Optional
from datetime import datetime
import json
try:
from config import NINJAONE_CONFIG
except ImportError:
print("Error: Copy config.example.py to config.py and add your credentials")
exit(1)
class NinjaOneAPI:
    """NinjaOne REST API client for CVE management.

    Authenticates with the OAuth2 client-credentials flow using the values in
    ``NINJAONE_CONFIG`` (``base_url``, ``token_url``, ``client_id``,
    ``client_secret``, ``scope``) and wraps the v2 endpoints used by the
    report. Every public getter returns a list and falls back to ``[]`` when
    the request fails or the payload has an unexpected shape.
    """

    # Seconds to wait on the server; without a timeout ``requests`` can block
    # indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    # Re-authenticate this many seconds before the recorded expiry so the
    # token cannot lapse between the validity check and the request itself.
    TOKEN_EXPIRY_MARGIN = 60

    def __init__(self):
        self.config = NINJAONE_CONFIG
        self.base_url = self.config['base_url']
        self.access_token = None
        # datetime at which the current token expires, or None when unknown.
        self.token_expires = None

    def authenticate(self) -> bool:
        """Authenticate using the OAuth2 client credentials flow.

        Stores the access token (and its expiry, when the response carries a
        usable ``expires_in``) on the instance.

        Returns:
            True on success, False when the request fails or the response
            does not contain an access token.
        """
        # Local import: the module only imports ``datetime`` at file level.
        from datetime import timedelta

        try:
            response = requests.post(
                self.config['token_url'],
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.config['client_id'],
                    'client_secret': self.config['client_secret'],
                    'scope': self.config['scope'],
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data['access_token']
        except (requests.exceptions.RequestException,
                ValueError, KeyError, TypeError) as e:
            # ValueError: body is not JSON; KeyError/TypeError: JSON without
            # an 'access_token' entry. Drop any stale token either way.
            self.access_token = None
            self.token_expires = None
            print(f"Authentication failed: {e}")
            return False

        try:
            lifetime = float(token_data.get('expires_in'))
            self.token_expires = datetime.now() + timedelta(seconds=lifetime)
        except (TypeError, ValueError, OverflowError):
            # Lifetime missing or unusable: treat the token as non-expiring.
            self.token_expires = None

        print("Successfully authenticated with NinjaOne API")
        return True

    def _token_is_valid(self) -> bool:
        """Return True when a token is held and is not about to expire."""
        if not self.access_token:
            return False
        if self.token_expires is None:
            return True
        remaining = (self.token_expires - datetime.now()).total_seconds()
        return remaining > self.TOKEN_EXPIRY_MARGIN

    def _request(self, method: str, endpoint: str,
                 params: Optional[Dict] = None) -> Any:
        """Make an authenticated API request.

        Args:
            method: HTTP verb, e.g. ``'GET'``.
            endpoint: Path appended to ``{base_url}/api/v2``.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON body, or None when authentication or the request
            fails or the body is not valid JSON.
        """
        if not self._token_is_valid() and not self.authenticate():
            return None

        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json',
        }
        url = f"{self.base_url}/api/v2{endpoint}"

        try:
            response = requests.request(
                method, url, headers=headers, params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"API request failed: {e}")
            return None

    @staticmethod
    def _as_list(result: Any) -> List[Dict]:
        """Normalize an API payload to a list.

        Accepts either a bare list or a dict carrying the list under
        ``'results'``; anything else (None, error objects, ...) yields ``[]``.
        """
        if isinstance(result, dict):
            result = result.get('results')
        return result if isinstance(result, list) else []

    def get_organizations(self) -> List[Dict]:
        """Get all organizations."""
        return self._as_list(self._request('GET', '/organizations'))

    def get_devices(self, org_id: Optional[int] = None) -> List[Dict]:
        """Get all devices, optionally filtered by organization.

        Args:
            org_id: When not None, sent as the ``org`` query parameter.
        """
        params = {}
        if org_id is not None:
            params['org'] = org_id
        return self._as_list(
            self._request('GET', '/devices-detailed', params=params)
        )

    def get_device_software(self, device_id: int) -> List[Dict]:
        """Get software installed on a device."""
        return self._as_list(
            self._request('GET', f'/device/{device_id}/software')
        )

    def get_os_patches(self, status: Optional[str] = None) -> List[Dict]:
        """Get the OS patches report.

        Args:
            status: 'pending', 'failed', 'rejected', 'installed'
                (None = all).

        NOTE(review): only a single response is read. If this endpoint
        paginates, later pages are not fetched -- confirm against the
        NinjaOne API documentation.
        """
        params = {'status': status} if status else {}
        return self._as_list(
            self._request('GET', '/queries/os-patches', params)
        )

    def get_software_patches(self, status: Optional[str] = None) -> List[Dict]:
        """Get the software/third-party patches report.

        Args:
            status: 'pending', 'failed', 'rejected', 'approved'
                (None = all).

        NOTE(review): only a single response is read. If this endpoint
        paginates, later pages are not fetched -- confirm against the
        NinjaOne API documentation.
        """
        params = {'status': status} if status else {}
        return self._as_list(
            self._request('GET', '/queries/software-patches', params)
        )

    def get_vulnerability_scan_groups(self) -> List[Dict]:
        """Get vulnerability scan groups."""
        return self._as_list(
            self._request('GET', '/vulnerability/scan-groups')
        )
class CVEAnalyzer:
    """Analyze and prioritize CVE remediation across organizations."""

    # Recognized values of a patch's 'impact' field, most severe first.
    SEVERITY_LEVELS = ('critical', 'high', 'medium', 'low')

    # Maximum number of critical patches listed individually in the report.
    MAX_CRITICAL_ITEMS = 20

    def __init__(self, api: "NinjaOneAPI"):
        self.api = api
        self.organizations = []
        self.devices = []
        self.vulnerabilities = []

    def collect_data(self):
        """Collect all vulnerability-related data from NinjaOne.

        Fetches organizations, devices, OS patches and software patches
        through ``self.api``, printing progress as it goes.

        Returns:
            Dict with keys 'organizations', 'devices', 'os_patches' and
            'software_patches'.
        """
        print("\n=== Collecting Data from NinjaOne ===\n")

        # Get organizations
        print("Fetching organizations...")
        self.organizations = self.api.get_organizations()
        print(f" Found {len(self.organizations)} organizations")

        # Get all devices
        print("Fetching devices...")
        self.devices = self.api.get_devices()
        print(f" Found {len(self.devices)} devices")

        # Get all OS patches
        print("Fetching OS patches...")
        os_patches = self.api.get_os_patches()
        print(f" Found {len(os_patches)} OS patches")

        # Get all software patches
        print("Fetching software patches...")
        sw_patches = self.api.get_software_patches()
        print(f" Found {len(sw_patches)} software patches")

        return {
            'organizations': self.organizations,
            'devices': self.devices,
            'os_patches': os_patches,
            'software_patches': sw_patches,
        }

    @classmethod
    def _severity_of(cls, patch: Any) -> str:
        """Return the normalized severity bucket for one patch record.

        A missing, null or non-string 'impact' (or a non-dict record) maps to
        'unknown' instead of raising.
        """
        impact = patch.get('impact') if isinstance(patch, dict) else None
        if isinstance(impact, str):
            level = impact.strip().lower()
            if level in cls.SEVERITY_LEVELS:
                return level
        return 'unknown'

    def analyze_by_severity(self, patches: List[Dict]) -> Dict[str, List]:
        """Group patches by impact/severity.

        Returns:
            Dict keyed 'critical', 'high', 'medium', 'low', 'unknown' (in
            that order); every key is always present. Non-dict entries are
            skipped.
        """
        severity_groups = {
            level: [] for level in (*self.SEVERITY_LEVELS, 'unknown')
        }
        for patch in patches or []:
            if not isinstance(patch, dict):
                continue
            severity_groups[self._severity_of(patch)].append(patch)
        return severity_groups

    def analyze_by_organization(self, patches: List[Dict]) -> Dict[str, List]:
        """Group patches by organization.

        Patches without a usable 'organizationName' are filed under
        'Unknown'; non-dict entries are skipped, matching
        ``analyze_by_severity``.
        """
        org_groups = {}
        for patch in patches or []:
            if not isinstance(patch, dict):
                continue
            org_name = patch.get('organizationName') or 'Unknown'
            org_groups.setdefault(org_name, []).append(patch)
        return org_groups

    def _severity_section(self, heading: str, patches: List[Dict]) -> List[str]:
        """Build the report lines for one per-severity breakdown section."""
        if not patches:
            return []
        lines = [heading]
        for level, grouped in self.analyze_by_severity(patches).items():
            if grouped:
                lines.append(f" {level.upper()}: {len(grouped)}")
        return lines

    def generate_report(self, data: Dict) -> str:
        """Generate the CVE remediation priority report.

        Args:
            data: Dict as returned by ``collect_data``; missing or empty
                entries are treated as empty lists.

        Returns:
            The report as a single newline-joined string.
        """
        organizations = data.get('organizations') or []
        devices = data.get('devices') or []
        os_patches = list(data.get('os_patches') or [])
        sw_patches = list(data.get('software_patches') or [])

        rule = "=" * 60
        report = [
            rule,
            "CVE REMEDIATION PRIORITY REPORT",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            rule,
            # Summary
            f"\nORGANIZATIONS: {len(organizations)}",
            f"TOTAL DEVICES: {len(devices)}",
            f"OS PATCHES: {len(os_patches)}",
            f"SOFTWARE PATCHES: {len(sw_patches)}",
        ]

        report.extend(self._severity_section(
            "\n--- OS PATCHES BY SEVERITY ---", os_patches))
        report.extend(self._severity_section(
            "\n--- SOFTWARE PATCHES BY SEVERITY ---", sw_patches))

        # Critical items requiring immediate attention
        report.append("\n" + rule)
        report.append("CRITICAL ITEMS REQUIRING IMMEDIATE ATTENTION")
        report.append(rule)

        critical = self.analyze_by_severity(os_patches + sw_patches)['critical']
        if critical:
            for patch in critical[:self.MAX_CRITICAL_ITEMS]:
                title = patch.get('title', 'Unknown')
                status = patch.get('status', 'N/A')
                device_id = patch.get('deviceId', 'N/A')
                report.append(f" - {title}")
                report.append(f" Status: {status}, Device ID: {device_id}")
            hidden = len(critical) - self.MAX_CRITICAL_ITEMS
            if hidden > 0:
                # Make the truncation visible instead of silently dropping.
                report.append(
                    f" ... and {hidden} more critical item(s) not shown")
        else:
            report.append(" No critical vulnerabilities found")

        return "\n".join(report)
def main():
    """Main entry point.

    Authenticates against NinjaOne, collects patch data, prints the
    remediation report and saves it to ``cve_report_<timestamp>.txt`` in the
    current working directory. Returns early (without a report) when
    authentication fails.
    """
    print("NinjaOne CVE Resolution Tool")
    print("-" * 40)

    # Initialize API client
    api = NinjaOneAPI()
    if not api.authenticate():
        return

    # Create analyzer and collect data
    analyzer = CVEAnalyzer(api)
    data = analyzer.collect_data()

    # Generate report
    report = analyzer.generate_report(data)
    print(report)

    # Save report
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"cve_report_{timestamp}.txt"
    # Explicit UTF-8: patch titles come from the API and may contain
    # characters the platform's default encoding cannot represent.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(report)
    print(f"\nReport saved to: {filename}")
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()