#!/usr/bin/env python3 """ SNMP Scanner for HP and Xerox Printers Walks useful OID ranges and exports to individual CSV files Compatible with pysnmp v7.1 (pysnmp-lextudio) """ import asyncio from pysnmp.hlapi.v3arch.asyncio import * import csv import os from datetime import datetime # Configuration PRINTERS = [ "10.80.92.70", "10.80.92.23", "10.80.92.45", "10.80.92.52", "10.80.92.48", "10.80.92.69", "10.80.92.252", "10.80.92.54", "10.80.92.46", "10.80.92.57", "10.80.92.26", "10.80.92.53", "10.80.92.24", "10.80.92.62", "10.80.92.28", "10.80.92.51", "10.80.92.56", "10.80.92.25", "10.80.92.49", "10.80.92.20", "10.80.92.63", "10.80.92.67", "10.80.92.55", "10.80.92.65", "10.80.92.71", "10.80.92.251", "10.80.92.61", "10.80.92.253", "10.80.92.222" ] COMMUNITY = "WestJeff2025" OUTPUT_DIR = "output" # Optimized OID ranges - only what we need for reports and Zabbix # This significantly speeds up scanning by avoiding unnecessary data OID_RANGES = [ "1.3.6.1.2.1.1", # System info (sysDescr, sysName, sysLocation) "1.3.6.1.2.1.25.3.2.1.3", # Device model "1.3.6.1.2.1.43.5.1.1.17", # Printer serial number "1.3.6.1.2.1.43.10.2.1.4", # Supply max capacity "1.3.6.1.2.1.43.10.2.1.9", # Supply current level "1.3.6.1.2.1.43.11.1.1.6", # Supply descriptions (part numbers) "1.3.6.1.2.1.43.11.1.1.8", # Supply max capacity (toner/ink cartridges) "1.3.6.1.2.1.43.11.1.1.9", # Supply level (percentage or pages) "1.3.6.1.2.1.2.2.1.6", # MAC address "1.3.6.1.4.1.11.2.3.9.4.2.1", # HP maintenance kit info "1.3.6.1.4.1.11.2.3.9.4.2.1.4.1.5.4", # HP DesignJet ink level percentages ] # Specific single OIDs - these are retrieved first for speed SPECIFIC_OIDS = [ "1.3.6.1.2.1.1.1.0", # sysDescr (basic model info) "1.3.6.1.2.1.1.5.0", # sysName (hostname) "1.3.6.1.2.1.1.6.0", # sysLocation "1.3.6.1.2.1.25.3.2.1.3.1", # Device model (primary) ] async def snmp_get(printer_ip, community, oid): """ Perform SNMP GET on a specific OID Returns tuple: (oid, value) or None on error """ try: snmpEngine = SnmpEngine() errorIndication, errorStatus, errorIndex, varBinds = await get_cmd( snmpEngine, CommunityData(community, mpModel=0), await UdpTransportTarget.create((printer_ip, 161), timeout=2.0, retries=2), ContextData(), ObjectType(ObjectIdentity(oid)) ) snmpEngine.close_dispatcher() if errorIndication: return None elif errorStatus: return None else: for varBind in varBinds: return (str(varBind[0]), str(varBind[1])) except Exception as e: return None async def snmp_walk(printer_ip, community, oid): """ Perform SNMP walk on a specific OID for a printer Returns list of tuples: (oid, value) """ results = [] try: snmpEngine = SnmpEngine() async for (errorIndication, errorStatus, errorIndex, varBinds) in walk_cmd( snmpEngine, CommunityData(community, mpModel=0), await UdpTransportTarget.create((printer_ip, 161), timeout=2.0, retries=2), ContextData(), ObjectType(ObjectIdentity(oid)), lexicographicMode=False ): if errorIndication: print(f" Error on {printer_ip}: {errorIndication}") break elif errorStatus: print(f" Error on {printer_ip}: {errorStatus.prettyPrint()} at {errorIndex}") break else: for varBind in varBinds: oid_str = str(varBind[0]) value = str(varBind[1]) results.append((oid_str, value)) snmpEngine.close_dispatcher() except Exception as e: print(f" Exception on {printer_ip}: {e}") return results async def scan_printer(printer_ip): """ Scan a single printer and save results to CSV """ printer_name = f"printer-{printer_ip.replace('.', '-')}.printer.geaerospace.net" csv_filename = os.path.join(OUTPUT_DIR, f"{printer_name}.csv") print(f"Scanning {printer_ip}...") all_results = [] # Get specific OIDs concurrently (much faster) get_tasks = [snmp_get(printer_ip, COMMUNITY, oid) for oid in SPECIFIC_OIDS] get_results = await asyncio.gather(*get_tasks, return_exceptions=True) for result in get_results: if result and not isinstance(result, Exception): all_results.append(result) # Walk each OID range (walks must be sequential per OID) for oid in OID_RANGES: print(f" Walking OID {oid}...") results = await snmp_walk(printer_ip, COMMUNITY, oid) all_results.extend(results) # Save to CSV if all_results: with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) writer.writerow(['OID', 'Value', 'Scanned_At']) timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') for oid, value in all_results: writer.writerow([oid, value, timestamp]) print(f" ✓ Saved {len(all_results)} OIDs to {csv_filename}") else: print(f" ✗ No data retrieved from {printer_ip}") return printer_ip, len(all_results) async def scan_printers_batch(printers, batch_size=10): """ Scan printers in batches to avoid overwhelming the network Increased to 10 concurrent scans for faster performance """ results = [] for i in range(0, len(printers), batch_size): batch = printers[i:i + batch_size] print(f"\nBatch {i//batch_size + 1}: Scanning {len(batch)} printers concurrently...") batch_results = await asyncio.gather(*[scan_printer(ip) for ip in batch], return_exceptions=True) for result in batch_results: if isinstance(result, Exception): print(f" Error in batch: {result}") else: results.append(result) return results async def main(): """ Main function to scan all printers """ print("=" * 60) print("SNMP Printer Scanner - Optimized for Reports & Zabbix") print("=" * 60) print(f"Total printers to scan: {len(PRINTERS)}") print(f"Community string: {COMMUNITY}") print(f"Output directory: {OUTPUT_DIR}") print(f"Scanning {len(OID_RANGES)} optimized OID ranges (supplies, model, serial)") print("=" * 60) print() # Create output directory if it doesn't exist os.makedirs(OUTPUT_DIR, exist_ok=True) start_time = datetime.now() # Scan printers in batches (10 concurrent scans for speed) results = await scan_printers_batch(PRINTERS, batch_size=10) end_time = datetime.now() duration = (end_time - start_time).total_seconds() # Summary print() print("=" * 60) print("Scan Complete") print("=" * 60) successful = sum(1 for _, count in results if count > 0) print(f"Successful scans: {successful}/{len(PRINTERS)}") print(f"Total time: {duration:.2f} seconds") print(f"Results saved to: {OUTPUT_DIR}/") print("=" * 60) if __name__ == "__main__": asyncio.run(main())