V2EX 07月25日 13:24
[Go 编程语言] 看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

该脚本展示了如何使用Go语言的lineopt库来读取Excel文件并进行数据处理。通过TaskConsumer工具,可以高效地并行消费Excel数据。代码中定义了多种迭代器函数,如IterLine2、MapIterExcel2等,方便从文件或Excel中提取数据。同时,也提供了ChannelConsume2函数,用于启动多个消费者协程,并行处理数据,并支持在处理过程中捕获错误。这种模式能够有效提升数据处理的效率和并发能力,特别适用于需要处理大量Excel数据的场景。

💡 **Excel数据读取与迭代:** 脚本的核心功能之一是利用lineopt库提供的函数(如IterExcel2)来读取Excel文件。这些函数能够生成数据迭代器,逐行或逐单元格地提取Excel中的数据,并支持指定工作表,极大地简化了Excel文件的访问过程。

🚀 **并行消费与任务处理:** 通过util.TaskConsumer工具,脚本能够启动指定数量的消费者协程,对Excel数据进行并行处理。这利用了Go语言的并发特性,将数据消费任务分散到多个协程中,显著提高了处理速度,尤其适合处理大型数据集。

🔄 **数据处理流程定制:** 脚本允许用户通过SetC方法自定义数据处理逻辑。这里的`func(index int, row []string) (err error)`是一个回调函数,它会在每个Excel行被消费时被调用,用户可以在此函数中实现具体的业务逻辑,例如数据校验、转换或存储。

⚠️ **异常处理与控制流:** 脚本在设计中考虑了异常处理,例如在打开文件或读取数据时可能出现的错误。虽然文中提到的`errors.Is(err, lineopt.Stop)`暗示了对特定错误(如停止信号)的处理,但实际的控制流(如如何优雅地停止消费者)需要进一步的实现细节。

这里是一些脚本调用的地方,工具源码放在后面两个代码块了。

util.TaskConsumer[[]string](10).      SetP(lineopt.IterExcel2("xxx.xlsx")).       SetC(func(index int, row []string) (err error) {            if index == 1 {             return          }           // .....            // 这里是逻辑处理函数            return      }).     Run()

这是两个封装的函数的源码。

package lineoptimport (    "bufio" "fmt"   "github.com/xuri/excelize/v2"   "iter"  "log/slog"  "os")func IterLine2(filePath string) iter.Seq2[int, string] {   return func(yield func(int, string) bool) {     f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)     if errF != nil {            return      }       defer func(f *os.File) {            err := f.Close()            if err != nil {             fmt.Println(err)            }       }(f)        scanner := bufio.NewScanner(f)      index := 1      for scanner.Scan() {            line := scanner.Text()          if !yield(index, line) {                return          }           index += 1      }   }}func IterLine(filePath string) iter.Seq[string] { return func(yield func(string) bool) {      for _, item := range IterLine2(filePath) {          if !yield(item) {               return          }       }   }}func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] { return func(yield func(int, []string) bool) {       f, err := excelize.OpenFile(config.FilePath)        if err != nil {         slog.Error(err.Error())         return      }       defer f.Close()     targetSheet := config.TargetSheet       if targetSheet == "" {          targetSheet = f.GetSheetName(0)     }       rows, err := f.Rows(targetSheet)        if err != nil {         slog.Error(err.Error())         return      }       index := 1      for rows.Next() {           row, err := rows.Columns()          if err != nil {             slog.Error(err.Error())             return          }           if !yield(index, row) {             return          }           index += 1      }       return  }}func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {    return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(config) {           if !yield(value) {              return          }       }   }}func IterExcel2(filePath string) iter.Seq2[int, []string] {   return func(yield func(int, []string) bool) {       for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {          if !yield(index, value) {               return          }       }   }}func IterExcel(filePath string) iter.Seq[[]string] {  return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {          if !yield(value) {              return          }       }   }}func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {    return func(yield func(int, []string) bool) {       for index, value := range MapIterExcel2(ExcelTarget{            FilePath:    filePath,          TargetSheet: sheetName,     }) {            if !yield(index, value) {               return          }       }   }}func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {   return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(ExcelTarget{            FilePath:    filePath,          TargetSheet: sheetName,     }) {            if !yield(value) {              return          }       }   }}
package utilimport (  "dt/app/util/lineopt"   "errors"    "iter"  "sync")func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {  counter := 10   if len(number) == 1 && number[0] > 0 {       counter = number[0] }   return StartTogether(func() {       for item := range queue {           job(item)       }   }, counter)}// Together 并行执行func Together(job func(), counter int) {    var wg sync.WaitGroup   for i := 1; i <= counter; i++ {      wg.Add(1)       go func() {         defer wg.Done()         job()       }() }   wg.Wait()}func StartTogether(job func(), counter int) *sync.WaitGroup { var wg sync.WaitGroup   for i := 1; i <= counter; i++ {      wg.Add(1)       go func() {         defer wg.Done()         job()       }() }   return &wg}type chanData[d any] struct {    index int   data  d}func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {   counter := 10   if len(number) == 1 && number[0] > 0 {       counter = number[0] }   return StartTogether(func() {       for item := range queue {           err := job(item.index, item.data)           if errors.Is(err, lineopt.Stop) {               // 目前不可以直接停止,会导致消费者阻塞掉              //return            }       }   }, counter)}type ProducerConsumer[T any] struct {   consumerNumber int  queue          chan chanData[T] p              iter.Seq2[int, T]    c              func(index int, item T) (err error)  once           sync.Once}func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {  itself.c = c    return itself}func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {   itself.p = p    return itself}// 生产者消费者都有可能发生阻塞,// 生产者阻塞的原因是因为 queue 容量不够了// 消费者阻塞的原因的是因为 queue 没有 close// 生产者只需要实现即可func (itself *ProducerConsumer[T]) do() {  task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {     return itself.c(index, item)    }, itself.consumerNumber)   defer task.Wait()   defer close(itself.queue)   for index, v := range itself.p {        select {        case itself.queue <- chanData[T]{            index,          v,      }:          break           // 需要一个可以知道提前截止的操作      }   }}func (itself *ProducerConsumer[T]) Run() {    itself.once.Do(func() {     itself.do() })}func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {   n := 1  if len(consumerNumber) > 0 {     n = consumerNumber[0]   }   return &ProducerConsumer[T]{        queue:          make(chan chanData[T], n),      consumerNumber: n,  }}

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Go语言 Excel处理 并发编程 数据处理 脚本工具
相关文章