Close Menu
    DevStackTipsDevStackTips
    • Home
    • News & Updates
      1. Tech & Work
      2. View All

      This week in AI dev tools: Gemini 2.5 Pro and Flash GA, GitHub Copilot Spaces, and more (June 20, 2025)

      June 20, 2025

      Gemini 2.5 Pro and Flash are generally available and Gemini 2.5 Flash-Lite preview is announced

      June 19, 2025

      CSS Cascade Layers Vs. BEM Vs. Utility Classes: Specificity Control

      June 19, 2025

      IBM launches new integration to help unify AI security and governance

      June 18, 2025

      “We’re creating a game that’s steeped in dark fantasy elements” — Capcom talks building a samurai adventure with Onimusha: Way of the Sword

      June 22, 2025

      I tested this beastly gaming laptop with flawless performance — but I’m obsessed with a different feature

      June 22, 2025

      I changed 10 settings on my Fire TV for better performance and fewer distractions

      June 22, 2025

      I love that transparent technology is making a comeback — and one of the biggest gaming companies has joined the fun

      June 21, 2025
    • Development
      1. Algorithms & Data Structures
      2. Artificial Intelligence
      3. Back-End Development
      4. Databases
      5. Front-End Development
      6. Libraries & Frameworks
      7. Machine Learning
      8. Security
      9. Software Engineering
      10. Tools & IDEs
      11. Web Design
      12. Web Development
      13. Web Security
      14. Programming Languages
        • PHP
        • JavaScript
      Featured

      Understanding JavaScript Promise

      June 22, 2025
      Recent

      Understanding JavaScript Promise

      June 22, 2025

      Lakeflow: Revolutionizing SCD2 Pipelines with Change Data Capture (CDC)

      June 21, 2025

      vitorccs/laravel-csv

      June 21, 2025
    • Operating Systems
      1. Windows
      2. Linux
      3. macOS
      Featured

      “We’re creating a game that’s steeped in dark fantasy elements” — Capcom talks building a samurai adventure with Onimusha: Way of the Sword

      June 22, 2025
      Recent

      “We’re creating a game that’s steeped in dark fantasy elements” — Capcom talks building a samurai adventure with Onimusha: Way of the Sword

      June 22, 2025

      I tested this beastly gaming laptop with flawless performance — but I’m obsessed with a different feature

      June 22, 2025

      6 Best Free and Open Source Graphical Data Hashing Tools

      June 22, 2025
    • Learning Resources
      • Books
      • Cheatsheets
      • Tutorials & Guides
    Home»Development»Machine Learning»A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic

    A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic

    April 21, 2025

    In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream processing framework, and its integration with RabbitMQ. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages: ingestion & validation, normalization, monitoring & alert generation, and archiving, each defined as Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio powers asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.

    Copy CodeCopiedUse a different Browser
    !pip install -q faststream[rabbit] nest_asyncio

    We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. All this is achieved while keeping the output minimal with the -q flag.

    Copy CodeCopiedUse a different Browser
    import nest_asyncio, asyncio, logging
    nest_asyncio.apply()
    

    We import the nest_asyncio, asyncio, and logging modules, then apply nest_asyncio.apply() to patch Python’s event loop so that you can run nested asynchronous tasks inside environments like Colab or Jupyter notebooks without errors. The logging import readies you to instrument your pipeline with detailed runtime logs.

    Copy CodeCopiedUse a different Browser
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    logger = logging.getLogger("sensor_pipeline")

    We configure Python’s built‑in logging to emit INFO‑level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for emitting structured logs within your streaming pipeline.

    Copy CodeCopiedUse a different Browser
    from faststream import FastStream
    from faststream.rabbit import RabbitBroker, TestRabbitBroker
    from pydantic import BaseModel, Field, validator
    import pandas as pd
    from typing import List

    We bring in FastStream’s core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in‑memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and Python’s List type for annotating our in‑memory archives.

    Copy CodeCopiedUse a different Browser
    broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
    app    = FastStream(broker)
    

    We instantiate a RabbitBroker pointed at a (local) RabbitMQ server using the AMQP URL, then create a FastStream application bound to that broker, setting up the messaging backbone for your pipeline stages.

    Copy CodeCopiedUse a different Browser
    class RawSensorData(BaseModel):
        sensor_id: str       = Field(..., examples=["sensor_1"])
        reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])
       
        @validator("sensor_id")
        def must_start_with_sensor(cls, v):
            if not v.startswith("sensor_"):
                raise ValueError("sensor_id must start with 'sensor_'")
            return v
    
    
    class NormalizedData(BaseModel):
        sensor_id: str
        reading_kelvin: float
    
    
    class AlertData(BaseModel):
        sensor_id: str
        reading_kelvin: float
        alert: bool

    These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., reading range and a sensor_ prefix), NormalizedData converts Celsius to Kelvin, and AlertData encapsulates the final alert payload (including a boolean flag), ensuring a type-safe data flow throughout the pipeline.

    Copy CodeCopiedUse a different Browser
    archive: List[AlertData] = []
    
    
    @broker.subscriber("sensor_input")
    @broker.publisher("normalized_input")
    async def ingest_and_validate(raw: RawSensorData) -> dict:
        logger.info(f"Ingested raw data: {raw.json()}")
        return raw.dict()
    
    
    @broker.subscriber("normalized_input")
    @broker.publisher("sensor_alert")
    async def normalize(data: dict) -> dict:
        norm = NormalizedData(
            sensor_id=data["sensor_id"],
            reading_kelvin=data["reading_celsius"] + 273.15
        )
        logger.info(f"Normalized to Kelvin: {norm.json()}")
        return norm.dict()
    
    
    ALERT_THRESHOLD_K = 323.15  
       
    @broker.subscriber("sensor_alert")
    @broker.publisher("archive_topic")
    async def monitor(data: dict) -> dict:
        alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
        alert = AlertData(
            sensor_id=data["sensor_id"],
            reading_kelvin=data["reading_kelvin"],
            alert=alert_flag
        )
        logger.info(f"Monitor result: {alert.json()}")
        return alert.dict()
    
    
    @broker.subscriber("archive_topic")
    async def archive_data(payload: dict):
        rec = AlertData(**payload)
        archive.append(rec)
        logger.info(f"Archived: {rec.json()}")

    An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired via @broker.subscriber/@broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability.

    Copy CodeCopiedUse a different Browser
    async def main():
        readings = [
            {"sensor_id": "sensor_1", "reading_celsius": 45.2},
            {"sensor_id": "sensor_2", "reading_celsius": 75.1},
            {"sensor_id": "sensor_3", "reading_celsius": 50.0},
        ]
        async with TestRabbitBroker(broker) as tb:
            for r in readings:
                await tb.publish(r, "sensor_input")
            await asyncio.sleep(0.1)
           
        df = pd.DataFrame([a.dict() for a in archive])
        print("nFinal Archived Alerts:")
        display(df)
    
    
    asyncio.run(main())
    

    Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.

    In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event‑driven workflows. You can seamlessly transition from this in‑memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and running faststream run under uvicorn or your preferred ASGI server, unlocking scalable, maintainable stream processing in any Python environment.


    Here is the Colab Notebook. Also, don’t forget to follow us on Twitter and join our Telegram Channel and LinkedIn Group. Don’t Forget to join our 90k+ ML SubReddit.

    🔥 [Register Now] miniCON Virtual Conference on AGENTIC AI: FREE REGISTRATION + Certificate of Attendance + 4 Hour Short Event (May 21, 9 am- 1 pm PST) + Hands on Workshop

    The post A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic appeared first on MarkTechPost.

    Source: Read More 

    Facebook Twitter Reddit Email Copy Link
    Previous ArticleBuild an automated generative AI solution evaluation pipeline with Amazon Nova
    Next Article Anthropic Releases a Comprehensive Guide to Building Coding Agents with Claude Code

    Related Posts

    Machine Learning

    How to Evaluate Jailbreak Methods: A Case Study with the StrongREJECT Benchmark

    June 22, 2025
    Machine Learning

    Texas A&M Researchers Introduce a Two-Phase Machine Learning Method Named ‘ShockCast’ for High-Speed Flow Simulation with Neural Temporal Re-Meshing

    June 22, 2025
    Leave A Reply Cancel Reply

    For security, use of Google's reCAPTCHA service is required which is subject to the Google Privacy Policy and Terms of Use.

    Continue Reading

    CVE-2025-4466 – iSourcecode Gym Management System SQL Injection Vulnerability

    Common Vulnerabilities and Exposures (CVEs)

    Oracle ERP Test Automation Guide – Examples and Best Practices

    Development

    CVE-2025-49651 – Lablup BackendAI Unauthenticated Session Hijacking

    Common Vulnerabilities and Exposures (CVEs)

    One of World of Warcraft’s deadliest entities makes a world-shattering return after nearly 20 years — and he’s city-sized

    News & Updates

    Highlights

    CVE-2025-3200: Wiesemann & Theis Com-Server Devices Exposed by Deprecated TLS Protocols

    April 29, 2025

    CVE-2025-3200: Wiesemann & Theis Com-Server Devices Exposed by Deprecated TLS Protocols

    A coordinated security advisory from CERT@VDE and Wiesemann & Theis GmbH has revealed critical vulnerabilities impacting several Wiesemann & Theis products, including the Com-Server++ and related mode …
    Read more

    Published Date:
    Apr 29, 2025 (3 hours, 3 minutes ago)

    Vulnerabilities has been mentioned in this article.

    CVE-2025-3200

    CVE-2025-46617

    CVE-2025-46616

    AI stirs up the recipe for concrete in MIT study

    June 2, 2025

    Dynatrace Live Debugger, Mistral Agents API, and more – SD Times Daily Digest

    May 29, 2025

    Top Smart Car Accessories in 2025: Enhancing Connectivity and Safety on the Road

    June 17, 2025
    © DevStackTips 2025. All rights reserved.
    • Contact
    • Privacy Policy

    Type above and press Enter to search. Press Esc to cancel.