为了账号安全,请及时绑定邮箱和手机立即绑定

具有缓冲作业和固定轮询间隔的工作池

具有缓冲作业和固定轮询间隔的工作池

Go
呼唤远方 2022-05-23 15:59:03
我有一个工作人员池在工作频道上监听,并在结果频道上做出响应。作业生产者必须以固定的代码间隔运行。在读取足够的新作业以填满缓冲区之前,必须刷新结果。批量刷新结果和读取新作业至关重要。请参阅下面的示例代码,在这里的操场上运行它。是否可以在没有原子计数器的情况下重写它来跟踪飞行作业?
查看完整描述

1 回答

?
开满天机

TA贡献1786条经验 获得超13个赞

这是您的代码的基于通道的版本,在功能上等同于上面示例的意图。关键点是我们没有使用任何原子值来改变代码的逻辑,因为这不提供 goroutine 之间的同步。goroutine 之间的所有交互都使用通道sync.WaitGroup、 或同步context.Context。可能有更好的方法来解决手头的问题,但这表明没有必要的原子来协调队列和工作人员。


这里唯一在 goroutines 之间仍然不协调的值是len(jobs)在日志输出中的使用。使用它是否有意义取决于你,因为它的值在并发世界中是没有意义的,但它是安全的,因为它是为并发使用而同步的,并且没有基于该值的逻辑。


buf := 5

workers := 3

jobs := make(chan int, buf)


// results buffer must always be larger than workers + buf to prevent deadlock

results := make(chan int, buf*2)


ctx, cancel := context.WithCancel(context.Background())

defer cancel()


// Start workers

var wg sync.WaitGroup

for n := 0; n < workers; n++ {

    wg.Add(1)

    go func(n int) {

        defer wg.Done()

        for jobID := range jobs {

            fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))

            time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)

            results <- jobID

        }

        fmt.Printf("worker %v exited", n)

    }(n)

}


var done sync.WaitGroup

done.Add(1)

go func() {

    defer done.Done()

    ticker := time.NewTicker(pollInterval)

    r := make([]string, 0)


    flushResults := func() {

        fmt.Printf("===> results: %v\n", strings.Join(r, ","))

        r = r[:0]

    }


    for {

        select {

        case <-ticker.C:

            flushResults()


            // send max buf jobs, or fill the queue

            for i := 0; i < buf; i++ {

                jobID++

                select {

                case jobs <- jobID:

                    continue

                }

                break

            }

            fmt.Printf("===> send %v jobs\n", i)


        case jobID := <-results:

            r = append(r, fmt.Sprintf("%v", jobID))


        case <-ctx.Done():

            // Close jobs channel to stop workers

            close(jobs)

            // Wait for workers to exit

            wg.Wait()


            // we can close results for easy iteration because we know

            // there are no more workers.

            close(results)

            // Flush remaining results

            for jobID := range results {

                r = append(r, fmt.Sprintf("%v", jobID))

            }

            flushResults()

            return

        }

    }

}()


查看完整回答
反对 回复 2022-05-23
  • 1 回答
  • 0 关注
  • 169 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号