3 回答
TA贡献1951条经验 获得超3个赞
这是我的评论的后续内容,在您添加示例解决方案后添加。为了比我在注释中更清晰,您的示例代码实际上并没有那么糟糕。以下是您的原始示例:
// Busywait solution
for {
select {
case <-time.After(to):
if cbOneDone && cbTwoDone {
fmt.Println("Both CB executed (we could poll more often)")
return nil
}
fmt.Println("Timeout!")
return fmt.Errorf("Timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
这不是一个“繁忙的等待”,但它确实有几个错误(包括你需要一个只为已完成的通道发送语义的事实,或者可能更容易,至少同样好,在完成时只关闭它们一次,也许使用)。我们想做的是:sync.Once
启动计时器 作为超时。
to使用计时器的通道和两个“完成”通道输入选择循环。
我们希望在发生以下第一个事件时退出 select 循环:
计时器触发,或
双“完成”通道已发出信号。
如果我们要转到两个已完成的通道,我们也希望清除变量(设置为 ),以便选择不会旋转 - 这将变成真正的忙碌等待 - 但目前让我们假设我们在回调时只发送一次,否则只会泄漏通道, 以便我们可以按照编写的方式使用您的代码,因为这些选择只会返回一次。以下是更新后的代码:closeChnil
t := timer.NewTimer(to)
for !cbOneDone || !cbTwoDone {
select {
case <-t.C:
fmt.Println("Timeout!")
return fmt.Errorf("timeout")
}
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
// insert t.Stop() and receive here to drain t.C if desired
fmt.Println("Both CB executed")
return nil
请注意,我们最多将经历两次循环:
如果我们从两个 Done 通道接收到每个通道,则循环停止而不会超时。没有旋转/忙碌等待:我们从未收到过任何东西。我们返回零(无错误)。
t.C如果我们从一个 Done 通道接收,循环将恢复,但会阻塞等待计时器或另一个 Done 通道。
如果我们从 接收到 ,则表示我们尚未收到两个回调。我们可能有一个,但有一个暂停,我们选择放弃,这是我们的目标。我们返回一个错误,而不通过循环返回。
t.C
一个真正的版本需要更多的工作来正确清理并避免泄漏“完成”通道(以及计时器通道及其goroutine;参见评论),但这是一般的想法。您已经将回调转换为通道操作,并且已经具有其通道的计时器。
TA贡献1934条经验 获得超2个赞
下面的代码有两个变体,
第一个是常规模式,没有什么花哨的,它做了工作,做得很好。您将回调启动到例程中,使它们推送到接收器,收听该接收器以获取结果或超时。注意接收器通道的初始容量,为了防止泄漏例程,它必须与回调次数匹配。
第二个工厂将同步机制分解成小函数进行组装,提供两种等待方法,waitAll 和 waitOne。写起来不错,但效率肯定更低,分配更多,渠道更多来回,推理更复杂,更微妙。
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
ExampleOne()
ExampleTwo()
ExampleThree()
fmt.Println("Hello, playground")
}
func ExampleOne() {
log.Println("start reg")
errs := make(chan error, 2)
go func() {
fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)
errs <- fn()
}()
go func() {
fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))
errs <- fn()
}()
select {
case err := <-errs: // capture only one result,
// the fastest to finish.
if err != nil {
log.Println(err)
}
case <-time.After(time.Second): // or wait that many amount of time,
// in case they are all so slow.
}
log.Println("done reg")
}
func ExampleTwo() {
log.Println("start wait")
errs := waitAll(
withTimeout(time.Second,
callbackWithOpts("waitAll: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitAll: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done wait")
}
func ExampleThree() {
log.Println("start waitOne")
errs := waitOne(
withTimeout(time.Second,
callbackWithOpts("waitOne: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitOne: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done waitOne")
}
// a configurable callback for playing
func callbackWithOpts(msg string, tout time.Duration, err error) func() error {
return func() error {
<-time.After(tout)
fmt.Println(msg)
return err
}
}
// withTimeout return a function that returns first error or times out and return nil
func withTimeout(tout time.Duration, h func() error) func() error {
return func() error {
d := make(chan error, 1)
go func() {
d <- h()
}()
select {
case err := <-d:
return err
case <-time.After(tout):
}
return nil
}
}
// wait launches all func() and return their errors into the returned error channel; (merge)
// It is the caller responsability to drain the output error channel.
func waitAll(h ...func() error) chan error {
d := make(chan error, len(h))
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
wg.Wait()
close(d)
}()
return d
}
// wait launches all func() and return the first error into the returned error channel
// It is the caller responsability to drain the output error channel.
func waitOne(h ...func() error) chan error {
d := make(chan error, len(h))
one := make(chan error, 1)
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
for err := range d {
one <- err
close(one)
break
}
}()
go func() {
wg.Wait()
close(d)
}()
return one
}
func trim(err chan error) chan error {
out := make(chan error)
go func() {
for e := range err {
out <- e
}
close(out)
}()
return out
}
TA贡献1864条经验 获得超2个赞
func wait(ctx context.Context, wg *sync.WaitGroup) error {
done := make(chan struct{}, 1)
go func() {
wg.Wait()
done <- struct{}{}
}()
select {
case <-done:
// Counter is 0, so all callbacks completed.
return nil
case <-ctx.Done():
// Context cancelled.
return ctx.Err()
}
}
或者,您可以传递 a 和 块而不是 on ,但我认为使用上下文更习惯用语。time.Duration<-time.After(d)<-ctx.Done()
- 3 回答
- 0 关注
- 126 浏览
添加回答
举报
