为了账号安全,请及时绑定邮箱和手机立即绑定

关闭和发送到通道之间的竞争条件

关闭和发送到通道之间的竞争条件

Go
一只甜甜圈 2023-08-14 17:17:21
我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。您会看到,管道的工作是从输入通道接收数据,对其进行处理,然后将结果输出到通道上。这是它的预期行为:从输入通道接收数据。将数据委托给可用的工作人员。Worker 将结果发送到输出通道。所有工作人员完成后关闭输出通道。func (p *pipe) Process(in chan interface{}) (out chan interface{}) {    var wg sync.WaitGroup    out = make(chan interface{}, 100)    go func() {        for i := 1; i <= 100; i++ {            go p.work(in, out, &wg)        }        wg.Wait()        close(out)    }()    return}func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {    for j := range jobs {        func(j Job) {            defer wg.Done()            wg.Add(1)            res := doSomethingWith(j)            out <- res        }(j)    }}但是,运行它可能会退出而不处理所有输入,或者出现错误并显示send on closed channel消息。使用该标志构建源会在和-race之间发出数据争用警告。close(out)out <- res我认为可能会发生以下情况。一旦一些工人完成了工作,wg计数器就会瞬间归零。因此,wg.Wait()完成并且程序继续进行close(out)。与此同时,作业通道尚未完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于out通道已经关闭,因此会导致恐慌。等待组应该放在其他地方吗?或者有没有更好的方法来等待所有工人完成?
查看完整描述

2 回答

?
撒科打诨

TA贡献1934条经验 获得超2个赞

目前尚不清楚为什么每个工作需要一名工作人员,但如果您这样做,您可以重组您的外循环设置(请参阅下面未经测试的代码)。这首先就消除了对工作池的需要。


不过,在解雇任何员工之前,请务必wg.Add 先执行此操作。在这里,您正好剥离了 100 名员工:


var wg sync.WaitGroup

out = make(chan interface{}, 100)

go func() {

    for i := 1; i <= 100; i++ {

        go p.work(in, out, &wg)

    }

    wg.Wait()

    close(out)

}()

因此,您可以这样做:


var wg sync.WaitGroup

out = make(chan interface{}, 100)

go func() {

    wg.Add(100)  // ADDED - count the 100 workers

    for i := 1; i <= 100; i++ {

        go p.work(in, out, &wg)

    }

    wg.Wait()

    close(out)

}()

请注意,您现在可以将wg其自身移至派生出工作线程的 goroutine 中。如果你放弃让每个工人将工作分拆为新的 goroutine 的想法,这可以让事情变得更干净。但是,如果每个工作人员要派生另一个 goroutine,则该工作人员本身也必须使用wg.Add,如下所示:


for j := range jobs {

    wg.Add(1)  // ADDED - count the spun-off goroutines

    func(j Job) {

        res := doSomethingWith(j)


        out <- res

        wg.Done()  // MOVED (for illustration only, can defer as before)

    }(j)

}

wg.Done() // ADDED - our work in `p.work` is now done

wg.Add(1)也就是说,每个匿名函数都是通道的另一个用户,因此在分拆新的 goroutine 之前增加通道用户计数 ( )。当您读完输入通道后jobs,调用wg.Done()(可能通过较早的defer,但我在此处的末尾展示了它)。


思考这个问题的关键是计算此时可以wg写入通道的活动 goroutine 的数量。只有当没有goroutine 打算再写入时,它才会变为零。 这使得关闭通道是安全的。


考虑使用相当简单的(但未经测试):


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    out = make(chan interface{})

    var wg sync.WaitGroup

    go func() {

        defer close(out)

        for j := range in {

            wg.Add(1)

            go func(j Job) {

                res := doSomethingWith(j)

                out <- res

                wg.Done()

            }(j)

        }

        wg.Wait()

    }()

    return out

}

现在,您有一个 goroutine 正在in以最快的速度读取通道,并在读取过程中分拆作业。每项传入的工作都会获得一个 goroutine,除非他们提前完成工作。没有池,每个工作只有一名工人(与您的代码相同,只是我们淘汰了没有做任何有用事情的池)。


或者,由于只有一定数量的 CPU 可用,请像之前在开始时所做的那样分拆一定数量的 goroutine,但让每个 goroutine 运行一个作业直至完成,并交付其结果,然后返回读取下一个作业:


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    out = make(chan interface{})

    go func() {

        defer close(out)

        var wg sync.WaitGroup

        ncpu := runtime.NumCPU()  // or something fancier if you like

        wg.Add(ncpu)

        for i := 0; i < ncpu; i++ {

            go func() {

                defer wg.Done()

                for j := range in {

                    out <- doSomethingWith(j)

                }

            }()

        }

        wg.Wait()

    }

    return out

}

通过使用,runtime.NumCPU()我们只能获得与运行作业的 CPU 一样多的工作线程来读取作业。这些是池,它们一次只做一项工作。


如果输出通道读取器结构良好(即不会导致管道阻塞),通常不需要缓冲输出通道。如果不是,这里的缓冲深度会限制您可以“领先于”正在使用结果的人的工作数量。根据“提前工作”的有用程度来设置它,不一定是 CPU 数量、预期作业数量等。


查看完整回答
反对 回复 2023-08-14
?
一只萌萌小番薯

TA贡献1795条经验 获得超7个赞

作业的完成速度可能与发送的速度一样快。在这种情况下,即使有更多的项目需要处理,WaitGroup 也会在零附近浮动。


解决此问题的一种方法是在发送作业之前添加一项,并在发送所有作业后减少该作业,从而有效地将发送者视为“作业”之一。在这种情况下,我们最好在wg.Add发送方中执行以下操作:


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    var wg sync.WaitGroup

    out = make(chan interface{}, 100)

    go func() {

        for i := 1; i <= 100; i++ {

            wg.Add(1)

            go p.work(in, out, &wg)

        }

        wg.Wait()

        close(out)

    }()


    return

}


func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {

    for j := range jobs {

        func(j Job) {

            res := doSomethingWith(j)


            out <- res

            wg.Done()

        }(j)

    }

}

我在代码中注意到的一件事是,每个作业都会启动一个 goroutine。同时,每个作业jobs循环处理通道,直到清空/关闭。似乎没有必要两者都做。


查看完整回答
反对 回复 2023-08-14
  • 2 回答
  • 0 关注
  • 96 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信