为了账号安全,请及时绑定邮箱和手机立即绑定

将数据从一个戈鲁丁发送到多个其他戈鲁丁

将数据从一个戈鲁丁发送到多个其他戈鲁丁

Go
小怪兽爱吃肉 2022-09-05 09:19:27
在项目中,程序通过websocket接收数据。此数据需要由 n 种算法处理。算法的数量可以动态变化。我的尝试是创建一些发布/订阅模式,可以在其中启动和取消订阅。事实证明,这比预期的更具挑战性。以下是我想出的(基于 https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):package pubsubimport (    "context"    "sync"    "time")type Pubsub struct {    sync.RWMutex    subs   []*Subsciption    closed bool}func New() *Pubsub {    ps := &Pubsub{}    ps.subs = []*Subsciption{}    return ps}func (ps *Pubsub) Publish(msg interface{}) {    ps.RLock()    defer ps.RUnlock()    if ps.closed {        return    }    for _, sub := range ps.subs {        // ISSUE1: These goroutines apparently do not exit properly...         go func(ch chan interface{}) {            ch <- msg        }(sub.Data)    }}func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {    ps.Lock()    defer ps.Unlock()    // prep channel    ctx, cancel := context.WithCancel(context.Background())    sub := &Subsciption{        Data:   make(chan interface{}, 1),        cancel: cancel,        ps:     ps,    }    // prep subsciption    ps.subs = append(ps.subs, sub)    return ctx, sub, nil}正如评论中提到的,这(至少)有两个问题:问题 1:在实现中运行了一段时间后,我得到了一些这样的错误:goroutine 120624 [runnable]: bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)     /home/X/Projects/bm/internal/pubsub/pubsub.go:30created by bookmaker/internal/pubsub.(*Pubsub).Publish     /home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb在没有真正理解这一点的情况下,在我看来,在中创建的goroutines确实会累积/泄漏。这是正确的吗,我在这里做错了什么?Publish()问题 2:当我通过它结束订阅时,它试图写入关闭的通道并恐慌。为了缓解这种情况,我创建了一个 goroutine 来关闭延迟通道。这感觉真的偏离了最佳实践,但我无法找到适当的解决方案。什么是确定性的方法?Unsubscribe()Publish()这里有一个小操场供您测试:https://play.golang.org/p/K-L8vLjt7_9
查看完整描述

1 回答

?
侃侃无极

TA贡献2051条经验 获得超10个赞

在深入研究您的解决方案及其问题之前,让我再次推荐此答案中介绍的另一种 Broker 方法:如何使用通道广播消息


现在进入您的解决方案。


每当你启动 goroutine 时,请始终考虑它将如何结束,并确保如果 goroutine 不应该在应用的生命周期内运行,请确保它确实如此。


// ISSUE1: These goroutines apparently do not exit properly... 

go func(ch chan interface{}) {

    ch <- msg

}(sub.Data)

此 goroutine 尝试在 上发送值。这可能是一个阻塞操作:如果 的缓冲区已满并且 上没有现成的接收器,它将阻塞。这是脱离了发射的goroutine的控制,也脱离了对包装的控制。在某些情况下,这可能很好,但这已经给软件包的用户带来了负担。尽量避免这些。尝试创建易于使用且难以滥用的 API。chchchpubsub


此外,仅仅为了在频道上发送价值而启动 goroutine 是一种资源浪费(goroutine 既便宜又轻便,但你不应该尽可能地向它们发送垃圾邮件)。


你这样做是因为你不想被阻止。为避免阻塞,您可以使用具有“合理”高缓冲器的缓冲通道。是的,这并不能解决阻塞问题,只能帮助“慢速”客户端从通道接收。


要“真正”避免在不启动 goroutine 的情况下阻塞,您可以使用非阻塞发送:


select {

case ch <- msg:

default:

    // ch's buffer is full, we cannot deliver now

}

如果发送可以继续,它将发生。如果没有,则立即选择分支。你必须决定该怎么做。“丢失”消息是否可以接受?等到“放弃”可以接受一段时间吗?或者是否可以启动一个goroutine来执行此操作(但随后您将回到我们在这里尝试解决的问题)?或者,在客户端可以从通道接收之前,是否可以被阻止...chdefault


选择合理的高缓冲区,如果遇到它仍然变满的情况,在客户端可以前进并从消息接收之前,阻止可能是可以接受的。如果不能,则整个应用可能处于不可接受的状态,并且“挂起”或“崩溃”可能是可以接受的。


// ISSUE2: close the channel async with a delay to ensure

// nothing will be written to the channel anymore

// via a pending goroutine from Publish()

go func(ch chan interface{}) {

    time.Sleep(500 * time.Millisecond)

    close(ch)

}(s.Data)

关闭通道是向接收器发出的信号,表示通道上不会发送更多值。因此,关闭通道始终是发送者的工作(和责任)。启动 goroutine 以关闭通道,您将该工作和责任“移交给”另一个不会与发送方同步的“实体”(goroutine)。这可能很容易导致死机(在闭合通道上发送是运行时死机,有关其他公理,请参阅未初始化的通道如何工作?)。别这样。


是的,这是必要的,因为您启动了goroutines来发送。如果你不这样做,那么你可以“就地”关闭,而不启动goroutine,因为这样发送者和关闭者将是同一个实体:它本身,其发送和关闭操作受互斥锁保护。因此,解决第一个问题自然而然地解决了第二个问题。Pubsub


通常,如果一个通道有多个发送方,则必须协调关闭通道。必须有一个实体(通常不是任何发送方)等待所有发送方完成,实际上使用 一个 ,然后该单个实体可以安全地关闭通道。请参阅关闭长度未知的通道。sync.WaitGroup


查看完整回答
反对 回复 2022-09-05
  • 1 回答
  • 0 关注
  • 50 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信