为了账号安全,请及时绑定邮箱和手机立即绑定

在顺序执行之前等待通道中的 N 个项目

在顺序执行之前等待通道中的 N 个项目

Go
慕哥9229398 2023-05-08 15:49:57
所以我很新去!但是我对我想尝试的事情有这个想法。我想要一个从通道接受字符串的 go 例程,但只有在它收到 N 个字符串后才应该对它们执行。我四处寻找类似的问题或案例,但我只发现了一些想法是并行执行多个例程并等待汇总结果。我想到了创建一个数组并将其传递给长度足够的例程的想法。但是我想保持一定的关注点分离并在接收端控制它。我的问题是。这是出于某种原因的不良做法吗?有没有更好的方法来做到这一点,它是什么?func main() {    ch := make(chan string)    go func() {        tasks := []string{}        for {            tasks = append(tasks,<- ch)            if len(tasks) < 3 {                fmt.Println("Queue still to small")            }            if len(tasks) > 3 {                for i := 0; i < len(tasks); i++ {                    fmt.Println(tasks[i])                }            }        }    }()    ch <- "Msg 1"    time.Sleep(time.Second)    ch <- "Msg 2"    time.Sleep(time.Second)    ch <- "Msg 3"    time.Sleep(time.Second)    ch <- "Msg 4"    time.Sleep(time.Second)}编辑更简单更准确的例子。
查看完整描述

2 回答

?
慕娘9325324

TA贡献1783条经验 获得超4个赞

根据一些评论,您正在寻找的似乎是某种形式的批处理。

批处理有几种情况,当您想要获取批处理并将其一起发送时:

  1. 批量大小足够

  2. 已经过了足够的时间,应该冲洗部分批次

您给出的示例不考虑第二种情况。如果您只是因为停止加载而从不冲水,这可能会导致一些尴尬的行为。

因此,我建议要么查看库(例如,cloudfoundry/go-batching),要么简单地使用通道、计时器和选择语句。

package main


import (

    "fmt"

    "time"

)


func main() {

    ch := make(chan string)

    go func() {

        tasks := []string{}

        timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience

        for {

            select {

            case <-timer.C:

                fmt.Println("Flush partial batch due to time")

                flush(tasks)

                tasks = nil

                timer.Reset(time.Second)

            case data := <-ch:

                tasks = append(tasks, data)


                // Reset the timer for each data point so that we only flush

                // partial batches when we stop receiving data.

                if !timer.Stop() {

                    <-timer.C

                }

                timer.Reset(time.Second)


                // Guard clause to for batch size

                if len(tasks) < 3 {

                    fmt.Println("Queue still too small")

                    continue

                }


                flush(tasks)

                tasks = nil // reset tasks

            }

        }

    }()


    ch <- "Msg 1"

    time.Sleep(time.Second)

    ch <- "Msg 2"

    time.Sleep(time.Second)

    ch <- "Msg 3"

    time.Sleep(time.Second)

    ch <- "Msg 4"

    time.Sleep(time.Second)

}


func flush(tasks []string) {

    // Guard against emtpy flushes

    if len(tasks) == 0 {

        return

    }


    fmt.Println("Flush")

    for _, t := range tasks {

        fmt.Println(t)

    }

}


查看完整回答
反对 回复 2023-05-08
?
萧十郎

TA贡献1815条经验 获得超12个赞

我可以看到批处理结果的东西是如何有用的。但它确实需要定制解决方案。有很多方法可以解决这个问题——我试过使用Sync.WaitGroup但它变得很乱。似乎使用 async.Mutex来锁定批处理功能是最好的方法。但是,当 mutex 是最好的答案时,imo 应该触发对设计的重新检查,因为 imo,它应该是最后一个选项。


package main


import (

    "context"

    "fmt"

    "sync"

    "sync/atomic"

)


func main() {


    ctx, canc := context.WithCancel(context.Background())

    acc := NewAccumulator(4, ctx)

    go func() {

        for i := 0; i < 10; i++ {

            acc.Write("hi")

        }

        canc()

    }()


    read := acc.ReadChan()

    for batch := range read {

        fmt.Println(batch)

    }

    fmt.Println("done")

}


type Accumulator struct {

    count    int64

    size     int

    in       chan string

    out      chan []string

    ctx      context.Context

    doneFlag int64

    mu   sync.Mutex

}


func NewAccumulator(size int, parentCtx context.Context) *Accumulator {

    a := &Accumulator{

        size: size,

        in:   make(chan string, size),

        out:  make(chan []string, 1),

        ctx:  parentCtx,

    }


    go func() {

        <-a.ctx.Done()

        atomic.AddInt64(&a.doneFlag, 1)

        close(a.in)

        a.mu.Lock()

        a.batch()

        a.mu.Unlock()

        close(a.out)

    }()

    return a

}


func (a *Accumulator) Write(s string) {

    if atomic.LoadInt64(&a.doneFlag) > 0 {

        panic("write to closed accumulator")

    }

    a.in <- s

    atomic.AddInt64(&a.count, 1)

    a.mu.Lock()

    if atomic.LoadInt64(&a.count) == int64(a.size) {

        a.batch()

    }

    a.mu.Unlock()

}


func (a *Accumulator) batch() {

    batch := make([]string, 0)

    for i := 0; i < a.size; i++ {

        msg := <-a.in

        if msg != "" {

            batch = append(batch, msg)

        }

    }

    fmt.Println("batching", batch)

    a.out <- batch

    atomic.StoreInt64(&a.count, 0)

}


func (a *Accumulator) ReadChan() <-chan []string {

    return a.out

}

最好只拥有一个累积字符串的切片,当该切片达到一定大小时,然后开始一些处理。


查看完整回答
反对 回复 2023-05-08
  • 2 回答
  • 0 关注
  • 82 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信