V2EX 07月25日 11:50
[Go 编程语言] 看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

该脚本展示了如何使用Go语言及其相关的库来高效地读取和处理Excel文件。通过封装的`lineopt`和`util`包,代码实现了从Excel文件中迭代读取数据的功能,并利用`util.TaskConsumer`实现了消费者模式,可以并行处理读取到的数据。代码中特别提到了在处理Excel数据时跳过首行(表头)的逻辑,并通过`SetP`和`SetC`方法分别设置数据源和处理逻辑,最后调用`Run`方法启动整个处理流程。这种模式适用于需要批量处理Excel数据的场景,能够提高处理效率和代码的可读性。

🗂️ **Excel数据迭代读取**: `lineopt`包提供了`IterExcel2`函数,用于从指定的Excel文件路径中迭代读取数据。它能返回每一行的数据,并包含行号,方便在处理过程中进行定位和引用。

⚙️ **消费者模式处理**: `util.TaskConsumer`封装了生产者-消费者模式。它允许指定多个消费者(`consumerNumber`)并发处理从Excel读取的数据,通过`SetC`方法传入具体的行数据处理函数,`SetP`方法传入Excel数据源,实现高效的数据消费。

🚫 **跳过表头处理**: 在`SetC`函数中,通过`if index == 1 { return }`的逻辑,可以有效地跳过Excel文件的第一行,通常是表头,确保只处理实际的数据行,避免了不必要的错误。

🚀 **并行处理能力**: 通过`util.StartTogether`和`ChannelConsume2`等函数,脚本能够实现多协程并行处理Excel数据,大大提高了处理速度,尤其是在数据量较大的情况下。

🧩 **模块化与复用**: `lineopt`和`util`包的封装使得Excel处理和并发消费的逻辑可以被复用,提高了代码的模块化程度和可维护性。

这里是一些脚本调用的地方,工具源码放在后面两个代码块了。

util.TaskConsumer[[]string](10).      SetP(lineopt.IterExcel2("xxx.xlsx")).       SetC(func(index int, row []string) (err error) {            if index == 1 {             return          }           // .....            // 这里是逻辑处理函数            return      }).     Run()

这是两个封装的函数的源码。

package lineoptimport (    "bufio" "fmt"   "github.com/xuri/excelize/v2"   "iter"  "log/slog"  "os")func IterLine2(filePath string) iter.Seq2[int, string] {   return func(yield func(int, string) bool) {     f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)     if errF != nil {            return      }       defer func(f *os.File) {            err := f.Close()            if err != nil {             fmt.Println(err)            }       }(f)        scanner := bufio.NewScanner(f)      index := 1      for scanner.Scan() {            line := scanner.Text()          if !yield(index, line) {                return          }           index += 1      }   }}func IterLine(filePath string) iter.Seq[string] { return func(yield func(string) bool) {      for _, item := range IterLine2(filePath) {          if !yield(item) {               return          }       }   }}func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] { return func(yield func(int, []string) bool) {       f, err := excelize.OpenFile(config.FilePath)        if err != nil {         slog.Error(err.Error())         return      }       defer f.Close()     targetSheet := config.TargetSheet       if targetSheet == "" {          targetSheet = f.GetSheetName(0)     }       rows, err := f.Rows(targetSheet)        if err != nil {         slog.Error(err.Error())         return      }       index := 1      for rows.Next() {           row, err := rows.Columns()          if err != nil {             slog.Error(err.Error())             return          }           if !yield(index, row) {             return          }           index += 1      }       return  }}func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {    return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(config) {           if !yield(value) {              return          }       }   }}func IterExcel2(filePath string) iter.Seq2[int, []string] {   return func(yield func(int, []string) bool) {       for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {          if !yield(index, value) {               return          }       }   }}func IterExcel(filePath string) iter.Seq[[]string] {  return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {          if !yield(value) {              return          }       }   }}func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {    return func(yield func(int, []string) bool) {       for index, value := range MapIterExcel2(ExcelTarget{            FilePath:    filePath,          TargetSheet: sheetName,     }) {            if !yield(index, value) {               return          }       }   }}func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {   return func(yield func([]string) bool) {        for _, value := range MapIterExcel2(ExcelTarget{            FilePath:    filePath,          TargetSheet: sheetName,     }) {            if !yield(value) {              return          }       }   }}
package utilimport (  "dt/app/util/lineopt"   "errors"    "iter"  "sync")func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {  counter := 10   if len(number) == 1 && number[0] > 0 {       counter = number[0] }   return StartTogether(func() {       for item := range queue {           job(item)       }   }, counter)}// Together 并行执行func Together(job func(), counter int) {    var wg sync.WaitGroup   for i := 1; i <= counter; i++ {      wg.Add(1)       go func() {         defer wg.Done()         job()       }() }   wg.Wait()}func StartTogether(job func(), counter int) *sync.WaitGroup { var wg sync.WaitGroup   for i := 1; i <= counter; i++ {      wg.Add(1)       go func() {         defer wg.Done()         job()       }() }   return &wg}type chanData[d any] struct {    index int   data  d}func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {   counter := 10   if len(number) == 1 && number[0] > 0 {       counter = number[0] }   return StartTogether(func() {       for item := range queue {           err := job(item.index, item.data)           if errors.Is(err, lineopt.Stop) {               // 目前不可以直接停止,会导致消费者阻塞掉              //return            }       }   }, counter)}type ProducerConsumer[T any] struct {   consumerNumber int  queue          chan chanData[T] p              iter.Seq2[int, T]    c              func(index int, item T) (err error)  once           sync.Once}func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {  itself.c = c    return itself}func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {   itself.p = p    return itself}// 生产者消费者都有可能发生阻塞,// 生产者阻塞的原因是因为 queue 容量不够了// 消费者阻塞的原因的是因为 queue 没有 close// 生产者只需要实现即可func (itself *ProducerConsumer[T]) do() {  task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {     return itself.c(index, item)    }, itself.consumerNumber)   defer task.Wait()   defer close(itself.queue)   for index, v := range itself.p {        select {        case itself.queue <- chanData[T]{            index,          v,      }:          break           // 需要一个可以知道提前截止的操作      }   }}func (itself *ProducerConsumer[T]) Run() {    itself.once.Do(func() {     itself.do() })}func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {   n := 1  if len(consumerNumber) > 0 {     n = consumerNumber[0]   }   return &ProducerConsumer[T]{        queue:          make(chan chanData[T], n),      consumerNumber: n,  }}

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Go语言 Excel处理 生产者消费者模式 并行处理 脚本
相关文章