V2EX 16小时前
[分享创造] LaPluma : 一个轻盈的 Go 数据流处理库
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

Lapluma是一个为Go语言设计的简洁、可组合的数据处理库,旨在解决标准库中Slice缺乏Filter等操作的问题。它提供了Iterator(串行数据流)和Pipe(并发数据流)两大核心组件。Iterator通过一系列基础操作(如Map、Filter、Reduce)实现数据序列的逐一访问和转换;Pipe则基于Go的channel,将数据处理流水线化,每个操作运行在独立goroutine中,并支持Context集成以实现超时和优雅退出。Lapluma还兼容Go 1.23+的iter包,并提供了两种错误处理模式:前置过滤和使用TryMap跳过失败元素,以确保数据流的健壮性。

✨ Lapluma的核心设计理念是提供一套简洁、可组合且易于理解的数据处理工具。它通过提供一组正交的基础操作,允许开发者将这些模块组合起来,构建满足需求的数据处理流水线,解决了Go标准库中Slice缺乏Filter等高级操作的问题。

🚀 Lapluma包含两个核心组件:Iterator和Pipe。Iterator用于串行数据流,提供如FromSlice、Map、Filter、Reduce等前向迭代器操作;Pipe则基于Go的channel构建并发数据流,每个操作都在独立的goroutine中运行,并与context.Context集成,支持超时控制和优雅退出,同时可以精细控制Map和Filter操作的并行度。

💡 Lapluma兼容Go 1.23+的iter包,允许开发者使用for-range语法遍历其迭代器,简化了数据消费的流程。例如,可以方便地对数据进行链式Map和Filter操作,然后直接通过for-range获取处理后的结果。

⚠️ Lapluma在错误处理上采取了独特的设计,将错误视为数据流的一部分。它推荐使用前置过滤(Pre-filtering)来剔除非法数据,或者使用TryMap函数处理可失败的转换,当handler返回错误时,该元素会被自动跳过,使得数据流水线在遭遇“数据级”错误时仍能保持运行。

最近在学习Go, 打算写点小项目来练手,实现的过程中发现需要在slice上执行Filter操作,但是标准库没有提供,像go-stream这些库提供的又是比较高级的抽象,所以就有了Lapluma这个库

仓库地址:lapluma

核心设计理念

Lapluma旨在提供一套简洁、可组合且易于理解的数据处理工具,通过提供一组正交的基础操作,开发者将这些模块进行组合,构建出满足需求的数据处理流水线

Lapluma提供了两个核心组件:IteratorPipe

1. Iterator - 串行数据流

Iterator 是一个前向迭代器接口,它定义了对数据序列的逐一访问。

主要操作:

示例:

// 创建迭代器data := []int{1, 2, 3, 4, 5}it := iterator.FromSlice(data)// 链式操作result := iterator.Collect(    iterator.Filter(        iterator.Map(it, func(x int) int { return x * 2 }),        func(x int) bool { return x > 5 }    )) // [6, 8, 10]

2. Pipe - 并发数据流

Pipe 基于 Go 的 channel 构建,每个操作(如 Map, Filter)都在一个独立的 goroutine 中运行,形成一条处理流水线。

所有的 Pipe 操作都与 context.Context 集成,可以轻松实现超时控制和优雅退出。

主要操作:

Pipe 提供的 Map 、Filter 、Reduce 等函数与 Iterator 版本功能类似,但它们在内部会启动 Goroutine 进行并发处理。可以为 Map 和 Filter 操作指定并行度和缓冲区大小,从而精细控制并发资源的利用。PS: 现在还每想好具体的并行控制参数,后续打算将并行控制参数用一个struct表示,现在的方案为临时方案

示例:

ctx := context.Background()// 创建并发管道p := pipe.FromSlice([]int{1, 2, 3, 4, 5}, ctx)// 并行处理( 3 个工作协程)result := pipe.Collect(    pipe.Filter(        pipe.Map(p, cpuIntensiveTask, 3), // 并行度 3        func(x int) bool { return x > 10 },        2, // 并行度 2    ))

标准迭代器集成

Pipe也实现了Iterator的接口,所以也算是一种迭代器,兼容 Go 1.23+ 的标准 iter 包, 可以直接通过 for-range 语法遍历

import "iter"// 兼容 Go 1.23+ 的 for-range 语法itForRange := iterator.Filter(   iterator.Map(iterator.FromSlice([]string{"1", "2", "3", "4"}), func(s string) int {     val, _ := strconv.Atoi(s)       return val * 3  }), func(x int) bool { return x < 10 },)fmt.Print("for-range 遍历结果: ")for data := range iterator.Iter(itForRange) {   fmt.Printf("%d ", data) // 输出: 3 6 9}fmt.Println()

错误处理

LaPluma 在设计上有意简化了核心转换函数的签名,例如 Maphandlerfunc(T) R 而不是 func(T) (R, error)。这并非忽略错误,而是一种设计选择:将错误视为数据流的一部分来处理

推荐以下两种模式来处理可能失败的操作:

模式一:前置过滤 (Pre-filtering)

如果某些数据从一开始就是非法的,或者不符合处理条件,应该在进入核心处理逻辑前,使用 Filter 将其剔除。

// 示例:只处理正数pipe := FromSlice([]int{1, -2, 3, -4}, ctx)positivePipe := Filter(pipe, func(n int) bool {    return n > 0})// ... 后续操作只会看到 {1, 3}

模式二:使用 TryMap 处理可失败的转换

当数据转换过程本身可能失败时(例如,解析字符串、调用外部 API ),使用 TryMap 函数。它的 handler 签名为 func(T) (R, error)。当 handler 返回一个非 nil 的 error 时,TryMap 会自动跳过(丢弃) 这个元素,并继续处理下一个。这使得流水线可以在遭遇“数据级”错误时保持运行,而不会被中断。

import (    "strconv"    "errors")// 示例:将字符串转换为整数,失败则跳过stringPipe := FromSlice([]string{"1", "two", "3", "four"}, ctx)// 使用 TryMap ,handler 返回 (int, error)intPipe := TryMap(stringPipe, func(s string) (int, error) {    i, err := strconv.Atoi(s)    if err != nil {        // 返回错误,这个元素将被丢弃        return 0, errors.New("not a number")    }    return i, nil})// 最终 Reduce 只会处理成功转换的 {1, 3}sum := Reduce(intPipe, func(acc, n int) int { return acc + n }, 0)// sum 的结果是 4

PS:若需要收集Map过程中的错误,可以考虑使用在util.goResult[T]作为返回值,要如何设计此场景的错误处理机制还没想好:通过在调用时添加一个onError参数来处理错误;或者返回两个Pipe,用其中一个来处理错误信息;或者其他方案

运行测试

go test ./...

后续计划

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Go Lapluma 数据处理 Iterator Pipe
相关文章