From 1e75818310c58a0b8979a43a40334520e32e4b92 Mon Sep 17 00:00:00 2001 From: cproudlock Date: Thu, 18 Dec 2025 12:17:16 -0500 Subject: [PATCH] Add NinjaOne CVE resolution tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ninjaone.py: API client and CVE analyzer - Queries organizations, devices, OS/software patches - Prioritizes CVEs by CVSS severity - Generates remediation reports 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .gitignore | 20 ++++ README.md | 52 ++++++++- config.example.py | 10 ++ ninjaone.py | 264 ++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 5 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 config.example.py create mode 100644 ninjaone.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..89fa436 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Config with secrets +config.py + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +ENV/ + +# Reports +cve_report_*.txt +*.log + +# IDE +.idea/ +.vscode/ +*.swp diff --git a/README.md b/README.md index 142d237..0d907f1 100644 --- a/README.md +++ b/README.md @@ -1 +1,51 @@ -# CVE +# CVE Resolution Tool for NinjaOne + +Queries NinjaOne's REST API to aggregate CVE/vulnerability data across all organizations and prioritize remediation efforts. + +## Setup + +1. Copy config example and add your NinjaOne API credentials: + ```bash + cp config.example.py config.py + ``` + +2. Get API credentials from NinjaOne: + - Go to Administration > Apps > API + - Create a new API application (Client Credentials grant type) + - Copy the Client ID and Client Secret + +3. Install dependencies: + ```bash + pip install -r requirements.txt + ``` + +## Usage + +```bash +python ninjaone.py +``` + +## NinjaOne API Endpoints Used + +| Endpoint | Description | +|----------|-------------| +| `GET /api/v2/organizations` | List all organizations | +| `GET /api/v2/devices-detailed` | List all devices with details | +| `GET /api/v2/device/{id}/software` | Device software inventory | +| `GET /api/v2/queries/os-patches` | OS patches report | +| `GET /api/v2/queries/software-patches` | Third-party software patches | +| `GET /api/v2/vulnerability/scan-groups` | Vulnerability scan data | + +## Output + +Generates a prioritized CVE remediation report showing: +- Summary across all organizations +- Patches grouped by CVSS severity (Critical/High/Medium/Low) +- Critical items requiring immediate attention +- Affected device counts + +## Resources + +- [NinjaOne API Docs](https://app.ninjarmm.com/apidocs/) +- [NinjaOne API Reference](https://app.ninjarmm.com/apidocs-beta/core-resources) +- [Postman Collection](https://www.postman.com/ninjaone/ninjaone-api-workspace/collection/8gh1ujj/ninjaone-public-api-2-0) diff --git a/config.example.py b/config.example.py new file mode 100644 index 0000000..6c93853 --- /dev/null +++ b/config.example.py @@ -0,0 +1,10 @@ +# NinjaOne API Configuration +# Copy this to config.py and fill in your credentials + +NINJAONE_CONFIG = { + 'client_id': 'your_client_id', + 'client_secret': 'your_client_secret', + 'base_url': 'https://app.ninjarmm.com', # or your regional URL + 'token_url': 'https://app.ninjarmm.com/oauth/token', + 'scope': 'monitoring management' +} diff --git a/ninjaone.py b/ninjaone.py new file mode 100644 index 0000000..97edaae --- /dev/null +++ b/ninjaone.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +NinjaOne CVE Resolution Tool +Queries NinjaOne API to aggregate CVE/vulnerability data across organizations +and prioritize remediation efforts. +""" + +import requests +from typing import Dict, List, Any, Optional +from datetime import datetime +import json + +try: + from config import NINJAONE_CONFIG +except ImportError: + print("Error: Copy config.example.py to config.py and add your credentials") + exit(1) + + +class NinjaOneAPI: + """NinjaOne REST API client for CVE management""" + + def __init__(self): + self.config = NINJAONE_CONFIG + self.base_url = self.config['base_url'] + self.access_token = None + self.token_expires = None + + def authenticate(self) -> bool: + """Authenticate using OAuth2 client credentials flow""" + try: + response = requests.post( + self.config['token_url'], + data={ + 'grant_type': 'client_credentials', + 'client_id': self.config['client_id'], + 'client_secret': self.config['client_secret'], + 'scope': self.config['scope'] + } + ) + response.raise_for_status() + token_data = response.json() + self.access_token = token_data['access_token'] + print("Successfully authenticated with NinjaOne API") + return True + except requests.exceptions.RequestException as e: + print(f"Authentication failed: {e}") + return False + + def _request(self, method: str, endpoint: str, params: Dict = None) -> Optional[Dict]: + """Make authenticated API request""" + if not self.access_token: + if not self.authenticate(): + return None + + headers = { + 'Authorization': f'Bearer {self.access_token}', + 'Content-Type': 'application/json' + } + url = f"{self.base_url}/api/v2{endpoint}" + + try: + response = requests.request(method, url, headers=headers, params=params) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"API request failed: {e}") + return None + + def get_organizations(self) -> List[Dict]: + """Get all organizations""" + result = self._request('GET', '/organizations') + return result if result else [] + + def get_devices(self, org_id: int = None) -> List[Dict]: + """Get all devices, optionally filtered by organization""" + params = {} + if org_id: + params['org'] = org_id + result = self._request('GET', '/devices-detailed', params=params) + return result if result else [] + + def get_device_software(self, device_id: int) -> List[Dict]: + """Get software installed on a device""" + result = self._request('GET', f'/device/{device_id}/software') + return result if result else [] + + def get_os_patches(self, status: str = 'pending') -> List[Dict]: + """ + Get OS patches report + status: 'pending', 'failed', 'rejected', 'installed' + """ + result = self._request('GET', f'/queries/os-patches', {'status': status}) + return result if result else [] + + def get_software_patches(self, status: str = 'pending') -> List[Dict]: + """ + Get software/third-party patches report + status: 'pending', 'failed', 'rejected', 'approved' + """ + result = self._request('GET', f'/queries/software-patches', {'status': status}) + return result if result else [] + + def get_vulnerability_scan_groups(self) -> List[Dict]: + """Get vulnerability scan groups""" + result = self._request('GET', '/vulnerability/scan-groups') + return result if result else [] + + +class CVEAnalyzer: + """Analyze and prioritize CVE remediation across organizations""" + + def __init__(self, api: NinjaOneAPI): + self.api = api + self.organizations = [] + self.devices = [] + self.vulnerabilities = [] + + def collect_data(self): + """Collect all vulnerability-related data from NinjaOne""" + print("\n=== Collecting Data from NinjaOne ===\n") + + # Get organizations + print("Fetching organizations...") + self.organizations = self.api.get_organizations() + print(f" Found {len(self.organizations)} organizations") + + # Get all devices + print("Fetching devices...") + self.devices = self.api.get_devices() + print(f" Found {len(self.devices)} devices") + + # Get pending OS patches + print("Fetching pending OS patches...") + os_patches = self.api.get_os_patches('pending') + print(f" Found {len(os_patches)} pending OS patches") + + # Get pending software patches + print("Fetching pending software patches...") + sw_patches = self.api.get_software_patches('pending') + print(f" Found {len(sw_patches)} pending software patches") + + return { + 'organizations': self.organizations, + 'devices': self.devices, + 'os_patches': os_patches, + 'software_patches': sw_patches + } + + def analyze_by_severity(self, patches: List[Dict]) -> Dict[str, List]: + """Group patches by CVSS severity""" + severity_groups = { + 'critical': [], # CVSS 9.0-10.0 + 'high': [], # CVSS 7.0-8.9 + 'medium': [], # CVSS 4.0-6.9 + 'low': [], # CVSS 0.1-3.9 + 'unknown': [] # No CVSS score + } + + for patch in patches: + cvss = patch.get('cvssScore') or patch.get('cvss_score') + if cvss is None: + severity_groups['unknown'].append(patch) + elif cvss >= 9.0: + severity_groups['critical'].append(patch) + elif cvss >= 7.0: + severity_groups['high'].append(patch) + elif cvss >= 4.0: + severity_groups['medium'].append(patch) + else: + severity_groups['low'].append(patch) + + return severity_groups + + def analyze_by_organization(self, patches: List[Dict]) -> Dict[str, List]: + """Group patches by organization""" + org_groups = {} + for patch in patches: + org_name = patch.get('organizationName', 'Unknown') + if org_name not in org_groups: + org_groups[org_name] = [] + org_groups[org_name].append(patch) + return org_groups + + def generate_report(self, data: Dict) -> str: + """Generate CVE remediation priority report""" + report = [] + report.append("=" * 60) + report.append("CVE REMEDIATION PRIORITY REPORT") + report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + report.append("=" * 60) + + # Summary + report.append(f"\nORGANIZATIONS: {len(data['organizations'])}") + report.append(f"TOTAL DEVICES: {len(data['devices'])}") + report.append(f"PENDING OS PATCHES: {len(data['os_patches'])}") + report.append(f"PENDING SOFTWARE PATCHES: {len(data['software_patches'])}") + + # Analyze OS patches by severity + if data['os_patches']: + report.append("\n--- OS PATCHES BY SEVERITY ---") + os_severity = self.analyze_by_severity(data['os_patches']) + for level, patches in os_severity.items(): + if patches: + report.append(f" {level.upper()}: {len(patches)}") + + # Analyze software patches by severity + if data['software_patches']: + report.append("\n--- SOFTWARE PATCHES BY SEVERITY ---") + sw_severity = self.analyze_by_severity(data['software_patches']) + for level, patches in sw_severity.items(): + if patches: + report.append(f" {level.upper()}: {len(patches)}") + + # Critical items requiring immediate attention + report.append("\n" + "=" * 60) + report.append("CRITICAL ITEMS REQUIRING IMMEDIATE ATTENTION") + report.append("=" * 60) + + all_patches = data['os_patches'] + data['software_patches'] + critical = [p for p in all_patches if (p.get('cvssScore') or p.get('cvss_score') or 0) >= 9.0] + + if critical: + for patch in critical[:20]: # Top 20 + cve = patch.get('cve', patch.get('cveId', 'N/A')) + name = patch.get('name', patch.get('patchName', 'Unknown')) + cvss = patch.get('cvssScore', patch.get('cvss_score', 'N/A')) + devices = patch.get('deviceCount', patch.get('affectedDevices', 'N/A')) + report.append(f" - {cve} (CVSS: {cvss}) - {name}") + report.append(f" Affected devices: {devices}") + else: + report.append(" No critical vulnerabilities found") + + return "\n".join(report) + + +def main(): + """Main entry point""" + print("NinjaOne CVE Resolution Tool") + print("-" * 40) + + # Initialize API client + api = NinjaOneAPI() + if not api.authenticate(): + return + + # Create analyzer and collect data + analyzer = CVEAnalyzer(api) + data = analyzer.collect_data() + + # Generate report + report = analyzer.generate_report(data) + print(report) + + # Save report + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + filename = f"cve_report_{timestamp}.txt" + with open(filename, 'w') as f: + f.write(report) + print(f"\nReport saved to: {filename}") + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a8608b2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +requests>=2.28.0