为了账号安全,请及时绑定邮箱和手机立即绑定

等待多个回调,超时,无需忙于等待或轮询

等待多个回调,超时,无需忙于等待或轮询

Go
阿晨1998 2022-09-19 20:53:29
在 Go 中,我有两个最终不会触发的回调。registerCb(func() {...})registerCb(func() {...})/* Wait for both func to execute with timeout */我想等待它们,但如果其中一个没有执行,则会超时。同步。WaitGroup 不起作用,因为它是阻塞的,而不是基于通道的。此外,您调用 WaitGroup.Done() 而不会在回调之外出现恐慌的风险。我目前的解决方案是只使用两个布尔值和一个繁忙的等待循环。但这并不令人满意。是否有任何不使用轮询或忙碌等待的惯用方法?更新:下面是一些代码,演示了一个繁忙的等待解决方案,但应该在两个回调都触发后立即返回,或者在超时之后返回,而不使用轮询package mainimport (    "fmt"    "log"    "sync"    "time")var cbOne func()var cbTwo func()func registerCbOne(cb func()) {    cbOne = cb}func registerCbTwo(cb func()) {    cbTwo = cb}func executeCallbacks() {    <-time.After(1 * time.Second)    cbOne()    // Might never happen    //<-time.After(1 * time.Second)    //cbTwo()}func main() {    // Some process in background will execute our callbacks    go func() {        executeCallbacks()    }()    err := WaitAllOrTimeout(3 * time.Second)    if err != nil {        fmt.Println("Error: ", err.Error())    }    fmt.Println("Hello, playground")}func WaitAllOrTimeout(to time.Duration) error {    cbOneDoneCh := make(chan bool, 1)    cbTwoDoneCh := make(chan bool, 1)    cbOneDone := false    cbTwoDone := false        registerCbOne(func() {        fmt.Println("cb One");        cbOneDoneCh <- true    })    registerCbTwo(func() {        fmt.Println("cb Two");        cbTwoDoneCh <- true    })    // Wait for cbOne and cbTwo to be executed or a timeout        // Busywait solution    for {        select {             case <-time.After(to):                 if cbOneDone && cbTwoDone {                     fmt.Println("Both CB executed (we could poll more often)")                     return nil                 }                 fmt.Println("Timeout!")                 return fmt.Errorf("Timeout")             case <-cbOneDoneCh:                 cbOneDone = true             case <-cbTwoDoneCh:                 cbTwoDone = true        }    }}
查看完整描述

3 回答

?
饮歌长啸

TA贡献1951条经验 获得超3个赞

这是我的评论的后续内容,在您添加示例解决方案后添加。为了比我在注释中更清晰,您的示例代码实际上并没有那么糟糕。以下是您的原始示例:

// Busywait solution

for {

    select {

         case <-time.After(to):

             if cbOneDone && cbTwoDone {

                 fmt.Println("Both CB executed (we could poll more often)")

                 return nil

             }

             fmt.Println("Timeout!")

             return fmt.Errorf("Timeout")

         case <-cbOneDoneCh:

             cbOneDone = true

         case <-cbTwoDoneCh:

             cbTwoDone = true

    }

}

这不是一个“繁忙的等待”,但它确实有几个错误(包括你需要一个只为已完成的通道发送语义的事实,或者可能更容易,至少同样好,在完成时只关闭它们一次,也许使用)。我们想做的是:sync.Once

  1. 启动计时器 作为超时。to

  2. 使用计时器的通道和两个“完成”通道输入选择循环。

我们希望在发生以下第一个事件退出 select 循环:

  • 计时器触发,或

  • “完成”通道已发出信号。

如果我们要转到两个已完成的通道,我们也希望清除变量(设置为 ),以便选择不会旋转 - 这将变成真正的忙碌等待 - 但目前让我们假设我们在回调时只发送一次,否则只会泄漏通道, 以便我们可以按照编写的方式使用您的代码,因为这些选择只会返回一次。以下是更新后的代码:closeChnil

t := timer.NewTimer(to)

for !cbOneDone || !cbTwoDone {

    select {

    case <-t.C:

        fmt.Println("Timeout!")

        return fmt.Errorf("timeout")

    }

    case <-cbOneDoneCh:

        cbOneDone = true

    case <-cbTwoDoneCh:

        cbTwoDone = true

    }

}

// insert t.Stop() and receive here to drain t.C if desired

fmt.Println("Both CB executed")

return nil

请注意,我们最多将经历两次循环:

  • 如果我们从两个 Done 通道接收到每个通道,则循环停止而不会超时。没有旋转/忙碌等待:我们从未收到过任何东西。我们返回零(无错误)。t.C

  • 如果我们从一个 Done 通道接收,循环将恢复,但会阻塞等待计时器或另一个 Done 通道。

  • 如果我们从 接收到 ,则表示我们尚未收到两个回调。我们可能有一个,但有一个暂停,我们选择放弃,这是我们的目标。我们返回一个错误,而不通过循环返回。t.C

一个真正的版本需要更多的工作来正确清理并避免泄漏“完成”通道(以及计时器通道及其goroutine;参见评论),但这是一般的想法。您已经将回调转换为通道操作,并且已经具有其通道的计时器。


查看完整回答
反对 回复 2022-09-19
?
撒科打诨

TA贡献1934条经验 获得超2个赞

下面的代码有两个变体,

  • 第一个是常规模式,没有什么花哨的,它做了工作,做得很好。您将回调启动到例程中,使它们推送到接收器,收听该接收器以获取结果或超时。注意接收器通道的初始容量,为了防止泄漏例程,它必须与回调次数匹配。

  • 第二个工厂将同步机制分解成小函数进行组装,提供两种等待方法,waitAll 和 waitOne。写起来不错,但效率肯定更低,分配更多,渠道更多来回,推理更复杂,更微妙。

package main


import (

    "fmt"

    "log"

    "sync"

    "time"

)


func main() {

    ExampleOne()

    ExampleTwo()

    ExampleThree()


    fmt.Println("Hello, playground")

}


func ExampleOne() {

    log.Println("start reg")

    errs := make(chan error, 2)

    go func() {

        fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)

        errs <- fn()

    }()

    go func() {

        fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))

        errs <- fn()

    }()


    select {

    case err := <-errs: // capture only one result,

        // the fastest to finish.

        if err != nil {

            log.Println(err)

        }

    case <-time.After(time.Second): // or wait that many amount of time,

        // in case they are all so slow.

    }

    log.Println("done reg")

}


func ExampleTwo() {

    log.Println("start wait")

    errs := waitAll(

        withTimeout(time.Second,

            callbackWithOpts("waitAll: so slow", 2*time.Second, nil),

        ),

        withTimeout(time.Second,

            callbackWithOpts("waitAll: too fast", time.Millisecond, nil),

        ),

    )

    for err := range trim(errs) {

        if err != nil {

            log.Println(err)

        }

    }

    log.Println("done wait")

}


func ExampleThree() {

    log.Println("start waitOne")

    errs := waitOne(

        withTimeout(time.Second,

            callbackWithOpts("waitOne: so slow", 2*time.Second, nil),

        ),

        withTimeout(time.Second,

            callbackWithOpts("waitOne: too fast", time.Millisecond, nil),

        ),

    )

    for err := range trim(errs) {

        if err != nil {

            log.Println(err)

        }

    }

    log.Println("done waitOne")

}


// a configurable callback for playing

func callbackWithOpts(msg string, tout time.Duration, err error) func() error {

    return func() error {

        <-time.After(tout)

        fmt.Println(msg)

        return err

    }

}


// withTimeout return a function that returns first error or times out and return nil

func withTimeout(tout time.Duration, h func() error) func() error {

    return func() error {

        d := make(chan error, 1)

        go func() {

            d <- h()

        }()

        select {

        case err := <-d:

            return err

        case <-time.After(tout):

        }

        return nil

    }

}


// wait launches all func() and return their errors into the returned error channel; (merge)

// It is the caller responsability to drain the output error channel.

func waitAll(h ...func() error) chan error {

    d := make(chan error, len(h))

    var wg sync.WaitGroup

    for i := 0; i < len(h); i++ {

        wg.Add(1)

        go func(h func() error) {

            defer wg.Done()

            d <- h()

        }(h[i])

    }

    go func() {

        wg.Wait()

        close(d)

    }()

    return d

}


// wait launches all func() and return the first error into the returned error channel

// It is the caller responsability to drain the output error channel.

func waitOne(h ...func() error) chan error {

    d := make(chan error, len(h))

    one := make(chan error, 1)

    var wg sync.WaitGroup

    for i := 0; i < len(h); i++ {

        wg.Add(1)

        go func(h func() error) {

            defer wg.Done()

            d <- h()

        }(h[i])

    }

    go func() {

        for err := range d {

            one <- err

                close(one)

            break

        }

    }()

    go func() {

        wg.Wait()

        close(d)

    }()

    return one

}


func trim(err chan error) chan error {

    out := make(chan error)

    go func() {

        for e := range err {

            out <- e

        }

        close(out)

    }()

    return out

}


查看完整回答
反对 回复 2022-09-19
?
慕斯王

TA贡献1864条经验 获得超2个赞

func wait(ctx context.Context, wg *sync.WaitGroup) error {

    done := make(chan struct{}, 1)

    go func() {

        wg.Wait()

        done <- struct{}{}

    }()


    select {

    case <-done:

        // Counter is 0, so all callbacks completed.

        return nil

    case <-ctx.Done():

        // Context cancelled.

        return ctx.Err()

    }

}

或者,您可以传递 a 和 块而不是 on ,但我认为使用上下文更习惯用语。time.Duration<-time.After(d)<-ctx.Done()


查看完整回答
反对 回复 2022-09-19
  • 3 回答
  • 0 关注
  • 126 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号