MarkTechPost@AI
A Coding Guide to Build and Validate End-to-End Partitioned Data Pipelines in Dagster with Machine Learning Integration

This article walks through how to build an end-to-end partitioned data pipeline with Dagster. The tutorial covers installing the required libraries (Dagster, Pandas, scikit-learn), setting up a custom CSV IOManager to persist data assets, defining daily partitions so each day's data can be processed independently, and creating a series of assets that process sales data. These assets generate synthetic sales data, clean it (handling missing values and outliers), perform feature engineering (creating interaction terms and standardized features), and add data-quality checks to validate the data's integrity. Finally, a simple linear regression model is trained and its evaluation metrics are emitted, showing how machine learning can be integrated into the pipeline while keeping the whole workflow reproducible and robust.

📊 **Pipeline construction and partition management**: The tutorial installs Dagster, Pandas, and scikit-learn, and sets up a base directory and start date as the foundation of the pipeline. At its core is a `DailyPartitionsDefinition`, which lets the data be processed and managed independently per day, a key step toward a scalable and maintainable pipeline. A custom `CSVIOManager` persists data assets (DataFrames) as CSV files and reloads them when needed, providing persistence and reuse.

🧼 **Data cleaning and feature engineering**: The `clean_sales` asset handles common problems in the raw sales data, dropping rows with missing values (`dropna`) and clipping extreme outliers at the 1st/99th percentiles (`quantile`) to stabilize the data. The `features` asset then performs the key feature-engineering steps, creating interaction terms (such as `units_sq` and `units_promo`) and standardizing the key columns as z-scores, giving the downstream machine learning model more expressive inputs.

✅ **Data-quality checks and model training**: To safeguard quality throughout the pipeline, the `clean_sales_quality` asset check verifies that the data contains no nulls, that the `promo` field only takes the values 0 or 1, and that `units` stays within the expected clipped range. These checks are an important guarantee of data reliability. The `tiny_model_metrics` asset then shows how to integrate a machine learning model into the pipeline by training a simple linear regression and reporting its training-set R² score and per-feature coefficients, providing a basis for model evaluation and iteration.

🚀 **End-to-end execution and reproducibility**: All assets, asset checks, and resources (such as `csv_io_manager`) are registered in a `Definitions` object, and the whole pipeline is executed via the `materialize` function. This makes the entire flow, from data generation to model training, reproducible, with each step's outputs (including metadata and model metrics) stored and recorded. The tutorial ends by checking that the output files exist, inspecting their sizes, and loading the JSON model metrics, giving a direct confirmation that the pipeline ran successfully.

In this tutorial, we implement an advanced data pipeline using Dagster. We set up a custom CSV-based IOManager to persist assets, define partitioned daily data generation, and process synthetic sales data through cleaning, feature engineering, and model training. Along the way, we add a data-quality asset check to validate nulls, ranges, and categorical values, and we ensure that metadata and outputs are stored in a structured way. The focus throughout is on hands-on implementation, showing how to integrate raw data ingestion, transformations, quality checks, and machine learning into a single reproducible workflow.

```python
import sys, subprocess, json, os
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])

import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
    asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
    DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression

BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01"
```

We begin by installing the required libraries (Dagster, Pandas, and scikit-learn) so that the full toolset is available in Colab. We then import the essential modules, set up NumPy and Pandas for data handling, and define a base directory along with a start date to organize our pipeline outputs.
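
As a quick sanity check (not part of the original walkthrough), we can confirm the installs imported cleanly and print whatever versions pip resolved before continuing:

```python
# Optional sanity check: the freshly installed packages import, and we can see
# which versions pip resolved (exact numbers will vary per environment).
import dagster, pandas, sklearn
print("dagster:", dagster.__version__)
print("pandas:", pandas.__version__)
print("scikit-learn:", sklearn.__version__)
```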

```python
class CSVIOManager(IOManager):
    def __init__(self, base: Path): self.base = base
    def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
    def handle_output(self, context, obj):
        if isinstance(obj, pd.DataFrame):
            p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
            context.log.info(f"Saved {context.asset_key} -> {p}")
        else:
            p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
            context.log.info(f"Saved {context.asset_key} -> {p}")
    def load_input(self, context):
        k = context.upstream_output.asset_key; p = self._path(k, "csv")
        df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)"); return df

@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)

daily = DailyPartitionsDefinition(start_date=START)
```

We define a custom CSVIOManager to save asset outputs as CSV or JSON files and reload them when needed. We then register it with Dagster as csv_io_manager and set up a daily partitioning scheme so that our pipeline can process data for each date independently.
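
As a small illustration (a sketch of ours, assuming the `daily` and `BASE` objects defined above), we can peek at the partition keys Dagster generates and at the file path the IO manager would use for a given asset:

```python
# Minimal sketch: inspect the daily partition scheme and the IO manager's
# file-naming convention, using the `daily` and `BASE` objects defined above.
keys = daily.get_partition_keys()  # one "YYYY-MM-DD" key per day since START
print("First partition keys:", list(keys)[:3])

# CSVIOManager joins the asset key parts with "_" and appends the extension,
# so a DataFrame asset named clean_sales is written to this path:
print("Expected output path:", BASE / "clean_sales.csv")
```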

```python
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    rng = np.random.default_rng(42)
    n = 200; day = context.partition_key
    x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
    sales = 2.5 * x + 30 * promo + noise + 50
    x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
    df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
    meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
    return Output(df, metadata=meta)

@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = raw_sales.dropna(subset=["units"]).copy()
    lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
    meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
    return Output(df, metadata=meta)

@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = clean_sales.copy()
    df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
    for c in ["units", "units_sq", "units_promo"]:
        mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
        df[f"z_{c}"] = (df[c] - mu) / sigma
    return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})
```

We create three core assets for the pipeline. First, raw_sales generates synthetic daily sales data with noise and occasional missing values, simulating real-world imperfections. Next, clean_sales removes nulls and clips outliers to stabilize the dataset, while logging metadata about ranges and row counts. Finally, features performs feature engineering by adding interaction terms and standardized variables, preparing the data for downstream modeling.
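
To make the standardization step concrete, here is a tiny standalone sketch (a toy DataFrame of our own, not data from the pipeline) that mirrors the z-score logic used inside the features asset:

```python
# Toy illustration of the features asset's standardization:
# z = (x - mean) / std(ddof=0), falling back to 1.0 if a column is constant.
import pandas as pd

toy = pd.DataFrame({"units": [10.0, 20.0, 30.0], "promo": [0, 1, 1]})
toy["units_promo"] = toy["units"] * toy["promo"]
for c in ["units", "units_promo"]:
    mu, sigma = toy[c].mean(), toy[c].std(ddof=0) or 1.0
    toy[f"z_{c}"] = (toy[c] - mu) / sigma
print(toy[["z_units", "z_units_promo"]].round(3))  # z-columns are centered near 0
```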

```python
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
    nulls = int(clean_sales.isna().sum().sum())
    promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
    units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
    passed = bool((nulls == 0) and promo_ok and units_ok)
    return AssetCheckResult(
        passed=passed,
        metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
    )

@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
    X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
    y = features["sales"].values
    model = LinearRegression().fit(X, y)
    return {"r2_train": float(model.score(X, y)),
            **{n: float(c) for n, c in zip(["z_units", "z_units_sq", "z_units_promo", "promo"], model.coef_)}}
```

We strengthen the pipeline with validation and modeling. The clean_sales_quality asset check enforces data integrity by verifying that there are no nulls, that the promo field only has 0/1 values, and that the cleaned units remain within valid bounds. After that, tiny_model_metrics trains a simple linear regression on the engineered features and outputs key metrics such as the training R² score and the learned coefficients, giving us a lightweight but complete modeling step within the Dagster workflow.
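
Since tiny_model_metrics reports R² on the training data only, a natural extension (not part of the original pipeline, shown here as a sketch with a hypothetical helper) is to evaluate on a held-out split of the engineered features:

```python
# Hypothetical helper (not in the original asset graph): estimate generalization
# by splitting the engineered features into train/test sets.
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

def holdout_r2(features_df, test_size=0.25, seed=0):
    cols = ["z_units", "z_units_sq", "z_units_promo", "promo"]  # same columns the asset uses
    X, y = features_df[cols].values, features_df["sales"].values
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=test_size, random_state=seed)
    model = LinearRegression().fit(X_tr, y_tr)
    return {"r2_train": float(model.score(X_tr, y_tr)), "r2_test": float(model.score(X_te, y_te))}
```

This logic could also be wrapped in its own asset downstream of features if we wanted Dagster to track the held-out metrics as well.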

```python
defs = Definitions(
    assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
    resources={"io_manager": csv_io_manager}
)

if __name__ == "__main__":
    run_day = os.environ.get("RUN_DATE") or START
    print("Materializing everything for:", run_day)
    result = materialize(
        [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
        partition_key=run_day,
        resources={"io_manager": csv_io_manager},
    )
    print("Run success:", result.success)
    for fname in ["raw_sales.csv", "clean_sales.csv", "features.csv", "tiny_model_metrics.json"]:
        f = BASE / fname
        if f.exists():
            print(fname, "->", f.stat().st_size, "bytes")
            if fname.endswith(".json"):
                print("Metrics:", json.loads(f.read_text()))
```

We register our assets and the IO manager in Definitions, then materialize the entire DAG for a selected partition key in one run. We persist CSV/JSON artifacts to /content/dagstore and print a quick success flag, plus saved file sizes and model metrics for immediate verification.
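
The same pattern extends to a simple backfill loop over several consecutive days (a sketch of ours, reusing the assets and csv_io_manager defined above). Note that because CSVIOManager names files by asset key alone, each day's run overwrites the previous day's CSVs; partition-aware paths would be needed to keep them side by side.

```python
# Sketch: backfill three consecutive partition days with the same asset graph.
from datetime import date, timedelta

start = date.fromisoformat(START)
for i in range(3):
    key = (start + timedelta(days=i)).isoformat()
    result = materialize(
        [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
        partition_key=key,
        resources={"io_manager": csv_io_manager},
    )
    print(key, "->", "success" if result.success else "failed")
```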

In conclusion, we materialize all assets and checks in a single Dagster run, confirm data quality, and train a regression model whose metrics are stored for inspection. We keep the pipeline modular, with each asset producing and persisting its outputs in CSV or JSON, and ensure compatibility by explicitly converting metadata values to supported types. This tutorial demonstrates how we can combine partitioning, asset definitions, and checks to build a technically robust and reproducible workflow, giving us a practical framework to extend toward more complex real-world pipelines.



