#!/usr/bin/env python3 """ NinjaOne CVE Resolution Tool Queries NinjaOne API to aggregate CVE/vulnerability data across organizations and prioritize remediation efforts. """ import requests from typing import Dict, List, Any, Optional from datetime import datetime import json try: from config import NINJAONE_CONFIG except ImportError: print("Error: Copy config.example.py to config.py and add your credentials") exit(1) class NinjaOneAPI: """NinjaOne REST API client for CVE management""" def __init__(self): self.config = NINJAONE_CONFIG self.base_url = self.config['base_url'] self.access_token = None self.token_expires = None def authenticate(self) -> bool: """Authenticate using OAuth2 client credentials flow""" try: response = requests.post( self.config['token_url'], data={ 'grant_type': 'client_credentials', 'client_id': self.config['client_id'], 'client_secret': self.config['client_secret'], 'scope': self.config['scope'] } ) response.raise_for_status() token_data = response.json() self.access_token = token_data['access_token'] print("Successfully authenticated with NinjaOne API") return True except requests.exceptions.RequestException as e: print(f"Authentication failed: {e}") return False def _request(self, method: str, endpoint: str, params: Dict = None) -> Optional[Dict]: """Make authenticated API request""" if not self.access_token: if not self.authenticate(): return None headers = { 'Authorization': f'Bearer {self.access_token}', 'Content-Type': 'application/json' } url = f"{self.base_url}/api/v2{endpoint}" try: response = requests.request(method, url, headers=headers, params=params) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"API request failed: {e}") return None def get_organizations(self) -> List[Dict]: """Get all organizations""" result = self._request('GET', '/organizations') return result if result else [] def get_devices(self, org_id: int = None) -> List[Dict]: """Get all devices, optionally filtered by organization""" params = {} if org_id: params['org'] = org_id result = self._request('GET', '/devices-detailed', params=params) return result if result else [] def get_device_software(self, device_id: int) -> List[Dict]: """Get software installed on a device""" result = self._request('GET', f'/device/{device_id}/software') return result if result else [] def get_os_patches(self, status: str = None) -> List[Dict]: """ Get OS patches report status: 'pending', 'failed', 'rejected', 'installed' (None = all) """ params = {'status': status} if status else {} result = self._request('GET', '/queries/os-patches', params) if result and isinstance(result, dict) and 'results' in result: return result['results'] return result if isinstance(result, list) else [] def get_software_patches(self, status: str = None) -> List[Dict]: """ Get software/third-party patches report status: 'pending', 'failed', 'rejected', 'approved' (None = all) """ params = {'status': status} if status else {} result = self._request('GET', '/queries/software-patches', params) if result and isinstance(result, dict) and 'results' in result: return result['results'] return result if isinstance(result, list) else [] def get_vulnerability_scan_groups(self) -> List[Dict]: """Get vulnerability scan groups""" result = self._request('GET', '/vulnerability/scan-groups') return result if result else [] class CVEAnalyzer: """Analyze and prioritize CVE remediation across organizations""" def __init__(self, api: NinjaOneAPI): self.api = api self.organizations = [] self.devices = [] self.vulnerabilities = [] def collect_data(self): """Collect all vulnerability-related data from NinjaOne""" print("\n=== Collecting Data from NinjaOne ===\n") # Get organizations print("Fetching organizations...") self.organizations = self.api.get_organizations() print(f" Found {len(self.organizations)} organizations") # Get all devices print("Fetching devices...") self.devices = self.api.get_devices() print(f" Found {len(self.devices)} devices") # Get all OS patches print("Fetching OS patches...") os_patches = self.api.get_os_patches() print(f" Found {len(os_patches)} OS patches") # Get all software patches print("Fetching software patches...") sw_patches = self.api.get_software_patches() print(f" Found {len(sw_patches)} software patches") return { 'organizations': self.organizations, 'devices': self.devices, 'os_patches': os_patches, 'software_patches': sw_patches } def analyze_by_severity(self, patches: List[Dict]) -> Dict[str, List]: """Group patches by impact/severity""" severity_groups = { 'critical': [], 'high': [], 'medium': [], 'low': [], 'unknown': [] } for patch in patches: if not isinstance(patch, dict): continue impact = patch.get('impact', 'unknown').lower() if impact == 'critical': severity_groups['critical'].append(patch) elif impact == 'high': severity_groups['high'].append(patch) elif impact == 'medium': severity_groups['medium'].append(patch) elif impact == 'low': severity_groups['low'].append(patch) else: severity_groups['unknown'].append(patch) return severity_groups def analyze_by_organization(self, patches: List[Dict]) -> Dict[str, List]: """Group patches by organization""" org_groups = {} for patch in patches: org_name = patch.get('organizationName', 'Unknown') if org_name not in org_groups: org_groups[org_name] = [] org_groups[org_name].append(patch) return org_groups def generate_report(self, data: Dict) -> str: """Generate CVE remediation priority report""" report = [] report.append("=" * 60) report.append("CVE REMEDIATION PRIORITY REPORT") report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report.append("=" * 60) # Summary report.append(f"\nORGANIZATIONS: {len(data['organizations'])}") report.append(f"TOTAL DEVICES: {len(data['devices'])}") report.append(f"OS PATCHES: {len(data['os_patches'])}") report.append(f"SOFTWARE PATCHES: {len(data['software_patches'])}") # Analyze OS patches by severity if data['os_patches']: report.append("\n--- OS PATCHES BY SEVERITY ---") os_severity = self.analyze_by_severity(data['os_patches']) for level, patches in os_severity.items(): if patches: report.append(f" {level.upper()}: {len(patches)}") # Analyze software patches by severity if data['software_patches']: report.append("\n--- SOFTWARE PATCHES BY SEVERITY ---") sw_severity = self.analyze_by_severity(data['software_patches']) for level, patches in sw_severity.items(): if patches: report.append(f" {level.upper()}: {len(patches)}") # Critical items requiring immediate attention report.append("\n" + "=" * 60) report.append("CRITICAL ITEMS REQUIRING IMMEDIATE ATTENTION") report.append("=" * 60) all_patches = data['os_patches'] + data['software_patches'] critical = [p for p in all_patches if isinstance(p, dict) and p.get('impact', '').lower() == 'critical'] if critical: for patch in critical[:20]: # Top 20 title = patch.get('title', 'Unknown') status = patch.get('status', 'N/A') device_id = patch.get('deviceId', 'N/A') report.append(f" - {title}") report.append(f" Status: {status}, Device ID: {device_id}") else: report.append(" No critical vulnerabilities found") return "\n".join(report) def main(): """Main entry point""" print("NinjaOne CVE Resolution Tool") print("-" * 40) # Initialize API client api = NinjaOneAPI() if not api.authenticate(): return # Create analyzer and collect data analyzer = CVEAnalyzer(api) data = analyzer.collect_data() # Generate report report = analyzer.generate_report(data) print(report) # Save report timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"cve_report_{timestamp}.txt" with open(filename, 'w') as f: f.write(report) print(f"\nReport saved to: {filename}") if __name__ == '__main__': main()