MarkTechPost@AI · 2 days ago, 08:40
A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic

This article shows how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream-processing framework, together with RabbitMQ. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, the message broker is simulated without any external infrastructure. The pipeline consists of four distinct stages: ingestion and validation, normalization, monitoring and alert generation, and archiving, each backed by a Pydantic model to ensure data quality and type safety. Under the hood, Python’s asyncio drives the asynchronous message flow, while nest_asyncio enables nested event loops in Colab. The standard logging module provides traceable pipeline execution, and pandas handles final result inspection, making it easy to visualize the archived alerts in a DataFrame.

✅ The pipeline is built on the FastStream framework, using RabbitMQ’s RabbitBroker together with TestRabbitBroker. This lets the message broker be simulated without external infrastructure, simplifying development.

✅ The pipeline comprises four key stages: ingestion and validation, normalization, monitoring and alert generation, and archiving. Each stage is backed by a Pydantic model, ensuring data quality and type safety.

✅ The pipeline relies on Python’s asyncio for asynchronous message flow, with nest_asyncio enabling the nested event loops required for compatibility with Colab.

✅ The pipeline uses the standard logging module for traceable execution and pandas for final result inspection, making analysis and visualization straightforward.

In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream processing framework, and its integration with RabbitMQ. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages: ingestion & validation, normalization, monitoring & alert generation, and archiving, each backed by a Pydantic model (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio powers asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.

!pip install -q faststream[rabbit] nest_asyncio

We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. All this is achieved while keeping the output minimal with the -q flag.

import nest_asyncio, asyncio, logging

nest_asyncio.apply()

We import the nest_asyncio, asyncio, and logging modules, then call nest_asyncio.apply() to patch Python’s event loop so that nested asynchronous tasks can run inside environments like Colab or Jupyter notebooks without errors. The logging import readies us to instrument the pipeline with detailed runtime logs.

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")

We configure Python’s built‑in logging to emit INFO‑level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for emitting structured logs within your streaming pipeline.

from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List

We bring in FastStream’s core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in-memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and typing.List for annotating our in-memory archive.

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

We instantiate a RabbitBroker pointed at a (local) RabbitMQ server using the AMQP URL, then create a FastStream application bound to that broker, setting up the messaging backbone for your pipeline stages.

class RawSensorData(BaseModel):
    sensor_id: str = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])

    @validator("sensor_id")  # Pydantic v1-style validator; deprecated in v2 but still functional
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v


class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float


class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool

These Pydantic models define the schema for each stage: RawSensorData enforces input validity (the reading must fall between -50 and 150 °C, and the sensor_id must carry a sensor_ prefix), NormalizedData carries the Kelvin-converted reading produced by the normalization stage, and AlertData encapsulates the final alert payload (including a boolean flag), ensuring a type-safe data flow throughout the pipeline.
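To see the validation in action, here is a quick sanity check (not part of the original notebook) showing how RawSensorData rejects a badly named sensor or an out-of-range reading:

from pydantic import ValidationError

# A well-formed payload passes validation
ok = RawSensorData(sensor_id="sensor_9", reading_celsius=23.5)

# A sensor_id without the "sensor_" prefix trips the custom validator
try:
    RawSensorData(sensor_id="probe_9", reading_celsius=23.5)
except ValidationError as e:
    print(e)  # sensor_id must start with 'sensor_'

# A reading outside the [-50, 150] range violates the Field constraints
try:
    RawSensorData(sensor_id="sensor_9", reading_celsius=200.0)
except ValidationError as e:
    print(e)  # reading_celsius exceeds the le=150 bound

Because validation happens at the pipeline’s front door, malformed messages never reach the downstream stages.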

archive: List[AlertData] = []


@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
    logger.info(f"Ingested raw data: {raw.json()}")
    return raw.dict()


@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
    norm = NormalizedData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_celsius"] + 273.15,
    )
    logger.info(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()


ALERT_THRESHOLD_K = 323.15  # 50 °C expressed in Kelvin


@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag,
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()


@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
    rec = AlertData(**payload)
    archive.append(rec)
    logger.info(f"Archived: {rec.json()}")

An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired via @broker.subscriber/@broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability. The queues chain together as sensor_input → normalized_input → sensor_alert → archive_topic, with each stage’s return value published to the next queue.

async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]
    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
        await asyncio.sleep(0.1)  # give all four stages time to run

    df = pd.DataFrame([a.dict() for a in archive])
    print("\nFinal Archived Alerts:")
    display(df)


asyncio.run(main())

Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.
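Because TestRabbitBroker also patches every subscriber with a mock while the context is open (the .mock attribute follows FastStream’s documented testing API), the same setup extends naturally into an automated test. A minimal sketch, assuming pytest and pytest-asyncio are installed:

import asyncio
import pytest

@pytest.mark.asyncio
async def test_hot_reading_raises_alert():
    archive.clear()
    async with TestRabbitBroker(broker) as tb:
        # 75 °C = 348.15 K, above the 323.15 K threshold
        await tb.publish({"sensor_id": "sensor_9", "reading_celsius": 75.0}, "sensor_input")
        await asyncio.sleep(0.1)  # let the chained stages run, mirroring the demo
        ingest_and_validate.mock.assert_called_once()
        monitor.mock.assert_called_once()
    assert archive[-1].alert is True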

In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event-driven workflows. You can move from this in-memory demo to production by swapping in a live broker URL (FastStream also ships connectors for Kafka, NATS, and Redis) and launching the app with the faststream run CLI, unlocking scalable, maintainable stream processing in any Python environment.
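As a rough sketch of that transition (the module name pipeline.py and the AMQP_URL environment variable are illustrative choices, not part of the original post), you would move the broker and stage definitions into a module that reads its connection string from the environment:

# pipeline.py
import os
from faststream import FastStream
from faststream.rabbit import RabbitBroker

# Fall back to the local dev broker when AMQP_URL is unset (hypothetical variable name)
broker = RabbitBroker(os.getenv("AMQP_URL", "amqp://guest:guest@localhost:5672/"))
app = FastStream(broker)

# ... the same subscriber/publisher stages as defined above ...

and then launch it from the shell against a real broker:

AMQP_URL=amqp://user:pass@rabbit.example.com:5672/ faststream run pipeline:app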


Here is the Colab Notebook.
