为了账号安全,请及时绑定邮箱和手机立即绑定

使用通道作为队列的死锁

使用通道作为队列的死锁

Go
哆啦的时光机 2022-09-05 10:39:01
我正在学习 Go,我正在尝试实现作业队列。我想做的是:让主 goroutine 馈送行通过一个通道供多个解析器工作线程(将一条线解析为 s 结构),并让每个分析器将结构发送到其他工作线程(goroutines)将处理的结构通道(发送到数据库等)。代码如下所示:lineParseQ := make(chan string, 5)jobProcessQ := make(chan myStruct, 5)doneQ := make(chan myStruct, 5)fileName := "myfile.csv"file, err := os.Open(fileName)if err != nil {    log.Fatal(err)}defer file.Close()reader := bufio.NewReader(file)// Start line parsing workers and send to jobProcessQfor i := 1; i <= 2; i++ {    go lineToStructWorker(i, lineParseQ, jobProcessQ)}// Process myStruct from jobProcessQfor i := 1; i <= 5; i++ {    go WorkerProcessStruct(i, jobProcessQ, doneQ)}lineCount := 0 countSend := 0for {    line, err := reader.ReadString('\n')        if err != nil && err != io.EOF {        log.Fatal(err)    }        if err == io.EOF {        break    }        lineCount++        if lineCount > 1 {        countSend++        lineParseQ <- line[:len(line)-1]    // Avoid last char '\n'    }}for i := 0; i < countSend; i++ {    fmt.Printf("Received %+v.\n", <-doneQ)}close(doneQ)close(jobProcessQ)close(lineParseQ)这是一个简化的游乐场:https://play.golang.org/p/yz84g6CJraa工人看起来像这样:func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {    for j := range lineQ {        strQ <- lineToStruct(j) // just parses the csv to a struct...    }}func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {    for a := range strQ {        time.Sleep(time.Millisecond * 500) // fake long operation...        done <- a    }}我知道问题与“完成”通道有关,因为如果我不使用它,就不会有错误,但我不知道如何解决它。
查看完整描述

2 回答

?
MYYA

TA贡献1868条经验 获得超4个赞

在完成将所有行发送到 之前,您不会开始读取,这比缓冲区空间的行数多。因此,一旦缓冲区已满,发送块(开始填充缓冲区),一旦缓冲区已满,它就会死锁。将发送到 的循环、从 读取的循环或两者一起移动,以分隔 goroutine,例如:doneQlineParseQdoneQlineParseQlineParseQdoneQ


go func() {

    for _, line := range lines {

        countSend++

        lineParseQ <- line

    }

    close(lineParseQ)

}()

这仍然会在结束时陷入僵局,因为你已经在同一个goroutine中有一个过端通道和之后的通道;由于继续直到通道关闭,并且关闭在完成后,您仍然有一个死锁。您需要将关闭放在适当的位置;也就是说,无论是在发送例程中,还是在监视发送例程时,如果给定通道有多个发送方,则阻止发送例程。rangecloserangerangeWaitGroup


// Start line parsing workers and send to jobProcessQ

wg := new(sync.WaitGroup)

for i := 1; i <= 2; i++ {

    wg.Add(1)

    go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)

}


// Process myStruct from jobProcessQ

for i := 1; i <= 5; i++ {

    go WorkerProcessStruct(i, jobProcessQ, doneQ)

}


countSend := 0


go func() {

    for _, line := range lines {

        countSend++

        lineParseQ <- line

    }

    close(lineParseQ)

}()


go func() {

    wg.Wait()

    close(jobProcessQ)

}()


for a := range doneQ {

    fmt.Printf("Received %v.\n", a)

}


// ...


func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {

    for j := range lineQ {

        strQ <- lineToStruct(j) // just parses the csv to a struct...

    }

    wg.Done()

}


func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {

    for a := range strQ {

        time.Sleep(time.Millisecond * 500) // fake long operation...

        done <- a

    }

    close(done)

}

完整的工作示例如下:https://play.golang.org/p/XsnewSZeb2X


查看完整回答
反对 回复 2022-09-05
?
小唯快跑啊

TA贡献1863条经验 获得超2个赞

协调管道,将每个部分分成几个阶段。当您知道管道的一部分已完成(并且没有人写入特定通道)时,请关闭该通道以指示所有“工作人员”退出,例如sync.WaitGroup


var wg sync.WaitGroup

for i := 1; i <= 5; i++ {

    i := i

    wg.Add(1)

    go func() {

        Worker(i)

        wg.Done()

    }()

}


// wg.Wait() signals the above have completed

缓冲通道对于处理突发工作负载非常方便,但有时它们用于避免不良设计中的死锁。如果要避免在 goroutine 中运行管道的某些部分,可以缓冲一些通道(通常与 worker 的数量匹配),以避免主 goroutine 堵塞。


如果您有读取和写入的依赖部分,并希望避免死锁 - 请确保它们位于单独的goroutine中。将管道的所有部分都拥有自己的goroutine,甚至可以消除对缓冲通道的需求:


// putting all channel work into separate goroutines

// removes the need for buffered channels

lineParseQ := make(chan string, 0)

jobProcessQ := make(chan myStruct, 0)

doneQ := make(chan myStruct, 0)

当然,这是一个权衡 - 一个goroutine的资源成本约为2K - 而缓冲通道要少得多。与大多数设计一样,这取决于如何使用它。


也不要被臭名昭著的Go for-loop gotcha抓住,所以使用闭包赋值来避免这种情况:


for i := 1; i <= 5; i++ {

    i := i       // new i (not the i above)

    go func() {

        myfunc(i) // otherwise all goroutines will most likely get '5'

    }()

}

最后,请确保在退出之前等待所有结果得到处理。从基于通道的函数返回并认为所有结果都已处理是一个常见的错误。在服务中,这最终将是正确的。但在独立的可执行文件中,处理循环可能仍在处理结果。


go func() {

    wgW.Wait()   // waiting on worker goroutines to finish

    close(doneQ) // safe to close results channel now

}()


// ensure we don't return until all results have been processed

for a := range doneQ {

    fmt.Printf("Received %v.\n", a)

}

通过在主goroutine中处理结果,我们确保在未处理所有内容的情况下不会过早返回。


将它们全部放在一起:


https://play.golang.org/p/MjLpQ5xglP3


查看完整回答
反对 回复 2022-09-05
  • 2 回答
  • 0 关注
  • 133 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号