1 回答
TA贡献1786条经验 获得超13个赞
这是您的代码的基于通道的版本,在功能上等同于上面示例的意图。关键点是我们没有使用任何原子值来改变代码的逻辑,因为这不提供 goroutine 之间的同步。goroutine 之间的所有交互都使用通道sync.WaitGroup、 或同步context.Context。可能有更好的方法来解决手头的问题,但这表明没有必要的原子来协调队列和工作人员。
这里唯一在 goroutines 之间仍然不协调的值是len(jobs)在日志输出中的使用。使用它是否有意义取决于你,因为它的值在并发世界中是没有意义的,但它是安全的,因为它是为并发使用而同步的,并且没有基于该值的逻辑。
buf := 5
workers := 3
jobs := make(chan int, buf)
// results buffer must always be larger than workers + buf to prevent deadlock
results := make(chan int, buf*2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start workers
var wg sync.WaitGroup
for n := 0; n < workers; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for jobID := range jobs {
fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))
time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)
results <- jobID
}
fmt.Printf("worker %v exited", n)
}(n)
}
var done sync.WaitGroup
done.Add(1)
go func() {
defer done.Done()
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Printf("===> results: %v\n", strings.Join(r, ","))
r = r[:0]
}
for {
select {
case <-ticker.C:
flushResults()
// send max buf jobs, or fill the queue
for i := 0; i < buf; i++ {
jobID++
select {
case jobs <- jobID:
continue
}
break
}
fmt.Printf("===> send %v jobs\n", i)
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-ctx.Done():
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
// we can close results for easy iteration because we know
// there are no more workers.
close(results)
// Flush remaining results
for jobID := range results {
r = append(r, fmt.Sprintf("%v", jobID))
}
flushResults()
return
}
}
}()
- 1 回答
- 0 关注
- 169 浏览
添加回答
举报
