为了账号安全,请及时绑定邮箱和手机立即绑定

从 kafka 发送消息到 websocket

从 kafka 发送消息到 websocket

Go
米脂 2022-11-23 19:48:43
我在 golang 中使用 gin-framework 创建了一个 web 服务。在这个项目中,我还使用了一些来自特定主题的 kafka 消息。我想要实现的是将我从主题中收到的消息倒入 websocket 中。所以通信只是一种方式,超过 1 个人可以连接到网络套接字并看到传入的消息。我想使用通道,所以在接收 kafka 消息的函数中我有这样的东西:ch <- KafkaMessage在 gin 框架中,我创建了这样的东西:requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)func wsWorkFlowLive(c *gin.Context) {    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)    if err != nil {        log.Println("error get connection")        log.Fatal(err)    }    defer ws.Close()    err = ws.WriteJSON(<-ch)    if err != nil {        log.Println("error write message: " + err.Error())    }}var upGrader = websocket.Upgrader{    CheckOrigin: func(r *http.Request) bool {        return true    },}这是我用来测试 websocket 的 html 片段: <!DOCTYPE html>   <html>     <head>       <meta charset="UTF-8" />       <title>index</title>    </head>     <body>       <h1>test websocket</h1>       <p id="message-json"></p>      <p id="message-text"></p>            <script>        function jsonWS() {          var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");          ws.onmessage = function (evt) {            console.log("Received Message: " + evt.data);            document.getElementById("message-json").innerText += evt.data;          };          ws.onclose = function (evt) {            console.log("Connection closed.");          };        }        // Start websocket        jsonWS();      </script>    </body>  </html>但是我想念一些东西,我是个新手,因为一旦收到第一条 kafka 消息,我就会出现以下错误行为:仅显示第一条消息,连接快速关闭后要查看第二个,我必须刷新页面,这不是 websocket 方式因为连接关闭通道它不是红色的,所以它一直停留在 cosume 函数中,直到我读取它。我不能有这种行为为了避免第 3 点,我想我必须有一种机制,只有当一个或多个 ws 连接时,我才将消息发送到通道。
查看完整描述

2 回答

?
FFIVE

TA贡献1797条经验 获得超6个赞

Gorilla 聊天示例接近您的需要。该示例将从任何客户端接收到的消息广播到所有连接的客户端。执行以下操作以使代码适应您的用例:


更改客户端读取泵以丢弃收到的消息,而不是将消息发送到集线器。


func (c *Client) readPump() {

    defer func() {

        c.hub.unregister <- c

        c.conn.Close()

    }()

    c.conn.SetReadLimit(maxMessageSize)

    c.conn.SetReadDeadline(time.Now().Add(pongWait))

    c.conn.SetPongHandler(func(string) error { 

    c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })

    for {

        if _, _, err := c.NextReader(); err != nil {

          break

        }

    }

}

更改您的 Kafka 读取循环以将消息发送到Hub.broadcast通道。


for {

    msg, err := c.ReadMessage(xxx)

    if err != nil {

          // handle error

    }

    hub.broadcast <- msg

}

删除将客户端发送队列中的消息合并为单个 websocket 消息的代码,或者调整客户端以在单个 websocket 消息中处理多个 Kafka 消息。


查看完整回答
反对 回复 2022-11-23
?
翻翻过去那场雪

TA贡献2065条经验 获得超13个赞

你很接近。只是缺少两件事。

1- 使用select语句继续从 kafka 通道接收新消息。

2- 保持活跃的 websocket 连接。这个答案有更多细节

让我知道这是否适合您。


查看完整回答
反对 回复 2022-11-23
  • 2 回答
  • 0 关注
  • 163 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信