为了账号安全,请及时绑定邮箱和手机立即绑定

Goroutines管道中的死锁

Goroutines管道中的死锁

Go
千万里不及你 2022-08-01 11:10:45
我需要你的帮助来理解为什么我的函数会导致死锁。当我注释掉像下面这样的行时,它可以正常工作(因此我知道问题在这里)。readFromWorker整体就在这里 https://play.golang.org/p/-0mRDAeD2tr我真的非常感谢你的帮助func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {    defer func() {        wg.Done()    }()    //stageIn1 := make(chan *data)    //stageOut1 := make(chan *data)    for v := range inCh {        fmt.Println("v", v)        //stageIn1 <- v    }    //go stage1(stageIn1, stageOut1)    //go stage2(stageOut1)}
查看完整描述

1 回答

?
慕田峪4524236

TA贡献1875条经验 获得超5个赞

我已经评论了你做错的相关部分。另外,我建议考虑一个更好的模式。


请记住,在通道上不会停止循环,除非为它正在循环的同一通道调用。此外,关闭通道的经验法则是,发送到通道的发送方也必须关闭它,因为发送到关闭的通道会导致 。for rangeclosepanic


此外,使用无缓冲和缓冲通道时要非常小心。对于无缓冲的信道,发送方和接收方必须准备就绪,否则您的情况中也会发生死锁。


package main


import (

    "fmt"

    "sync"

)


type data struct {

    id    int

    url   string

    field int

}


type job struct {

    id  int

    url string

}


func sendToWorker(id int, inCh <-chan job, outCh chan<- *data, wg *sync.WaitGroup) {

    // wg.Done() is itself a function call, no need to wrap it inside

    // an anonymous function just to use defer.

    defer wg.Done()


    for v := range inCh {

        // some pre process stuff and then pass to pipeline

        outCh <- &data{id: v.id, url: v.url}

    }

}


func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {

    // wg.Done() is itself a function call, no need to wrap it inside

    // an anonymous function just to use defer.

    defer wg.Done()


    var (

        stageIn1  = make(chan *data)

        stageOut1 = make(chan *data)

    )


    // Spawn the goroutines so that there's no deadlock

    // as the sender and receiver both should be ready

    // when using unbuffered channels.

    go stage1(stageIn1, stageOut1)

    go stage2(stageOut1)


    for v := range inCh {

        fmt.Println("v", v)

        stageIn1 <- v

    }

    close(stageIn1)

}


func stage1(in <-chan *data, out chan<- *data) {

    for s := range in {

        fmt.Println("stage1 = ", s)

        out <- s

    }

    // Close the out channel

    close(out)

}


func stage2(out <-chan *data) {

    // Loop until close

    for s := range out {

        fmt.Println("stage2 = ", s)

    }

}


func main() {

    const chanBuffer = 1


    var (

        inputsCh  = make(chan job, chanBuffer)

        resultsCh = make(chan *data, chanBuffer)


        wgInput  sync.WaitGroup

        wgResult sync.WaitGroup

    )


    for i := 1; i <= 4; i++ {

        wgInput.Add(1)

        go sendToWorker(i, inputsCh, resultsCh, &wgInput)

    }


    wgResult.Add(1)

    go readFromWorker(resultsCh, &wgResult)


    for j := 1; j <= 10; j++ {

        inputsCh <- job{id: j, url: "google.com"}

    }


    close(inputsCh)

    wgInput.Wait()

    close(resultsCh)

    wgResult.Wait()

}



查看完整回答
反对 回复 2022-08-01
  • 1 回答
  • 0 关注
  • 158 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号