2 回答
TA贡献1868条经验 获得超4个赞
在完成将所有行发送到 之前,您不会开始读取,这比缓冲区空间的行数多。因此,一旦缓冲区已满,发送块(开始填充缓冲区),一旦缓冲区已满,它就会死锁。将发送到 的循环、从 读取的循环或两者一起移动,以分隔 goroutine,例如:doneQlineParseQdoneQlineParseQlineParseQdoneQ
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
这仍然会在结束时陷入僵局,因为你已经在同一个goroutine中有一个过端通道和之后的通道;由于继续直到通道关闭,并且关闭在完成后,您仍然有一个死锁。您需要将关闭放在适当的位置;也就是说,无论是在发送例程中,还是在监视发送例程时,如果给定通道有多个发送方,则阻止发送例程。rangecloserangerangeWaitGroup
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
完整的工作示例如下:https://play.golang.org/p/XsnewSZeb2X
TA贡献1863条经验 获得超2个赞
协调管道,将每个部分分成几个阶段。当您知道管道的一部分已完成(并且没有人写入特定通道)时,请关闭该通道以指示所有“工作人员”退出,例如sync.WaitGroup
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() signals the above have completed
缓冲通道对于处理突发工作负载非常方便,但有时它们用于避免不良设计中的死锁。如果要避免在 goroutine 中运行管道的某些部分,可以缓冲一些通道(通常与 worker 的数量匹配),以避免主 goroutine 堵塞。
如果您有读取和写入的依赖部分,并希望避免死锁 - 请确保它们位于单独的goroutine中。将管道的所有部分都拥有自己的goroutine,甚至可以消除对缓冲通道的需求:
// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
当然,这是一个权衡 - 一个goroutine的资源成本约为2K - 而缓冲通道要少得多。与大多数设计一样,这取决于如何使用它。
也不要被臭名昭著的Go for-loop gotcha抓住,所以使用闭包赋值来避免这种情况:
for i := 1; i <= 5; i++ {
i := i // new i (not the i above)
go func() {
myfunc(i) // otherwise all goroutines will most likely get '5'
}()
}
最后,请确保在退出之前等待所有结果得到处理。从基于通道的函数返回并认为所有结果都已处理是一个常见的错误。在服务中,这最终将是正确的。但在独立的可执行文件中,处理循环可能仍在处理结果。
go func() {
wgW.Wait() // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
}()
// ensure we don't return until all results have been processed
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
通过在主goroutine中处理结果,我们确保在未处理所有内容的情况下不会过早返回。
将它们全部放在一起:
https://play.golang.org/p/MjLpQ5xglP3
- 2 回答
- 0 关注
- 133 浏览
添加回答
举报
