Close Menu
    DevStackTipsDevStackTips
    • Home
    • News & Updates
      1. Tech & Work
      2. View All

      Sunshine And March Vibes (2025 Wallpapers Edition)

      June 2, 2025

      The Case For Minimal WordPress Setups: A Contrarian View On Theme Frameworks

      June 2, 2025

      How To Fix Largest Contentful Paint Issues With Subpart Analysis

      June 2, 2025

      How To Prevent WordPress SQL Injection Attacks

      June 2, 2025

      How Red Hat just quietly, radically transformed enterprise server Linux

      June 2, 2025

      OpenAI wants ChatGPT to be your ‘super assistant’ – what that means

      June 2, 2025

      The best Linux VPNs of 2025: Expert tested and reviewed

      June 2, 2025

      One of my favorite gaming PCs is 60% off right now

      June 2, 2025
    • Development
      1. Algorithms & Data Structures
      2. Artificial Intelligence
      3. Back-End Development
      4. Databases
      5. Front-End Development
      6. Libraries & Frameworks
      7. Machine Learning
      8. Security
      9. Software Engineering
      10. Tools & IDEs
      11. Web Design
      12. Web Development
      13. Web Security
      14. Programming Languages
        • PHP
        • JavaScript
      Featured

      `document.currentScript` is more useful than I thought.

      June 2, 2025
      Recent

      `document.currentScript` is more useful than I thought.

      June 2, 2025

      Adobe Sensei and GenAI in Practice for Enterprise CMS

      June 2, 2025

      Over The Air Updates for React Native Apps

      June 2, 2025
    • Operating Systems
      1. Windows
      2. Linux
      3. macOS
      Featured

      You can now open ChatGPT on Windows 11 with Win+C (if you change the Settings)

      June 2, 2025
      Recent

      You can now open ChatGPT on Windows 11 with Win+C (if you change the Settings)

      June 2, 2025

      Microsoft says Copilot can use location to change Outlook’s UI on Android

      June 2, 2025

      TempoMail — Command Line Temporary Email in Linux

      June 2, 2025
    • Learning Resources
      • Books
      • Cheatsheets
      • Tutorials & Guides
    Home»Development»How to Build a Real-Time Intrusion Detection System with Python and Open-Source Libraries

    How to Build a Real-Time Intrusion Detection System with Python and Open-Source Libraries

    January 21, 2025

    An Intrusion Detection System (IDS) is like a security camera for your network. Just as security cameras help identify suspicious activities in the physical world, an IDS will monitor your network to help detect any potential cyber attacks and security breaches.

    By the end of this tutorial, you will know how an IDS works and be able to build your own real-time network monitoring system using Python.

    Table of Contents

    • Understanding the Types of IDS

    • How to Setup Your Development Environment

    • Building the Core IDS Components

      • Building the Packet Capture Engine

      • Building the Traffic Analysis Module

      • Building the Detection Engine

      • Building the Alert System

      • Putting It All Together

    • Ideas to Extend the IDS

    • Security Considerations

    • Testing the IDS on Mock Data

    • Wrapping Up

    Understanding the Types of IDS

    Before we jump into the coding part, let’s understand the types of IDS:

    1. Network-based IDS (NIDS): This system monitors network traffic for suspicious activity.

    2. Host-based IDS (HIDS): This system monitors system logs and file changes on individual hosts and is not directly deployed in the network.

    3. Signature-based IDS: This system is either in the network or on the host and identifies attack patterns based on known patterns.

    4. Anomaly-based IDS: This system identifies unusual behavior using heuristics and prediction algorithms that are trained on previously seen attack patterns.

    For this tutorial, you will be building a hybrid system that combines signature-based and anomaly-based detection systems to monitor network traffic.

    How to Setup Your Development Environment

    Let’s start by setting up our Python environment (I’m using Python 3) and installing the following prerequisites:

    pip install scapy
    pip install python-nmap
    pip install numpy
    pip install sklearn
    

    Building the Core IDS Components

    Our IDS will comprise of four main components:

    1. A packet capture system

    2. Traffic analysis module

    3. A detection engine

    4. An alert system

    Building the Packet Capture Engine

    Let’s start with the packet capture engine. We’ll use Scapy for this. Scapy is a networking library that allows us to perform network and network-related operations using Python.

    First, we’ll define our PacketCapture class that will serve as the basis of our IDS.

    from scapy.all import sniff, IP, TCP
    from collections import defaultdict
    import threading
    import queue
    
    class PacketCapture:
        def __init__(self):
            self.packet_queue = queue.Queue()
            self.stop_capture = threading.Event()
    
        def packet_callback(self, packet):
            if IP in packet and TCP in packet:
                self.packet_queue.put(packet)
    
        def start_capture(self, interface="eth0"):
            def capture_thread():
                sniff(iface=interface,
                      prn=self.packet_callback,
                      store=0,
                      stop_filter=lambda _: self.stop_capture.is_set())
    
            self.capture_thread = threading.Thread(target=capture_thread)
            self.capture_thread.start()
    
        def stop(self):
            self.stop_capture.set()
            self.capture_thread.join()
    

    Let’s quickly walk through the code and understand what these functions do. For this, you will be using threading and queues to efficiently process and capture network packets.

    The init method initializes the class by creating a queue.Queue to store captured packets and a threading Event to control when the packet capture should stop. The packet_callback method acts as a handler for each captured packet and checks if the packet contains both IP and TCP layers. If so, it adds it to the queue for further processing.

    The start_capture method begins capturing packets on a specified interface (defaulting to eth0 to capture packets from the Ethernet interface). Run ifconfig to understand the available interfaces and select the appropriate interface from the list.

    The function spawns a separate thread to run Scapy’s sniff function, which continuously monitors the interface for packets. The stop_filter parameter ensures the capture stops when the stop_capture event is triggered.

    The stop method stops the capture by setting the stop_capture event and waits for the thread to finish execution, ensuring the process terminates cleanly. This design allows for seamless real-time packet capturing without blocking the main thread.

    Building the Traffic Analysis Module

    Now, let’s write the traffic analysis module. This module will process captured packets and extract relevant features.

    class TrafficAnalyzer:
        def __init__(self):
            self.connections = defaultdict(list)
            self.flow_stats = defaultdict(lambda: {
                'packet_count': 0,
                'byte_count': 0,
                'start_time': None,
                'last_time': None
            })
    
        def analyze_packet(self, packet):
            if IP in packet and TCP in packet:
                ip_src = packet[IP].src
                ip_dst = packet[IP].dst
                port_src = packet[TCP].sport
                port_dst = packet[TCP].dport
    
                flow_key = (ip_src, ip_dst, port_src, port_dst)
    
                # Update flow statistics
                stats = self.flow_stats[flow_key]
                stats['packet_count'] += 1
                stats['byte_count'] += len(packet)
                current_time = packet.time
    
                if not stats['start_time']:
                    stats['start_time'] = current_time
                stats['last_time'] = current_time
    
                return self.extract_features(packet, stats)
    
        def extract_features(self, packet, stats):
            return {
                'packet_size': len(packet),
                'flow_duration': stats['last_time'] - stats['start_time'],
                'packet_rate': stats['packet_count'] / (stats['last_time'] - stats['start_time']),
                'byte_rate': stats['byte_count'] / (stats['last_time'] - stats['start_time']),
                'tcp_flags': packet[TCP].flags,
                'window_size': packet[TCP].window
            }
    

    In this code section, we define the TrafficAnalyzer class to analyze network traffic. Here we track connection flows and calculate statistics for packets in real time. We use the defaultdict data structure in Python to manage connections and flow statistics by organizing data by unique flows.

    The __init__ method initializes two attributes: connections, which stores lists of related packets for each flow, and flow_stats, which stores aggregated statistics for each flow, such as packet count, byte count, start time, and the time of the most recent packet.

    The analyze_packet method processes each packet. If the packet contains IP and TCP layers, it extracts the source and destination IPs and ports, forming a unique flow_key to identify the flow. It updates the statistics for the flow by incrementing the packet count, adding the packet’s size to the byte count, and setting or updating the start and last time of the flow. Eventually, it calls extract_features to calculate and return additional metrics.

    The extract_features method computes detailed characteristics of the flow and the current packet. These include the packet size, flow duration, packet rate, byte rate, TCP flags, and the TCP window size. These metrics are quite useful to identify patterns, anomalies, or potential threats in network traffic.

    Building the Detection Engine

    Now we will define our detection engine that will implement both the signature as well as the anomaly-based detection mechanisms:

    from sklearn.ensemble import IsolationForest
    import numpy as np
    
    class DetectionEngine:
        def __init__(self):
            self.anomaly_detector = IsolationForest(
                contamination=0.1,
                random_state=42
            )
            self.signature_rules = self.load_signature_rules()
            self.training_data = []
    
        def load_signature_rules(self):
            return {
                'syn_flood': {
                    'condition': lambda features: (
                        features['tcp_flags'] == 2 and  # SYN flag
                        features['packet_rate'] > 100
                    )
                },
                'port_scan': {
                    'condition': lambda features: (
                        features['packet_size'] < 100 and
                        features['packet_rate'] > 50
                    )
                }
            }
    
        def train_anomaly_detector(self, normal_traffic_data):
            self.anomaly_detector.fit(normal_traffic_data)
    
        def detect_threats(self, features):
            threats = []
    
            # Signature-based detection
            for rule_name, rule in self.signature_rules.items():
                if rule['condition'](features):
                    threats.append({
                        'type': 'signature',
                        'rule': rule_name,
                        'confidence': 1.0
                    })
    
            # Anomaly-based detection
            feature_vector = np.array([[
                features['packet_size'],
                features['packet_rate'],
                features['byte_rate']
            ]])
    
            anomaly_score = self.anomaly_detector.score_samples(feature_vector)[0]
            if anomaly_score < -0.5:  # Threshold for anomaly detection
                threats.append({
                    'type': 'anomaly',
                    'score': anomaly_score,
                    'confidence': min(1.0, abs(anomaly_score))
                })
    
            return threats
    

    This code defines a hybrid system that combines the signature-based and anomaly-based detection methods. We use the Isolation Forest model to detect anomalies and also use pre-defined rules for identifying specific attack patterns. If you would like to know more about how the Isolation Forest model works, check out this article.

    Hostinger

    In this code snippet, the train_anomaly_detector method trains the Isolation Forest model using a dataset of normal traffic features. This enables the model to differentiate typical traffic patterns from anomalies.

    The detect_threats method evaluates network traffic features for potential threats using two approaches:

    1. Signature-Based Detection: It iteratively goes through each of the predefined rules, applying the rule’s condition to the traffic features. If a rule matches, a signature-based threat is recorded with high confidence.

    2. Anomaly-Based Detection: It processes the feature vector (packet size, packet rate, and byte rate) through the Isolation Forest model to calculate an anomaly score. If the score indicates unusual behavior, the detection engine triggers it as an anomaly and produces a confidence score proportional to the anomaly’s severity.

    Finally, we return the aggregated list of identified threats with their respective annotation (either signature or anomaly), the rule or score that triggered the anomaly, and a confidence score that suggests how likely it is that the identified pattern is a threat.

    Building the Alert System

    Now let’s build the last component of our IDS which is the alert system. It will process and log detected threats in a structured way. You will also have the option to extend the system to include additional notification mechanisms like Slack, Jira tickets, and so on

    import logging
    import json
    from datetime import datetime
    
    class AlertSystem:
        def __init__(self, log_file="ids_alerts.log"):
            self.logger = logging.getLogger("IDS_Alerts")
            self.logger.setLevel(logging.INFO)
    
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
    
        def generate_alert(self, threat, packet_info):
            alert = {
                'timestamp': datetime.now().isoformat(),
                'threat_type': threat['type'],
                'source_ip': packet_info.get('source_ip'),
                'destination_ip': packet_info.get('destination_ip'),
                'confidence': threat.get('confidence', 0.0),
                'details': threat
            }
    
            self.logger.warning(json.dumps(alert))
    
            if threat['confidence'] > 0.8:
                self.logger.critical(
                    f"High confidence threat detected: {json.dumps(alert)}"
                )
                # Implement additional notification methods here
                # (e.g., email, Slack, SIEM integration)
    

    The init method sets up a logger named IDS_Alerts with an INFO logging level to capture alert information. It writes logs to a specified file, ids_alerts.log by default. A FileHandler directs logs to the file, while the Formatter ensures the logs follow a consistent format.

    The generate_alert method is responsible for creating structured alert entries. Each alert includes key information such as the timestamp of detection, the type of threat, the source and destination IPs involved, the confidence level of the detection, and additional threat-specific details. These alerts are logged as WARNING level messages in JSON format.

    If the confidence level of a detected threat is high (greater than 0.8), the alert is escalated and logged as a CRITICAL level message. Note that this method is designed to be extensible, allowing for additional notification mechanisms, such as sending alerts via email or integrating with third-party systems like Slack or SIEM solutions.

    Putting it All Together

    Now let’s integrate all the components together into our fully functional IDS solution:

    class IntrusionDetectionSystem:
        def __init__(self, interface="eth0"):
            self.packet_capture = PacketCapture()
            self.traffic_analyzer = TrafficAnalyzer()
            self.detection_engine = DetectionEngine()
            self.alert_system = AlertSystem()
    
            self.interface = interface
    
        def start(self):
            print(f"Starting IDS on interface {self.interface}")
            self.packet_capture.start_capture(self.interface)
    
            while True:
                try:
                    packet = self.packet_capture.packet_queue.get(timeout=1)
                    features = self.traffic_analyzer.analyze_packet(packet)
    
                    if features:
                        threats = self.detection_engine.detect_threats(features)
    
                        for threat in threats:
                            packet_info = {
                                'source_ip': packet[IP].src,
                                'destination_ip': packet[IP].dst,
                                'source_port': packet[TCP].sport,
                                'destination_port': packet[TCP].dport
                            }
                            self.alert_system.generate_alert(threat, packet_info)
    
                except queue.Empty:
                    continue
                except KeyboardInterrupt:
                    print("Stopping IDS...")
                    self.packet_capture.stop()
                    break
    
    if __name__ == "__main__":
        ids = IntrusionDetectionSystem()
        ids.start()
    

    In this code, the IntrusionDetectionSystem class sets up its core components: PacketCapture for capturing packets from a network interface, TrafficAnalyzer for extracting and analyzing packet features, DetectionEngine for identifying threats using both signature-based and anomaly-based methods, and AlertSystem for logging and escalating detected threats. The interface parameter specifies the network interface to monitor, defaulting to eth0 (the generally named ethernet interface on most systems).

    The start function initiates the IDS. It begins by starting packet capture on the specified interface and enters a loop to continuously process incoming packets. For each packet captured, the system extracts its features using the TrafficAnalyzer and analyzes them for potential threats using the DetectionEngine. If any threats are detected, the system generates detailed alerts through the AlertSystem.

    The system runs in a loop until interrupted by either of the two key exceptions: queue.Empty, which occurs if no packets are available for processing, and KeyboardInterrupt, which stops the IDS gracefully by halting packet capture and exiting the loop.

    Ideas to Extend the IDS

    To enhance or extend the IDS, you can consider designing or implementing the following features / improvements:

    1. Machine Learning enhancements: You can enhance the IDS capabilities by incorporating deep learning models like Auto Encoders for anomaly detection and using RNNs for sequential pattern analysis. This will improve the system’s ability to identify complex and evolving threats by leveraging advanced feature engineering.

    2. Performance optimizations: You can optimize the IDS using PyPy for faster execution, packet sampling to handle high-traffic networks, and parallel processing to scale the system efficiently.

    3. Integration capabilities: You can extend the IDS by considering support for a REST API for remote monitoring, enabling seamless interaction with external systems.

    Security Considerations

    When deploying the IDS, note that the system is a proof-of-concept and is not intended for production use-cases. Also keep the following things in mind:

    • Run the system with appropriate permissions (root/admin required for packet capture)

    • Secure the alert logs and implement proper log rotation

    • Regularly update signature rules and retrain anomaly detection models

    • Monitor system resource usage, especially in high-traffic environments

    • Implement proper access controls for the IDS configuration and alerts

    Testing the IDS on Mock Data

    To validate the functionality of your IDS, you can test it using mock data that will simulate real-world network traffic. This will allow you to observe how the system processes packets, analyzes traffic, and generates alerts without requiring a live network environment.

    Use the following function to test the IDS:

    from scapy.all import IP, TCP
    
    def test_ids():
        # Create test packets to simulate various scenarios
        test_packets = [
            # Normal traffic
            IP(src="192.168.1.1", dst="192.168.1.2") / TCP(sport=1234, dport=80, flags="A"),
            IP(src="192.168.1.3", dst="192.168.1.4") / TCP(sport=1235, dport=443, flags="P"),
    
            # SYN flood simulation
            IP(src="10.0.0.1", dst="192.168.1.2") / TCP(sport=5678, dport=80, flags="S"),
            IP(src="10.0.0.2", dst="192.168.1.2") / TCP(sport=5679, dport=80, flags="S"),
            IP(src="10.0.0.3", dst="192.168.1.2") / TCP(sport=5680, dport=80, flags="S"),
    
            # Port scan simulation
            IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=22, flags="S"),
            IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=23, flags="S"),
            IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=25, flags="S"),
        ]
    
        ids = IntrusionDetectionSystem()
    
        # Simulate packet processing and threat detection
        print("Starting IDS Test...")
        for i, packet in enumerate(test_packets, 1):
            print(f"nProcessing packet {i}: {packet.summary()}")
    
            # Analyze the packet
            features = ids.traffic_analyzer.analyze_packet(packet)
    
            if features:
                # Detect threats based on features
                threats = ids.detection_engine.detect_threats(features)
    
                if threats:
                    print(f"Detected threats: {threats}")
                else:
                    print("No threats detected.")
            else:
                print("Packet does not contain IP/TCP layers or is ignored.")
    
        print("nIDS Test Completed.")
    
    if __name__ == "__main__":
        test_ids()
    

    This will test the system against a variety of attacks like SYN flooding and port scanning.

    Wrapping Up

    Now you know how to build a basic intrusion detection system with Python and a few open-source libraries! This IDS demonstrates some core concepts of network security and real-time threat detection.

    Keep in mind that this tutorial is for educational purposes only. There are professionally designed enterprise-grade systems like Snort and Suricata that can handle advanced threats and large-scale deployments.

    I hope you gained insights into network security fundamentals and learned how Python can be used to build practical security solutions.

    Source: freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More 

    Facebook Twitter Reddit Email Copy Link
    Previous ArticleDeepSeek’s new open-source AI model can outperform o1 for a fraction of the cost
    Next Article How to Integrate Discord Webhooks with Next.js 15 – Example Project

    Related Posts

    Security

    Chrome Zero-Day Alert: CVE-2025-5419 Actively Exploited in the Wild

    June 2, 2025
    Security

    CISA Adds 5 Actively Exploited Vulnerabilities to KEV Catalog: ASUS Routers, Craft CMS, and ConnectWise Targeted

    June 2, 2025
    Leave A Reply Cancel Reply

    Continue Reading

    New ‘Cuckoo’ Persistent macOS Spyware Targeting Intel and Arm Macs

    Development

    U.S. Seizes Domains Used by AI-Powered Russian Bot Farm for Disinformation

    Development

    Fortifying Debian With SELinux by Enforcing Mandatory Access Control for Ultimate System Security

    Learning Resources

    FACTS Grounding: A new benchmark for evaluating the factuality of large language models

    Artificial Intelligence

    Highlights

    5 Best Business Smartphones – Top Picks for Every Professional

    December 6, 2024

    Are you contemplating getting a smartphone that can meet your business needs? Here are the…

    How to migrate WordPress to Webflow – Steps to follow!

    June 28, 2024

    This $41 controller with Hall Effect sticks converted me to PC gaming

    April 28, 2025

    CVE-2025-44619 – Tinxy WiFi Lock Controller RF Authentication Bypass

    May 30, 2025
    © DevStackTips 2025. All rights reserved.
    • Contact
    • Privacy Policy

    Type above and press Enter to search. Press Esc to cancel.