为了账号安全,请及时绑定邮箱和手机立即绑定

每个用户的 Golang 服务器发送事件

每个用户的 Golang 服务器发送事件

Go
喵喔喔 2023-03-15 15:42:41
我使用 Go 已经有一段时间了,但之前从未使用过 SSE。我有一个问题,有人可以提供一个服务器发送事件的工作示例,该示例只会发送给特定用户(连接)。我正在使用大猩猩 - 会话进行身份验证,我想使用 UserID 来分隔连接。或者我应该通过 Ajax 使用 5 秒轮询?非常感谢这是我发现并尝试过的:https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c它不会发送给单个用户,如果连接关闭,go func 不会停止https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go这正是我所需要的,但一旦连接被删除它就不会跟踪。所以现在,一旦您在私人窗口中关闭并打开浏览器,它就根本无法工作。此外,如上所述,go 例程继续进行。
查看完整描述

1 回答

?
慕容森

TA贡献1853条经验 获得超18个赞

创建一个“代理”以将消息分发给连接的用户:


type Broker struct {

    // users is a map where the key is the user id

    // and the value is a slice of channels to connections

    // for that user id

    users map[string][]chan []byte


    // actions is a channel of functions to call

    // in the broker's goroutine. The broker executes

    // everything in that single goroutine to avoid

    // data races.

    actions chan func()

}


// run executes in a goroutine. It simply gets and 

// calls functions.

func (b *Broker) run() {

    for a := range b.actions {

        a()

    }

}


func newBroker() *Broker {

    b := &Broker{

        users:   make(map[string][]chan []byte),

        actions: make(chan func()),

    }

    go b.run()

    return b

}


// addUserChan adds a channel for user with given id.

func (b *Broker) addUserChan(id string, ch chan []byte) {

    b.actions <- func() {

        b.users[id] = append(b.users[id], ch)

    }

}


// removeUserchan removes a channel for a user with the given id.

func (b *Broker) removeUserChan(id string, ch chan []byte) {

    // The broker may be trying to send to 

    // ch, but nothing is receiving. Pump ch

    // to prevent broker from getting stuck.

    go func() { for range ch {} }()


    b.actions <- func() {

        chs := b.users[id]

        i := 0

        for _, c := range chs {

            if c != ch {

                chs[i] = c

                i = i + 1

            }

        }

        if i == 0 {

            delete(b.users, id)

        } else {

            b.users[id] = chs[:i]

        }

        // Close channel to break loop at beginning

        // of removeUserChan.

        // This must be done in broker goroutine

        // to ensure that broker does not send to

        // closed goroutine.

        close(ch)

    }

}


// sendToUser sends a message to all channels for the given user id.

func (b *Broker) sendToUser(id string, data []byte) {

    b.actions <- func() {

        for _, ch := range b.users[id] {

            ch <- data

        }

    }

}

在包级别使用代理声明一个变量:


 var broker = newBroker()

使用代理编写 SSE 端点:


func sseEndpoint(w http.ResponseWriter, r *http.Request) {

    // I assume that user id is in query string for this example,

    // You should use your authentication code to get the id.

    id := r.FormValue("id")


    // Do the usual SSE setup.

    flusher := w.(http.Flusher)

    w.Header().Set("Content-Type", "text/event-stream")

    w.Header().Set("Cache-Control", "no-cache")

    w.Header().Set("Connection", "keep-alive")


    // Create channel to receive messages for this connection.  

    // Register that channel with the broker.

    // On return from the function, remove the channel

    // from the broker.

    ch := make(chan []byte)

    broker.addUserChan(id, ch)

    defer broker.removeUserChan(id, ch)

    for {

        select {

        case <-r.Context().Done():

            // User closed the connection. We are out of here.

            return

        case m := <-ch:

            // We got a message. Do the usual SSE stuff.

            fmt.Fprintf(w, "data: %s\n\n", m)

            flusher.Flush()

        }

    }

}

将代码添加到您的应用程序以调用 Broker.sendToUser。


查看完整回答
反对 回复 2023-03-15
  • 1 回答
  • 0 关注
  • 140 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号