下面是脚本中调用该工具的示例，工具源码放在后面的两个代码块中。
// Example: stream the rows of the first sheet of xxx.xlsx through a
// TaskConsumer created with a consumer count of 10.
util.TaskConsumer[[]string](10).
	SetP(lineopt.IterExcel2("xxx.xlsx")).
	SetC(func(index int, row []string) (err error) {
		// Row indices start at 1, so this skips the first row
		// (presumably the header row).
		if index == 1 {
			return
		}
		// ...
		// Row-processing logic goes here.
		return
	}).
	Run()
下面是这两个封装工具（lineopt 包与 util 包）的源码。
// Package lineopt provides iterators over the lines of a text file and the
// rows of an Excel worksheet.
package lineopt

import (
	"bufio"
	"fmt"
	"iter"
	"log/slog"
	"os"

	"github.com/xuri/excelize/v2"
)

// IterLine2 returns an iterator over the lines of the file at filePath.
//
// Each step yields the 1-based line number and the line text without its
// line terminator. Errors cannot be returned through the iterator, so they
// are logged and end the iteration: a failure to open the file yields
// nothing, and a read error (including a line longer than bufio.Scanner's
// default token limit) stops after the last complete line.
func IterLine2(filePath string) iter.Seq2[int, string] {
	return func(yield func(int, string) bool) {
		f, err := os.Open(filePath)
		if err != nil {
			slog.Error(err.Error())
			return
		}
		defer func() {
			if err := f.Close(); err != nil {
				fmt.Println(err)
			}
		}()

		scanner := bufio.NewScanner(f)
		for index := 1; scanner.Scan(); index++ {
			if !yield(index, scanner.Text()) {
				return
			}
		}
		// Scan returning false means either EOF or an error; only the
		// latter is reported by Err.
		if err := scanner.Err(); err != nil {
			slog.Error(err.Error())
		}
	}
}

// IterLine is IterLine2 without the line numbers.
func IterLine(filePath string) iter.Seq[string] {
	return func(yield func(string) bool) {
		for _, line := range IterLine2(filePath) {
			if !yield(line) {
				return
			}
		}
	}
}

// MapIterExcel2 returns an iterator over the rows of one worksheet.
//
// The workbook is config.FilePath; the sheet is config.TargetSheet, or the
// sheet at index 0 when TargetSheet is empty. Each step yields the 1-based
// row number and the row's cell values. Errors are logged and end the
// iteration. The workbook and the row iterator are closed when iteration
// finishes, whether it ran to the end, failed, or was stopped by the caller.
func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] {
	return func(yield func(int, []string) bool) {
		f, err := excelize.OpenFile(config.FilePath)
		if err != nil {
			slog.Error(err.Error())
			return
		}
		defer func() {
			if err := f.Close(); err != nil {
				slog.Error(err.Error())
			}
		}()

		targetSheet := config.TargetSheet
		if targetSheet == "" {
			targetSheet = f.GetSheetName(0)
		}

		rows, err := f.Rows(targetSheet)
		if err != nil {
			slog.Error(err.Error())
			return
		}
		// Deferred after the workbook's Close so that it runs first.
		defer func() {
			if err := rows.Close(); err != nil {
				slog.Error(err.Error())
			}
		}()

		for index := 1; rows.Next(); index++ {
			row, err := rows.Columns()
			if err != nil {
				slog.Error(err.Error())
				return
			}
			if !yield(index, row) {
				return
			}
		}
		// Next returning false may hide an error; surface it.
		if err := rows.Error(); err != nil {
			slog.Error(err.Error())
		}
	}
}

// MapIterExcel is MapIterExcel2 without the row numbers.
func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {
	return func(yield func([]string) bool) {
		for _, row := range MapIterExcel2(config) {
			if !yield(row) {
				return
			}
		}
	}
}

// IterExcel2 iterates over the rows of the sheet at index 0 of the workbook
// at filePath, yielding 1-based row numbers and cell values.
func IterExcel2(filePath string) iter.Seq2[int, []string] {
	return MapIterExcel2(ExcelTarget{FilePath: filePath})
}

// IterExcel is IterExcel2 without the row numbers.
func IterExcel(filePath string) iter.Seq[[]string] {
	return MapIterExcel(ExcelTarget{FilePath: filePath})
}

// IterExcelSheet2 iterates over the rows of the sheet named sheetName in the
// workbook at filePath, yielding 1-based row numbers and cell values. An
// empty sheetName selects the sheet at index 0.
func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {
	return MapIterExcel2(ExcelTarget{
		FilePath:    filePath,
		TargetSheet: sheetName,
	})
}

// IterExcelSheet is IterExcelSheet2 without the row numbers.
func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {
	return MapIterExcel(ExcelTarget{
		FilePath:    filePath,
		TargetSheet: sheetName,
	})
}
// Package util provides small helpers for running jobs concurrently and a
// producer/consumer pipeline driven by an iterator.
package util

import (
	"errors"
	"iter"
	"sync"

	"dt/app/util/lineopt"
)

// ChannelConsume starts worker goroutines that each receive from queue and
// call job on every item until queue is closed and drained.
//
// The worker count is number[0] when exactly one positive value is given,
// and 10 otherwise. The returned WaitGroup is done once every worker has
// exited; the caller is responsible for closing queue.
func ChannelConsume[T any](queue chan T, job func(item T), number ...int) *sync.WaitGroup {
	counter := 10
	if len(number) == 1 && number[0] > 0 {
		counter = number[0]
	}
	return StartTogether(func() {
		for item := range queue {
			job(item)
		}
	}, counter)
}

// Together runs job in counter goroutines in parallel and blocks until all
// of them have returned.
func Together(job func(), counter int) {
	StartTogether(job, counter).Wait()
}

// StartTogether runs job in counter goroutines and returns immediately. The
// returned WaitGroup is done once every goroutine has returned.
func StartTogether(job func(), counter int) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 1; i <= counter; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			job()
		}()
	}
	return &wg
}

// chanData pairs an item with the index the producer assigned to it.
type chanData[T any] struct {
	index int
	data  T
}

// ChannelConsume2 is ChannelConsume for indexed items: every worker calls
// job with the item's index and payload until queue is closed and drained.
//
// The worker count is number[0] when exactly one positive value is given,
// and 10 otherwise.
func ChannelConsume2[T any](queue chan chanData[T], job func(index int, item T) (err error), number ...int) *sync.WaitGroup {
	counter := 10
	if len(number) == 1 && number[0] > 0 {
		counter = number[0]
	}
	return StartTogether(func() {
		for item := range queue {
			// A worker must keep draining the queue even when job
			// reports an error: if workers exited early, a producer
			// blocked on a full queue would never be released. Any
			// early-stop decision therefore belongs to the caller.
			_ = job(item.index, item.data)
		}
	}, counter)
}

// ProducerConsumer feeds the values of an iterator (the producer) through a
// channel to a pool of goroutines that run a callback (the consumer).
//
// Configure it with SetP and SetC, then call Run. Both must be set before
// Run, because Run ranges over the producer and calls the consumer.
type ProducerConsumer[T any] struct {
	consumerNumber int
	queue          chan chanData[T]
	p              iter.Seq2[int, T]
	c              func(index int, item T) (err error)
	once           sync.Once
}

// SetC sets the consumer callback and returns the receiver for chaining.
func (pc *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {
	pc.c = c
	return pc
}

// SetP sets the producer iterator and returns the receiver for chaining.
func (pc *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {
	pc.p = p
	return pc
}

// do runs the pipeline once and returns after every worker has exited.
//
// Either side can block: the producer blocks while the queue is full, and
// the workers block until the queue is closed. The producer only has to be
// an iterator; do takes care of closing the queue when the iterator is
// exhausted.
//
// When the consumer callback returns an error matching lineopt.Stop, the
// pipeline stops early: the producer stops pulling from the iterator, and
// items still waiting in the queue are drained without calling the
// callback. Calls already in progress on other workers run to completion.
func (pc *ProducerConsumer[T]) do() {
	done := make(chan struct{})
	var stop sync.Once

	task := ChannelConsume2(pc.queue, func(index int, item T) (err error) {
		select {
		case <-done:
			// Stop was requested: discard queued items so the
			// workers finish quickly.
			return nil
		default:
		}
		err = pc.c(index, item)
		if errors.Is(err, lineopt.Stop) {
			stop.Do(func() { close(done) })
		}
		return err
	}, pc.consumerNumber)

	// Deferred calls run in reverse order: close the queue first so the
	// workers' range loops end, then wait for them to exit.
	defer task.Wait()
	defer close(pc.queue)

	for index, v := range pc.p {
		select {
		case pc.queue <- chanData[T]{index, v}:
		case <-done:
			// Returning ends the range loop, which also tells the
			// iterator to stop and release its resources.
			return
		}
	}
}

// Run executes the pipeline and blocks until it has finished. Only the
// first call has any effect.
func (pc *ProducerConsumer[T]) Run() {
	pc.once.Do(pc.do)
}

// TaskConsumer creates a ProducerConsumer whose queue capacity and worker
// count are both taken from consumerNumber[0] (default 1).
//
// A non-positive value gives an unbuffered queue, and the worker count then
// falls back to ChannelConsume2's default of 10.
func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {
	n := 1
	if len(consumerNumber) > 0 {
		n = consumerNumber[0]
	}
	return &ProducerConsumer[T]{
		// make panics on a negative capacity, so clamp it to zero.
		queue:          make(chan chanData[T], max(n, 0)),
		consumerNumber: n,
	}
}