Building Custom Network Monitoring Tools with Python

Empower your network security with advanced Python-based monitoring solutions

Introduction

In today’s complex digital landscape, robust network monitoring is crucial for maintaining security and performance. Python, with its versatile libraries and ease of use, offers powerful tools for creating custom network monitoring solutions. This guide will explore advanced techniques for both blue team (defensive) and red team (offensive) approaches, integrating UI elements and demonstrating how to scale your tools with Flask.

Essential Python Libraries for Network Monitoring

Core Libraries

  • psutil: System and process utilities
  • scapy: Packet manipulation
  • pyshark: Python wrapper for tshark
  • netifaces: Network interface information

Additional Libraries

  • pysnmp: SNMP protocol implementation
  • netaddr: Network address manipulation
  • shodan: Shodan API wrapper
  • flask: Web application framework

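Install these libraries, along with the watchdog, python-nmap, and python-gvm packages used in later examples, using pip:

pip install psutil scapy pyshark netifaces pysnmp netaddr shodan flask watchdog python-nmap python-gvm

pyshark and python-nmap are wrappers around external programs, so the tshark and nmap binaries must also be installed on the system.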

Blue Team: Defensive Monitoring Techniques

1. Real-time Traffic Analysis

Monitor network traffic in real-time to detect anomalies:

import pyshark

def analyze_traffic(interface):
    capture = pyshark.LiveCapture(interface=interface)
    for packet in capture.sniff_continuously():
        try:
            if 'IP' in packet:
                print(f"Source IP: {packet.ip.src}, Destination IP: {packet.ip.dst}")
                if hasattr(packet, 'tcp'):
                    print(f"Source Port: {packet.tcp.srcport}, Destination Port: {packet.tcp.dstport}")
                elif hasattr(packet, 'udp'):
                    print(f"Source Port: {packet.udp.srcport}, Destination Port: {packet.udp.dstport}")
        except AttributeError:
            pass

analyze_traffic('eth0')  # Replace with your network interface
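
The capture loop above prints every packet, but it does not yet decide what counts as an anomaly. One simple approach, sketched here under the assumption that a burst of packets from a single source is worth flagging, is to count packets per source IP over a capture window and report any address above a threshold. The function name find_noisy_sources, the threshold, and the sample addresses are illustrative and not part of pyshark.

```python
from collections import Counter

def find_noisy_sources(source_ips, threshold=100):
    """Return {ip: count} for sources seen more than `threshold` times.

    `source_ips` is any iterable of source-address strings, for example
    the packet.ip.src values collected during one capture window.
    """
    counts = Counter(source_ips)
    return {ip: n for ip, n in counts.items() if n > threshold}

# Hypothetical sample window: one host is far noisier than the others.
window = ["10.0.0.5"] * 150 + ["10.0.0.7"] * 20 + ["10.0.0.9"] * 3
print(find_noisy_sources(window, threshold=100))
```

A fixed threshold is only a starting point; in practice the limit would likely depend on the normal traffic volume of the monitored network.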

2. File Access Monitoring

Monitor file access events to detect potential data breaches:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileMonitor(FileSystemEventHandler):
    def on_any_event(self, event):
        if event.is_directory:
            return
        print(f"File {event.src_path} was {event.event_type}")

def monitor_directory(path):
    event_handler = FileMonitor()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

monitor_directory("/path/to/monitor")  # Replace with the directory you want to monitor
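
File system events show that a file was touched, but not whether its content actually changed. A hash baseline can complement the event stream. The following is a minimal sketch using only the standard library; the helper names hash_file, build_baseline, and diff_baseline are hypothetical and not part of watchdog.

```python
import hashlib
from pathlib import Path

def hash_file(path, chunk_size=65536):
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def build_baseline(directory):
    """Map every regular file under `directory` to its SHA-256 digest."""
    return {str(p): hash_file(p) for p in Path(directory).rglob("*") if p.is_file()}

def diff_baseline(old, new):
    """Compare two baselines and list added, removed, and modified paths."""
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    modified = sorted(p for p in set(old) & set(new) if old[p] != new[p])
    return {"added": added, "removed": removed, "modified": modified}
```

One possible use is to call build_baseline once at startup, rebuild it when an event arrives, and pass both results to diff_baseline to see which files really differ.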

3. Firewall Log Analysis

Analyze firewall logs to detect potential security threats:

import re
from collections import Counter

def analyze_firewall_logs(log_file):
    ip_pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
    blocked_ips = []

    with open(log_file, 'r') as f:
        for line in f:
            if 'BLOCK' in line:
                ip_match = re.search(ip_pattern, line)
                if ip_match:
                    blocked_ips.append(ip_match.group())
    
    ip_counts = Counter(blocked_ips)
    print("Top 10 blocked IPs:")
    for ip, count in ip_counts.most_common(10):
        print(f"{ip}: {count} times")

analyze_firewall_logs('/var/log/firewall.log')  # Replace with your firewall log file
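
The pattern \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3} also matches strings such as 999.1.1.1 that are not valid addresses. A possible refinement, sketched below, validates each candidate with the standard ipaddress module. The helper name extract_valid_ipv4 and the sample log line are assumptions; real firewall log formats vary.

```python
import ipaddress
import re

IP_CANDIDATE = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")

def extract_valid_ipv4(line):
    """Return the first syntactically valid IPv4 address in `line`, or None."""
    for candidate in IP_CANDIDATE.findall(line):
        try:
            return str(ipaddress.IPv4Address(candidate))
        except ipaddress.AddressValueError:
            continue  # e.g. 999.1.1.1 matches the regex but is not an address
    return None

# Hypothetical log line: the first candidate is rejected, the second accepted.
print(extract_valid_ipv4("BLOCK src=999.1.1.1 dst=192.168.1.10"))
```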

Red Team: Offensive Monitoring Techniques

Note: These techniques are for educational purposes only. Always obtain proper authorization before performing any security testing.

1. Network Vulnerability Scanner

Scan network for potential vulnerabilities:

import nmap

def scan_network(target):
    nm = nmap.PortScanner()
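    # -sS (SYN scan) and -O (OS detection) require root privileges;
    # -p- scans all 65535 ports, which is slow across a whole /24
    nm.scan(target, arguments='-sV -sS -O -p-')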
    
    for host in nm.all_hosts():
        print(f"Host: {host}")
        print(f"State: {nm[host].state()}")
        for proto in nm[host].all_protocols():
            print(f"Protocol: {proto}")
            ports = nm[host][proto].keys()
            for port in ports:
                print(f"Port: {port}\tState: {nm[host][proto][port]['state']}\tService: {nm[host][proto][port]['name']}")

scan_network('192.168.1.0/24')  # Replace with your target network
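
The nmap options used above need root privileges and the nmap binary. Where neither is available, a plain TCP connect check from the standard library can confirm whether specific ports accept connections. This is a minimal sketch, not a replacement for nmap; the function name check_tcp_ports and the timeout value are illustrative.

```python
import socket

def check_tcp_ports(host, ports, timeout=0.5):
    """Return the subset of `ports` that accept a TCP connection on `host`."""
    open_ports = []
    for port in ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an error code on failure
            if sock.connect_ex((host, port)) == 0:
                open_ports.append(port)
    return open_ports
```

For example, check_tcp_ports('192.168.1.10', [22, 80, 443]) would return the ports from that list that are reachable on an authorized target. A full connect is easier to spot in the target's logs than a SYN scan, which is usually acceptable for authorized testing.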

2. ARP Spoofing Detection

Detect potential ARP spoofing attacks:

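from scapy.all import ARP, sniff

ip_to_mac = {}  # last MAC address seen for each IP

def arp_monitor(pkt):
    if ARP in pkt and pkt[ARP].op == 2:  # op 2 = ARP reply ("is-at")
        ip, mac = pkt[ARP].psrc, pkt[ARP].hwsrc
        known_mac = ip_to_mac.get(ip)
        ip_to_mac[ip] = mac
        # A changed mapping may indicate spoofing or a legitimate device change
        if known_mac and known_mac != mac:
            return f"WARNING: {ip} changed from {known_mac} to {mac}"

print("Starting ARP monitoring...")
sniff(prn=arp_monitor, filter="arp", store=0)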

3. Shodan Integration for Vulnerability Assessment

Use Shodan to identify potentially vulnerable devices:

import shodan

SHODAN_API_KEY = "your_api_key_here"

def search_vulnerable_devices(query):
    api = shodan.Shodan(SHODAN_API_KEY)
    try:
        results = api.search(query)
        print(f"Results found: {results['total']}")
        for result in results['matches']:
            print(f"IP: {result['ip_str']}")
            print(f"Port: {result['port']}")
            print(f"Organization: {result.get('org', 'N/A')}")
            # 'location' is a dictionary; print the country rather than the raw dict
            print(f"Country: {result.get('location', {}).get('country_name', 'N/A')}")
            print(result['data'])
            print('')
    except shodan.APIError as e:
        print(f"Error: {e}")

search_vulnerable_devices("apache")  # Replace with your search query

Building a Web-Based Network Monitoring Dashboard

Create a Flask-based web application to visualize network monitoring data:

from flask import Flask, render_template
import psutil
import netifaces
import threading
import time

app = Flask(__name__)

network_data = {}

def update_network_data():
    while True:
        # Fetch all per-interface counters once per refresh
        counters = psutil.net_io_counters(pernic=True)
        for interface in netifaces.interfaces():
            if interface in counters:
                network_data[interface] = counters[interface]
        time.sleep(1)

@app.route('/')
def index():
    return render_template('index.html', network_data=network_data)

if __name__ == '__main__':
    threading.Thread(target=update_network_data, daemon=True).start()
    # debug=True enables the interactive debugger; use it only for local development
    app.run(debug=True)

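Create an HTML template (index.html) in a ‘templates’ folder. The template receives network_data, a dictionary that maps each interface name to its psutil counters (bytes_sent, bytes_recv, packets_sent, packets_recv, and so on), and can render them as a table or pass them to a charting library.

This setup gives you a basic network monitoring dashboard. The page is rendered once per request, so reload it or poll the server from JavaScript to see updated values for each network interface.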
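
psutil reports cumulative counters since boot, so a dashboard usually needs to convert two snapshots into a rate before charting. The sketch below shows one way to do that. The Counters tuple is a stand-in for the bytes_sent and bytes_recv fields of psutil's per-interface result, and compute_rates is a hypothetical helper name.

```python
from collections import namedtuple

# Stand-in for the relevant fields of psutil's per-interface counters.
Counters = namedtuple("Counters", ["bytes_sent", "bytes_recv"])

def compute_rates(previous, current, interval_seconds):
    """Convert two cumulative snapshots into bytes-per-second rates."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    return {
        "sent_per_sec": (current.bytes_sent - previous.bytes_sent) / interval_seconds,
        "recv_per_sec": (current.bytes_recv - previous.bytes_recv) / interval_seconds,
    }

before = Counters(bytes_sent=1_000, bytes_recv=5_000)
after = Counters(bytes_sent=3_000, bytes_recv=9_000)
print(compute_rates(before, after, interval_seconds=2))
# {'sent_per_sec': 1000.0, 'recv_per_sec': 2000.0}
```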


Advanced Network Monitoring Features


1. Router and Switch Monitoring with SNMP


Monitor network devices using SNMP:


from pysnmp.hlapi import *

def get_snmp_data(ip, community, oid):
    errorIndication, errorStatus, errorIndex, varBinds = next(
        getCmd(SnmpEngine(),
               CommunityData(community),
               UdpTransportTarget((ip, 161)),
               ContextData(),
               ObjectType(ObjectIdentity(oid)))
    )
    if errorIndication:
        print(errorIndication)
    elif errorStatus:
        print('%s at %s' % (errorStatus.prettyPrint(),
                            errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))

# Example usage
get_snmp_data('192.168.1.1', 'public', '1.3.6.1.2.1.1.1.0')  # System description
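# A GET needs a full instance OID, so append the interface index to the ifInOctets column
get_snmp_data('192.168.1.1', 'public', '1.3.6.1.2.1.2.2.1.10.1')  # ifInOctets for interface index 1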
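
Interface octet counters such as ifInOctets are 32-bit values that wrap back to zero, so a rate calculation has to allow for the wrap. The following sketch assumes at most one wrap between two polls; the function names are illustrative and the readings would come from two successive SNMP queries.

```python
COUNTER32_MODULUS = 2 ** 32

def counter32_delta(previous, current):
    """Difference between two Counter32 readings, allowing for one wrap."""
    return (current - previous) % COUNTER32_MODULUS

def inbound_bits_per_second(previous_octets, current_octets, interval_seconds):
    """Average inbound rate in bits per second between two ifInOctets polls."""
    return counter32_delta(previous_octets, current_octets) * 8 / interval_seconds

# The counter wrapped between polls: 6 octets up to the limit, then 10 more.
print(counter32_delta(4294967290, 10))  # 16
```

On fast links a 32-bit counter can wrap more than once between polls, so shorter polling intervals or the 64-bit counters defined in the same MIB family are generally the safer choice.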


2. Payload Analysis


Analyze network payloads for potential threats:


from scapy.all import Raw, sniff

def analyze_payload(packet):
    if packet.haslayer(Raw):
        payload = packet[Raw].load
        if b"password" in payload.lower():
            print(f"Potential password transmission detected: {packet.summary()}")
        # Add more payload analysis logic here

sniff(prn=analyze_payload, filter="tcp", store=0)
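
A single keyword check produces many false positives and misses other cleartext credentials. One way to extend the payload logic is a small table of named patterns applied to the raw bytes. The patterns and the helper name find_sensitive_markers below are illustrative assumptions, not a complete detection rule set.

```python
import re

# Illustrative patterns only; tune these for the protocols on your network.
SENSITIVE_PATTERNS = {
    "password field": re.compile(rb"pass(word|wd)?\s*[=:]", re.IGNORECASE),
    "basic auth header": re.compile(rb"authorization:\s*basic\s", re.IGNORECASE),
    "ftp login": re.compile(rb"^(USER|PASS) ", re.MULTILINE),
}

def find_sensitive_markers(payload):
    """Return the names of all patterns that match the raw payload bytes."""
    return [name for name, pattern in SENSITIVE_PATTERNS.items() if pattern.search(payload)]

sample = b"POST /login HTTP/1.1\r\n\r\nuser=bob&password=hunter2"
print(find_sensitive_markers(sample))  # ['password field']
```

Inside a scapy callback, the argument would be packet[Raw].load, and a non-empty result could be logged together with packet.summary().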


3. Automated Vulnerability Scanning


Integrate with OpenVAS for automated vulnerability scanning:


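import os

from gvm.connections import UnixSocketConnection
# In recent python-gvm releases this import may instead be: from gvm.protocols.gmp import Gmp
from gvm.protocols.latest import Gmp
from gvm.transforms import EtreeCheckCommandTransform

# Default IDs shipped with GVM; verify them against your installation
FULL_AND_FAST_CONFIG_ID = 'daba56c8-73ec-11df-a475-002264764cea'
OPENVAS_SCANNER_ID = '08b69003-5fc2-4037-a479-93b440211c73'
ALL_IANA_TCP_PORT_LIST_ID = '33d0cd82-57c6-11e1-8ed1-406186ea4fc5'

def run_vulnerability_scan(target_ip):
    connection = UnixSocketConnection()
    transform = EtreeCheckCommandTransform()
    with Gmp(connection, transform=transform) as gmp:
        # Read credentials from the environment instead of hardcoding them
        # (GVM_USERNAME and GVM_PASSWORD are example variable names)
        gmp.authenticate(os.environ['GVM_USERNAME'], os.environ['GVM_PASSWORD'])

        # Create a target; the response is an XML element carrying the new ID
        target = gmp.create_target(name=f"Scan {target_ip}", hosts=[target_ip],
                                   port_list_id=ALL_IANA_TCP_PORT_LIST_ID)
        target_id = target.get('id')

        # Create a task that ties the target to a scan config and a scanner
        task = gmp.create_task(name=f"Scan Task for {target_ip}", config_id=FULL_AND_FAST_CONFIG_ID,
                               target_id=target_id, scanner_id=OPENVAS_SCANNER_ID)
        task_id = task.get('id')

        # Start the scan; the report ID is returned in a <report_id> child element
        response = gmp.start_task(task_id)
        report_id = response.findtext('report_id')

        print(f"Scan started for {target_ip}. Report ID: {report_id}")

run_vulnerability_scan('192.168.1.100')  # Replace with target IP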


Best Practices and Security Considerations


    1. Data Encryption: Always encrypt sensitive data, especially when transmitting over networks.
    2. Secure Authentication: Implement strong authentication mechanisms for accessing monitoring tools.
    3. Regular Updates: Keep all libraries and dependencies up-to-date to address potential vulnerabilities.
    4. Least Privilege Principle: Run monitoring scripts with the minimum necessary privileges.
    5. Data Retention Policies: Implement proper data retention and deletion policies for collected network data.
    6. Logging and Auditing: Maintain comprehensive logs of all monitoring activities for auditing purposes.
    7. Ethical Considerations: Always obtain proper authorization before performing any network monitoring or testing activities.
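
One practical step toward these practices is keeping API keys and passwords out of the source code. A minimal sketch, assuming secrets are supplied through environment variables, is shown below; the helper name require_env and the variable name in the usage note are examples only.

```python
import os

def require_env(name):
    """Return the value of environment variable `name` or fail loudly."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

A script could then call require_env("SHODAN_API_KEY") at startup, so a missing secret stops the tool immediately instead of failing later with a less obvious API error.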


Conclusion


Building custom network monitoring tools with Python empowers IT professionals to create tailored solutions for their specific network environments. By combining defensive and offensive techniques, integrating with powerful libraries, and creating user-friendly interfaces, you can develop comprehensive network monitoring systems that enhance security, performance, and visibility across your infrastructure.


Remember to always use these tools responsibly and in compliance with all relevant laws and regulations. Happy coding, and may your networks remain secure and efficient!



Andy N

Information Technology Support Analyst with over seven years of experience (in the telecommunications and manufacturing industries) ranging from user support to administering and maintaining core IT systems.
