为了账号安全,请及时绑定邮箱和手机立即绑定

从 kafka 发送消息到 websocket

从 kafka 发送消息到 websocket

Go
米脂 2022-11-23 19:48:43

我在 golang 中使用 gin-framework 创建了一个 web 服务。在这个项目中,我还使用了一些来自特定主题的 kafka 消息。我想要实现的是将我从主题中收到的消息倒入 websocket 中。所以通信只是一种方式,超过 1 个人可以连接到网络套接字并看到传入的消息。我想使用通道,所以在接收 kafka 消息的函数中我有这样的东西:


ch <- KafkaMessage

在 gin 框架中,我创建了这样的东西:


requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)


func wsWorkFlowLive(c *gin.Context) {

    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)

    if err != nil {

        log.Println("error get connection")

        log.Fatal(err)

    }

    defer ws.Close()


    err = ws.WriteJSON(<-ch)


    if err != nil {

        log.Println("error write message: " + err.Error())

    }

}


var upGrader = websocket.Upgrader{

    CheckOrigin: func(r *http.Request) bool {

        return true

    },

}

这是我用来测试 websocket 的 html 片段:


 <!DOCTYPE html>

   <html>

     <head>

       <meta charset="UTF-8" />

       <title>index</title>

    </head>

     <body>

       <h1>test websocket</h1>

       <p id="message-json"></p>

      <p id="message-text"></p>

            <script>

        function jsonWS() {

          var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");


          ws.onmessage = function (evt) {

            console.log("Received Message: " + evt.data);

            document.getElementById("message-json").innerText += evt.data;

          };


          ws.onclose = function (evt) {

            console.log("Connection closed.");

          };

        }


        // Start websocket

        jsonWS();

      </script>

    </body>

  </html>

但是我想念一些东西,我是个新手,因为一旦收到第一条 kafka 消息,我就会出现以下错误行为:

  1. 仅显示第一条消息,连接快速关闭后

  2. 要查看第二个,我必须刷新页面,这不是 websocket 方式

  3. 因为连接关闭通道它不是红色的,所以它一直停留在 cosume 函数中,直到我读取它。我不能有这种行为

  4. 为了避免第 3 点,我想我必须有一种机制,只有当一个或多个 ws 连接时,我才将消息发送到通道。


查看完整描述

2 回答

?
FFIVE

TA贡献1486条经验 获得超5个赞

Gorilla 聊天示例接近您的需要。该示例将从任何客户端接收到的消息广播到所有连接的客户端。执行以下操作以使代码适应您的用例:


更改客户端读取泵以丢弃收到的消息,而不是将消息发送到集线器。


func (c *Client) readPump() {

    defer func() {

        c.hub.unregister <- c

        c.conn.Close()

    }()

    c.conn.SetReadLimit(maxMessageSize)

    c.conn.SetReadDeadline(time.Now().Add(pongWait))

    c.conn.SetPongHandler(func(string) error { 

    c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })

    for {

        if _, _, err := c.NextReader(); err != nil {

          break

        }

    }

}

更改您的 Kafka 读取循环以将消息发送到Hub.broadcast通道。


for {

    msg, err := c.ReadMessage(xxx)

    if err != nil {

          // handle error

    }

    hub.broadcast <- msg

}

删除将客户端发送队列中的消息合并为单个 websocket 消息的代码,或者调整客户端以在单个 websocket 消息中处理多个 Kafka 消息。


查看完整回答
反对 回复 2022-11-23
?
翻翻过去那场雪

TA贡献1775条经验 获得超13个赞

你很接近。只是缺少两件事。

1- 使用select语句继续从 kafka 通道接收新消息。

2- 保持活跃的 websocket 连接。这个答案有更多细节

让我知道这是否适合您。


查看完整回答
反对 回复 2022-11-23

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信