V2EX 07月25日 11:29
[Go 编程语言] 看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

该脚本展示了如何使用Go语言处理Excel文件中的数据。通过封装的`lineopt`库,可以便捷地读取Excel文件的行数据,并结合`util`包提供的生产者-消费者模式,实现高效的数据并行处理。脚本的核心在于`TaskConsumer`,它允许用户自定义数据处理逻辑,并指定消费者数量以优化性能。示例代码展示了如何设置Excel文件路径、定义处理函数,并启动多消费者并行消费Excel数据,处理逻辑包括跳过首行,体现了其灵活性和实用性。

✨ **Excel数据读取便捷化**:`lineopt`包提供了`IterExcel2`等函数,能够以迭代器(`iter.Seq2`)的形式逐行读取Excel文件,并返回行索引和行数据(`[]string`),极大地简化了Excel文件的访问过程,无需手动解析。

🚀 **生产者-消费者模式实现**:`util`包中的`TaskConsumer`利用Go语言的Goroutine和Channel,构建了一个高效的生产者-消费者模型。生产者负责读取Excel数据并发送到Channel,多个消费者Goroutine则并行从Channel获取数据进行处理,有效提升了数据处理的吞吐量。

⚙️ **灵活的消费者配置与处理逻辑**:用户可以通过`TaskConsumer`的`consumerNumber`参数指定并行处理的消费者数量,根据实际需求调整并行度。同时,`SetC`方法允许用户传入自定义的消费函数,该函数接收行索引和行数据,可以实现各种复杂的数据校验、转换或业务逻辑处理。

🚫 **跳过首行优化**:在提供的示例脚本中,通过在消费函数中添加`if index == 1 { return }`的逻辑,实现了对Excel文件首行的跳过处理,这在很多场景下(如Excel文件包含表头)非常实用,避免了将表头信息作为数据进行处理。

🔄 **错误处理与流程控制**:虽然示例代码中未详细展示,但`ChannelConsume2`函数可以接收一个返回`error`的消费函数,并能处理特定的错误类型(如`lineopt.Stop`),为实际应用中的错误捕获和流程控制提供了基础。

这里是一些脚本调用的地方,工具源码放在后面两个代码块了。

util.TaskConsumer[[]string](10).      SetP(lineopt.IterExcel2("xxx.xlsx")).       SetC(func(index int, row []string) (err error) {            if index == 1 {             return          }           // .....            // 这里是逻辑处理函数            return      }).     Run()

这是两个封装的函数的源码。

package lineoptimport (    "bufio" "fmt"   "github.com/xuri/excelize/v2"   "iter"  "log/slog"  "os")func IterLine2(filePath string) iter.Seq2[int, string] {   return func(yield func(int, string) bool) {     f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)     if errF != nil {            return      }       defer func(f *os.File) {            err := f.Close()            if err != nil {             fmt.Println(err)            }       }(f)        scanner := bufio.NewScanner(f)      index := 1      for scanner.Scan() {            line := scanner.Text()          if !yield(index, line) {                return          }           index += 1      }   }}func IterLine(filePath string) iter.Seq[string] { return func(yield func(string) bool) {      for _, item := range IterLine2(filePath) {          if !yield(item) {               return          }       }   }}func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] { return func(yield func(int, []string) bool) {       f, err := excelize.OpenFile(config.FilePath)        if err != nil {         slog.Error(err.Error())         return      }       defer f.Close()     targetSheet := config.TargetSheet       if targetSheet == "" {          targetSheet = f.GetSheetName(0)     }       rows, err := f.Rows(targetSheet)        if err != nil {         slog.Error(err.Error())         return      }       index := 1      for rows.Next() {           row, err := rows.Columns()          if err != nil {             slog.Error(err.Error())             return          }           if !yield(index, row) {             return          }           index += 1      }       return  }}func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {    return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(config) {           if !yield(value) {              return          }       }   }}func IterExcel2(filePath string) iter.Seq2[int, []string] {   return func(yield func(int, []string) bool) {       for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {          if !yield(index, value) {               return          }       }   }}func IterExcel(filePath string) iter.Seq[[]string] {  return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {          if !yield(value) {              return          }       }   }}func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {    return func(yield func(int, []string) bool) {       for index, value := range MapIterExcel2(ExcelTarget{            FilePath:    filePath,          TargetSheet: sheetName,     }) {            if !yield(index, value) {               return          }       }   }}func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {   return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(ExcelTarget{            FilePath:    filePath,          TargetSheet: sheetName,     }) {            if !yield(value) {              return          }       }   }}
package utilimport (  "dt/app/util/lineopt"   "errors"    "iter"  "sync")func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {  counter := 10   if len(number) == 1 && number[0] > 0 {       counter = number[0] }   return StartTogether(func() {       for item := range queue {           job(item)       }   }, counter)}// Together 并行执行func Together(job func(), counter int) {    var wg sync.WaitGroup   for i := 1; i <= counter; i++ {      wg.Add(1)       go func() {         defer wg.Done()         job()       }() }   wg.Wait()}func StartTogether(job func(), counter int) *sync.WaitGroup { var wg sync.WaitGroup   for i := 1; i <= counter; i++ {      wg.Add(1)       go func() {         defer wg.Done()         job()       }() }   return &wg}type chanData[d any] struct {    index int   data  d}func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {   counter := 10   if len(number) == 1 && number[0] > 0 {       counter = number[0] }   return StartTogether(func() {       for item := range queue {           err := job(item.index, item.data)           if errors.Is(err, lineopt.Stop) {               // 目前不可以直接停止,会导致消费者阻塞掉              //return            }       }   }, counter)}type ProducerConsumer[T any] struct {   consumerNumber int  queue          chan chanData[T] p              iter.Seq2[int, T]    c              func(index int, item T) (err error)  once           sync.Once}func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {  itself.c = c    return itself}func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {   itself.p = p    return itself}// 生产者消费者都有可能发生阻塞,// 生产者阻塞的原因是因为 queue 容量不够了// 消费者阻塞的原因的是因为 queue 没有 close// 生产者只需要实现即可func (itself *ProducerConsumer[T]) do() {  task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {     return itself.c(index, item)    }, itself.consumerNumber)   defer task.Wait()   defer close(itself.queue)   for index, v := range itself.p {        select {        case itself.queue <- chanData[T]{            index,          v,      }:          break           // 需要一个可以知道提前截止的操作      }   }}func (itself *ProducerConsumer[T]) Run() {    itself.once.Do(func() {     itself.do() })}func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {   n := 1  if len(consumerNumber) > 0 {     n = consumerNumber[0]   }   return &ProducerConsumer[T]{        queue:          make(chan chanData[T], n),      consumerNumber: n,  }}

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Go语言 Excel处理 生产者消费者模式 并行处理 脚本开发
相关文章