SNMP Parser Documentation
A comprehensive Python package for parsing and analyzing SNMP walk outputs with advanced features for network discovery, parallel operations, and structured data export.
This tool is focused on parsing actual SNMP command-line output, not compiling MIBs.
Features
Comprehensive SNMP Operations
Parse standard and numeric SNMP walk outputs
Execute live SNMP operations (walk, get, bulk walk)
Support for SNMP v1, v2c, and v3
Parallel SNMP operations across multiple hosts
Network discovery for SNMP-enabled devices
Data Processing & Analysis
Group results by SNMP tables (ifTable, ipAddrTable, etc.)
Extract system information automatically
Filter entries by OID patterns
Support for all standard SNMP data types
Export & Integration
Export to JSON and CSV formats
Structured data models for easy integration
Command-line interface for automation
File-based parsing for offline analysis
Performance & Reliability
Concurrent execution with ThreadPoolExecutor
Retry mechanisms with exponential backoff
Timeout handling and error recovery
Memory-efficient parsing for large datasets
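The retry-with-backoff behaviour listed above can be pictured with a short, standard-library-only sketch. It is not the package's implementation: `retry_with_backoff`, `base_delay`, and the injectable `sleep` argument are hypothetical names chosen for illustration, and the doubling schedule is an assumption about what "exponential backoff" means here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(); after each failure wait base_delay * 2**attempt, then retry.

    Hypothetical helper for illustration only, not part of snmpwalk-parser.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")
```

Passing `sleep` as a parameter keeps the delay schedule testable without actually waiting.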
Installation
Requirements
Python 3.8 or higher
SNMP tools (snmpwalk, snmpget, snmpset, snmpbulkwalk) for live operations
Install from PyPI
pip install snmpwalk-parser
Install from Source
git clone https://github.com/kunalraut666/snmpwalk-parser.git
cd snmpwalk-parser
pip install -e .
Install SNMP Tools (Optional)
For live SNMP operations, install the SNMP utilities:
Ubuntu/Debian:
sudo apt-get install snmp snmp-mibs-downloader
CentOS/RHEL:
sudo yum install net-snmp-utils
macOS:
brew install net-snmp
Quick Start
Basic Parsing
from snmpwalk_parser import SNMPParser
# Parse SNMP output from file
parser = SNMPParser()
entries = parser.parse_file("snmpwalk_output.txt")
# Display results
for entry in entries[:5]:  # Show first 5 entries
    print(f"OID: {entry.oid}")
    print(f"Value: {entry.value}")
    print(f"Type: {entry.type}")
    print("---")
Live SNMP Operations
from snmpwalk_parser import SNMPRunner
# Initialize SNMP runner
runner = SNMPRunner()
# Perform SNMP walk
result = runner.run_snmpwalk(
    host="192.168.1.1",
    community="public",
    oid="1.3.6.1.2.1.1"  # System information
)
print(f"Retrieved {result.get_entry_count()} entries")
print(f"System info: {result.system_info}")
Export Results
# Export to JSON
result.export_to_file("output.json", "json")
# Export to CSV
result.export_to_file("output.csv", "csv")
# Get as dictionary
data = result.to_dict()
Core Components
SNMPParser Class
The core parsing engine that processes SNMP walk outputs.
from snmpwalk_parser.core import SNMPParser
parser = SNMPParser()
# Parse text output
entries = parser.parse_snmpwalk_output(snmp_text)
# Parse from file
entries = parser.parse_file("output.txt")
# Group by tables
tables = parser.group_by_table(entries)
# Extract system information
system_info = parser.get_system_info(entries)
# Get interface information
interfaces = parser.get_interface_info(entries)
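To make the parsing step concrete, here is a minimal sketch of how one line of typed `snmpwalk` output could be split into its parts with a regular expression. It is an illustration under assumptions, not the package's actual parser: `LINE_RE` and `parse_line` are hypothetical names, and the sketch only handles lines of the form `OID = TYPE: value`.

```python
import re
from typing import Dict, Optional

# Matches lines such as:
#   SNMPv2-MIB::sysDescr.0 = STRING: "Linux server 5.4.0"
#   .1.3.6.1.2.1.1.3.0 = Timeticks: (12345) 0:02:03.45
LINE_RE = re.compile(
    r"^(?P<oid>\S+)\s+=\s+(?P<type>[A-Za-z0-9 -]+?):\s*(?P<value>.*)$"
)


def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Split one snmpwalk output line into oid, type, value and raw_value.

    Hypothetical helper for illustration; returns None for lines that do not
    look like 'OID = TYPE: value'.
    """
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    raw_value = match.group("value")
    return {
        "oid": match.group("oid"),
        "type": match.group("type"),
        "value": raw_value.strip('"'),  # drop the quotes around STRING values
        "raw_value": raw_value,
    }
```

The type group is matched lazily so that colons inside the value, as in a Timeticks reading, stay part of the value.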
SNMPRunner Class
Handles live SNMP operations with advanced features.
from snmpwalk_parser.snmp_runner import SNMPRunner
# Initialize with custom settings
runner = SNMPRunner(timeout=30, retries=3)
# SNMP Walk
result = runner.run_snmpwalk("192.168.1.1", "public")
# SNMP Get
result = runner.run_snmpget("192.168.1.1", "public", ["sysDescr.0", "sysName.0"])
# SNMP Bulk Walk (faster for large tables)
result = runner.run_snmpbulkwalk("192.168.1.1", "public", "1.3.6.1.2.1.2")
# Parallel operations
hosts = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
results = runner.run_parallel_snmpwalk(hosts, "public")
# Network discovery
discovered_hosts = runner.discover_snmp_hosts("192.168.1.0/24", "public")
Data Models
SNMPEntry
Represents a single SNMP entry.
from snmpwalk_parser.models import SNMPEntry
entry = SNMPEntry(
    oid="1.3.6.1.2.1.1.1.0",
    key="sysDescr",
    index="0",
    type="STRING",
    value="Linux server 5.4.0",
    raw_value='"Linux server 5.4.0"'
)
# Check OID format
print(entry.is_numeric_oid()) # True
print(entry.is_named_oid()) # False
# Get table name
print(entry.get_table_name()) # "sysDescr"
# Convert to dictionary
data = entry.to_dict()
SNMPWalkResult
Container for complete SNMP walk results.
from snmpwalk_parser.models import SNMPWalkResult
result = SNMPWalkResult(
    host="192.168.1.1",
    community="public",
    oid="1.3.6.1.2.1"
)
# Add entries (parsed_entries is a list of SNMPEntry objects, e.g. from SNMPParser.parse_file)
for entry in parsed_entries:
    result.add_entry(entry)
# Access data
print(f"Total entries: {result.get_entry_count()}")
print(f"Tables found: {result.get_table_names()}")
print(f"System info: {result.system_info}")
# Get summary
summary = result.get_summary()
CLI Usage
The package includes a comprehensive command-line interface for all operations.
Installation
The CLI is automatically available after installing the package:
snmpwalk-parser --help
Commands
SNMP Walk
# Basic walk
snmpwalk-parser walk 192.168.1.1 -c public
# Walk specific OID
snmpwalk-parser walk 192.168.1.1 -c public -o 1.3.6.1.2.1.1
# Save to file
snmpwalk-parser walk 192.168.1.1 -c public --output results.json --format json
# Show system information
snmpwalk-parser walk 192.168.1.1 -c public --show-system
# Show interface information
snmpwalk-parser walk 192.168.1.1 -c public --show-interfaces
SNMP Get
# Get specific OIDs
snmpwalk-parser get 192.168.1.1 -c public -o sysDescr.0 sysName.0
# Export to CSV
snmpwalk-parser get 192.168.1.1 -c public -o sysDescr.0 --output system.csv --format csv
Bulk Walk
# Bulk walk for better performance
snmpwalk-parser bulk 192.168.1.1 -c public -o 1.3.6.1.2.1.2
# Custom repetitions
snmpwalk-parser bulk 192.168.1.1 -c public -o 1.3.6.1.2.1.2 -m 25
Parse Existing Output
# Parse saved SNMP output
snmpwalk-parser parse -f snmpwalk_output.txt
# Filter by pattern
snmpwalk-parser parse -f output.txt --filter "ifDescr.*"
# Export parsed data
snmpwalk-parser parse -f output.txt --output parsed.json --format json
Network Discovery
# Discover SNMP hosts
snmpwalk-parser discover 192.168.1.0/24 -c public
# Custom timeout
snmpwalk-parser discover 10.0.0.0/16 -c public -t 10
# Save discovered hosts
snmpwalk-parser discover 192.168.1.0/24 -c public --output hosts.json
Parallel Operations
# Parallel walk on multiple hosts
snmpwalk-parser parallel -H 192.168.1.1 192.168.1.2 192.168.1.3 -c public
# Custom worker count
snmpwalk-parser parallel -H 192.168.1.{1..10} -c public -w 20
# Export results
snmpwalk-parser parallel -H 192.168.1.1 192.168.1.2 -c public --output parallel.json
Global Options
# Verbose output
snmpwalk-parser walk 192.168.1.1 -c public -v
# Disable colors
snmpwalk-parser walk 192.168.1.1 -c public --no-color
# Custom timeout and retries
snmpwalk-parser walk 192.168.1.1 -c public -t 60 -r 5
# SNMP version
snmpwalk-parser walk 192.168.1.1 -c public -V 1
API Reference
SNMPParser Methods
parse_snmpwalk_output(text: str) -> List[SNMPEntry]
Parse SNMP walk output text.
Parameters:
text: Raw SNMP walk output
Returns:
List of SNMPEntry objects
parse_file(file_path: str) -> List[SNMPEntry]
Parse SNMP output from file.
Parameters:
file_path: Path to file containing SNMP output
Returns:
List of SNMPEntry objects
group_by_table(entries: List[SNMPEntry]) -> Dict[str, SNMPTable]
Group entries by SNMP table.
Parameters:
entries: List of SNMPEntry objects
Returns:
Dictionary mapping table names to SNMPTable objects
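The idea behind grouping can be sketched without the package: SNMP table data arrives one cell at a time (column plus row index), and grouping pivots those cells back into rows. The sketch below is an assumption-laden illustration; `rows_by_index` is a hypothetical helper, and the real `SNMPTable` structure is not described in this document.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def rows_by_index(
    cells: Iterable[Tuple[str, str, str]]
) -> Dict[str, Dict[str, str]]:
    """Pivot (column, index, value) triples into one dict per table row.

    Hypothetical helper for illustration, not the package's SNMPTable.
    """
    rows: Dict[str, Dict[str, str]] = defaultdict(dict)
    for column, index, value in cells:
        rows[index][column] = value
    return dict(rows)


# Cells as they might appear in an ifTable walk: (column, row index, value)
sample = [
    ("ifDescr", "1", "lo"),
    ("ifDescr", "2", "eth0"),
    ("ifSpeed", "1", "10000000"),
    ("ifSpeed", "2", "1000000000"),
]
table = rows_by_index(sample)
```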
get_system_info(entries: List[SNMPEntry]) -> Dict[str, Any]
Extract system information from entries.
Parameters:
entries: List of SNMPEntry objects
Returns:
Dictionary with system information
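As a rough picture of what such a dictionary could contain, the sketch below maps the standard system-group OIDs to the key names that the examples in this document read from `system_info` (`system_description`, `system_uptime`, and so on). The OIDs are the standard SNMPv2-MIB system group; the `SYSTEM_OIDS` table and `system_info_from` function are hypothetical, and whether the package builds its dictionary this way is an assumption.

```python
from typing import Dict, Iterable, Tuple

# Standard SNMPv2-MIB system group scalars mapped to the key names used
# in this document's examples.
SYSTEM_OIDS = {
    "1.3.6.1.2.1.1.1.0": "system_description",  # sysDescr
    "1.3.6.1.2.1.1.3.0": "system_uptime",       # sysUpTime
    "1.3.6.1.2.1.1.4.0": "system_contact",      # sysContact
    "1.3.6.1.2.1.1.5.0": "system_name",         # sysName
    "1.3.6.1.2.1.1.6.0": "system_location",     # sysLocation
}


def system_info_from(pairs: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Pick system-group values out of (numeric OID, value) pairs.

    Hypothetical helper for illustration only.
    """
    info: Dict[str, str] = {}
    for oid, value in pairs:
        key = SYSTEM_OIDS.get(oid.lstrip("."))  # tolerate a leading dot
        if key is not None:
            info[key] = value
    return info
```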
SNMPRunner Methods
run_snmpwalk(host, community, oid, version, timeout, retries) -> SNMPWalkResult
Execute SNMP walk operation.
Parameters:
host: Target host IP or hostname
community: SNMP community string (default: "public")
oid: Starting OID (default: "" for full walk)
version: SNMP version ("1", "2c", "3", default: "2c")
timeout: Timeout in seconds (optional)
retries: Number of retries (optional)
Returns:
SNMPWalkResult object
run_snmpget(host, community, oids, version, timeout) -> SNMPWalkResult
Execute SNMP get operation.
Parameters:
host: Target host
community: SNMP community string
oids: List of OIDs to retrieve
version: SNMP version (default: "2c")
timeout: Timeout in seconds (optional)
Returns:
SNMPWalkResult object
run_parallel_snmpwalk(hosts, community, oid, version, max_workers) -> Dict
Execute parallel SNMP walks.
Parameters:
hosts: List of host addresses
community: SNMP community string
oid: Starting OID (default: "")
version: SNMP version (default: "2c")
max_workers: Number of parallel workers (default: 10)
Returns:
Dictionary mapping hosts to results
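The general shape of a thread-pool fan-out that returns a host-to-result dictionary can be sketched as follows. This is not the package's code: `fan_out` and `fake_query` are hypothetical names, and storing the raised exception as a failed host's value is an assumption (Example 2 in this document checks each result before using it, which is consistent with that behaviour but does not confirm it).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Iterable


def fan_out(
    hosts: Iterable[str],
    query: Callable[[str], Any],
    max_workers: int = 10,
) -> Dict[str, Any]:
    """Run query(host) for every host in a thread pool.

    A failing host maps to the exception it raised, so one bad host does
    not abort the whole batch. Hypothetical helper for illustration.
    """
    results: Dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(query, host): host for host in hosts}
        for future in as_completed(futures):
            host = futures[future]
            try:
                results[host] = future.result()
            except Exception as exc:  # keep the error, carry on with other hosts
                results[host] = exc
    return results


def fake_query(host: str) -> Dict[str, Any]:
    """Stand-in for a real SNMP walk so the sketch runs without a network."""
    if host.endswith(".2"):
        raise TimeoutError(f"no response from {host}")
    return {"host": host, "entries": 3}


results = fan_out(["192.168.1.1", "192.168.1.2", "192.168.1.3"], fake_query, max_workers=2)
```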
discover_snmp_hosts(network, community, timeout) -> List[str]
Discover SNMP-enabled hosts in network.
Parameters:
network: Network in CIDR notation (e.g., "192.168.1.0/24")
community: SNMP community string
timeout: Timeout per host (default: 5)
Returns:
List of discovered host addresses
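Discovery has to start from a list of candidate addresses. A minimal sketch of expanding CIDR notation with Python's standard `ipaddress` module is shown below; `candidate_hosts` is a hypothetical helper, and how the package itself enumerates addresses is not documented here.

```python
import ipaddress
from typing import List


def candidate_hosts(network: str) -> List[str]:
    """Return the usable host addresses in a CIDR block as strings.

    Hypothetical helper for illustration. strict=False accepts input such
    as "192.168.1.10/24" where host bits are set.
    """
    net = ipaddress.ip_network(network, strict=False)
    return [str(addr) for addr in net.hosts()]


small = candidate_hosts("192.168.1.0/30")
```

Block size matters for run time: a /24 yields 254 candidates, while a /16 yields 65,534, so the per-host timeout has a large effect on how long a sweep takes.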
Examples
Example 1: Network Interface Analysis
from snmpwalk_parser import SNMPRunner, SNMPParser
runner = SNMPRunner()
parser = SNMPParser()
# Get interface table
result = runner.run_snmpwalk("192.168.1.1", "public", "1.3.6.1.2.1.2")
# Extract interface information
interfaces = parser.get_interface_info(result.entries)
for iface in interfaces:
    print(f"Interface {iface['index']}: {iface.get('description', 'N/A')}")
    print(f" Speed: {iface.get('speed', 'Unknown')}")
    print(f" Status: {iface.get('status', 'Unknown')}")
Example 2: System Monitoring
import json
from snmpwalk_parser import SNMPRunner
def monitor_systems(hosts, community="public"):
    runner = SNMPRunner()
    # Get system information from all hosts
    results = runner.run_parallel_snmpwalk(
        hosts=hosts,
        community=community,
        oid="1.3.6.1.2.1.1"  # System MIB
    )
    monitoring_data = {}
    for host, result in results.items():
        if hasattr(result, 'system_info'):
            monitoring_data[host] = {
                'system_description': result.system_info.get('system_description'),
                'system_uptime': result.system_info.get('system_uptime'),
                'system_name': result.system_info.get('system_name'),
                'system_location': result.system_info.get('system_location')
            }
        else:
            monitoring_data[host] = {'error': str(result)}
    return monitoring_data
# Monitor multiple systems
hosts = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
data = monitor_systems(hosts)
# Save monitoring data
with open("monitoring_report.json", "w") as f:
    json.dump(data, f, indent=2)
Example 3: Network Discovery and Inventory
from snmpwalk_parser import SNMPRunner
import csv
def network_inventory(network, community="public"):
    runner = SNMPRunner()
    print(f"Discovering devices in {network}...")
    hosts = runner.discover_snmp_hosts(network, community)
    print(f"Found {len(hosts)} SNMP-enabled devices")
    inventory = []
    for host in hosts:
        try:
            # Get system information
            result = runner.run_snmpwalk(host, community, "1.3.6.1.2.1.1")
            device_info = {
                'ip_address': host,
                'system_description': result.system_info.get('system_description', 'Unknown'),
                'system_name': result.system_info.get('system_name', 'Unknown'),
                'system_location': result.system_info.get('system_location', 'Unknown'),
                'system_contact': result.system_info.get('system_contact', 'Unknown'),
                'uptime': result.system_info.get('system_uptime', 'Unknown')
            }
            inventory.append(device_info)
            print(f"✓ {host}: {device_info['system_description'][:50]}...")
        except Exception as e:
            print(f"✗ {host}: Failed - {e}")
    return inventory
# Perform network inventory
devices = network_inventory("192.168.1.0/24")
# Export to CSV
with open("network_inventory.csv", "w", newline="") as f:
    if devices:
        writer = csv.DictWriter(f, fieldnames=devices[0].keys())
        writer.writeheader()
        writer.writerows(devices)
print(f"Inventory saved to network_inventory.csv ({len(devices)} devices)")
Example 4: Custom SNMP Data Processing
from snmpwalk_parser import SNMPParser
def analyze_interface_utilization(snmp_file):
    parser = SNMPParser()
    entries = parser.parse_file(snmp_file)
    # Filter interface-related entries (ifTable columns)
    interface_entries = parser.filter_by_oid_pattern(entries, r"1\.3\.6\.1\.2\.1\.2\.2\.1\.")
    # Map ifTable column numbers to readable names
    oid_map = {
        '2': 'description',
        '3': 'type',
        '5': 'speed',
        '8': 'oper_status',
        '10': 'in_octets',
        '16': 'out_octets',
        '13': 'in_discards',
        '19': 'out_discards',
        '14': 'in_errors',
        '20': 'out_errors'
    }
    # Group by interface index
    interfaces = {}
    for entry in interface_entries:
        index = entry.index
        if index not in interfaces:
            interfaces[index] = {}
        # The second-to-last OID component is the ifTable column number
        oid_suffix = entry.oid.split('.')[-2]
        if oid_suffix in oid_map:
            interfaces[index][oid_map[oid_suffix]] = entry.value
    # Summarize counters per interface (simplified example)
    for idx, iface in interfaces.items():
        if 'in_octets' in iface and 'out_octets' in iface and 'speed' in iface:
            try:
                total_octets = int(iface['in_octets']) + int(iface['out_octets'])
                speed_bps = int(iface['speed'])
                # This is a simplified calculation - real utilization needs time sampling
                print(f"Interface {idx} ({iface.get('description', 'Unknown')}):")
                print(f" Speed: {speed_bps:,} bps")
                print(f" Total octets: {total_octets:,}")
                print(f" Status: {iface.get('oper_status', 'Unknown')}")
                print()
            except (ValueError, TypeError):
                continue
# Analyze interface data
analyze_interface_utilization("interface_walk.txt")
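The comment in Example 4 notes that real utilization needs time sampling. A minimal sketch of that calculation, assuming two readings of the same octet counter taken a known number of seconds apart, is shown below. `utilization_percent` and its parameters are hypothetical and not part of the package; the formula is the usual one, bits transferred divided by the bits the link could have carried in the interval.

```python
def utilization_percent(
    octets_start: int,
    octets_end: int,
    interval_seconds: float,
    speed_bps: int,
    counter_bits: int = 32,
) -> float:
    """Percent utilization over one sampling interval, for one direction.

    Hypothetical helper for illustration. counter_bits=32 matches Counter32
    objects such as ifInOctets; pass 64 for 64-bit counters.
    """
    if interval_seconds <= 0 or speed_bps <= 0:
        raise ValueError("interval_seconds and speed_bps must be positive")
    # Modulo arithmetic absorbs a single counter wrap between the two samples.
    delta_octets = (octets_end - octets_start) % (2 ** counter_bits)
    return (delta_octets * 8) / (interval_seconds * speed_bps) * 100.0


# 1,250,000 octets in 10 seconds on a 100 Mbit/s link
usage = utilization_percent(1_000_000, 2_250_000, 10, 100_000_000)
```

Inbound and outbound counters are normally evaluated separately on a full-duplex link, since each direction has the full link speed available.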
Advanced Usage
Custom SNMP Types Processing
from snmpwalk_parser.core import SNMPParser
class CustomSNMPParser(SNMPParser):
    def __init__(self):
        super().__init__()
        # Add custom type processors
        self.type_processors['CUSTOM_TYPE'] = self._process_custom_type

    def _process_custom_type(self, value: str):
        # Custom processing logic
        return f"processed_{value}"
# Use custom parser
parser = CustomSNMPParser()
entries = parser.parse_file("custom_output.txt")
Batch Processing
from snmpwalk_parser import SNMPRunner
from concurrent.futures import ThreadPoolExecutor
def process_network_batch(network_configs):
    """Process multiple networks in parallel."""
    runner = SNMPRunner()

    def process_network(config):
        network = config['network']
        community = config['community']
        hosts = runner.discover_snmp_hosts(network, community)
        results = runner.run_parallel_snmpwalk(hosts, community)
        return {
            'network': network,
            'hosts_found': len(hosts),
            'successful_queries': sum(1 for r in results.values()
                                      if hasattr(r, 'entries')),
            'results': results
        }

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_network, config)
                   for config in network_configs]
        return [future.result() for future in futures]
# Process multiple networks
networks = [
    {'network': '192.168.1.0/24', 'community': 'public'},
    {'network': '192.168.2.0/24', 'community': 'private'},
    {'network': '10.0.0.0/24', 'community': 'public'}
]
results = process_network_batch(networks)
Error Handling and Logging
import logging
from snmpwalk_parser import SNMPRunner, SNMPError
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def robust_snmp_query(host, community, max_attempts=3):
    runner = SNMPRunner(timeout=30, retries=2)
    for attempt in range(max_attempts):
        try:
            result = runner.run_snmpwalk(host, community)
            logger.info(f"Successfully queried {host} on attempt {attempt + 1}")
            return result
        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} failed for {host}: {e}")
            if attempt == max_attempts - 1:
                logger.error(f"All attempts failed for {host}")
                return SNMPError(code=1, message=str(e), details=f"Host: {host}")
    return None
# Use robust querying
result = robust_snmp_query("192.168.1.1", "public")
if isinstance(result, SNMPError):
    print(f"Query failed: {result}")
else:
    print(f"Query successful: {result.get_entry_count()} entries")
Performance Optimization
Memory Efficient Processing
from snmpwalk_parser import SNMPParser
def process_large_file_efficiently(file_path, chunk_size=1000):
    """Process large SNMP files in chunks to optimize memory usage."""
    parser = SNMPParser()
    with open(file_path, 'r') as f:
        lines = []
        for line in f:
            lines.append(line.strip())
            if len(lines) >= chunk_size:
                # Process chunk
                chunk_text = '\n'.join(lines)
                entries = parser.parse_snmpwalk_output(chunk_text)
                # Process entries (e.g., save to database, analyze, etc.)
                yield entries
                lines = []  # Reset for next chunk
        # Process remaining lines
        if lines:
            chunk_text = '\n'.join(lines)
            entries = parser.parse_snmpwalk_output(chunk_text)
            yield entries
# Process large file
for chunk_entries in process_large_file_efficiently("large_snmp_output.txt"):
    print(f"Processed chunk with {len(chunk_entries)} entries")
Optimized Parallel Operations
from snmpwalk_parser import SNMPRunner
import asyncio
from concurrent.futures import ThreadPoolExecutor
class OptimizedSNMPRunner:
    def __init__(self, max_workers=50):
        self.runner = SNMPRunner()
        self.max_workers = max_workers

    async def query_hosts_async(self, hosts, community, oid=""):
        # get_running_loop() is the preferred call inside a coroutine
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            tasks = [
                loop.run_in_executor(
                    executor,
                    self.runner.run_snmpwalk,
                    host, community, oid
                )
                for host in hosts
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        return dict(zip(hosts, results))
# Usage
async def main():
    optimizer = OptimizedSNMPRunner(max_workers=100)
    hosts = [f"192.168.1.{i}" for i in range(1, 255)]
    results = await optimizer.query_hosts_async(hosts, "public")
    successful = sum(1 for r in results.values() if not isinstance(r, Exception))
    print(f"Successfully queried {successful}/{len(hosts)} hosts")
# Run async operation
# asyncio.run(main())
Contributing
We welcome contributions! Please see our contributing guidelines:
Development Setup
# Clone repository
git clone https://github.com/kunalraut666/snmpwalk-parser.git
cd snmpwalk-parser
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest tests/
# Run linting
flake8 src/
black src/
# Generate documentation
cd docs
make html
Testing
# Run all tests
pytest
# Run with coverage
pytest --cov=snmpwalk_parser
# Run specific test categories
pytest tests/test_parser.py
pytest tests/test_runner.py
pytest tests/test_cli.py
Submitting Changes
Fork the repository
Create a feature branch:
git checkout -b feature-name
Make your changes and add tests
Ensure all tests pass:
pytest
Commit your changes:
git commit -am 'Add feature'
Push to the branch:
git push origin feature-name
Submit a pull request
Changelog
Version 1.0.1
Initial release
Basic SNMP parsing functionality
Live SNMP operations
CLI interface
Parallel processing
Network discovery
Version 1.1.0 (Planned)
SNMP v3 authentication support
Advanced filtering options
Database integration
Web interface
Performance optimizations
License
This project is licensed under the MIT License; see the LICENSE file for details.
Support
Documentation: https://snmpwalk-parser.readthedocs.io
Issues: https://github.com/kunalraut666/snmpwalk-parser/issues
PyPI: https://pypi.org/project/snmpwalk-parser/
Repository: https://github.com/kunalraut666/snmpwalk-parser
Made with ❤️ by Kunal Raut