为了账号安全,请及时绑定邮箱和手机立即绑定

goroutines 有很高的空闲唤醒呼叫

goroutines 有很高的空闲唤醒呼叫

Go
德玛西亚99 2022-12-05 16:47:20
我正在使用 GoLang 使用 goroutines 同时运行两个 websocket 客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切似乎都很好。两个客户端都接收从 websocket 服务器传输的数据。我相信我可能设置了错误,但是,因为当我检查活动监视器时,我的程序始终有 500 - 1500 次空闲唤醒,并且正在使用 >200% 的 CPU。对于像两个 websocket 客户端这样简单的事情来说,这似乎并不正常。我将代码放在片段中,这样阅读起来就更少了(希望这样更容易理解),但如果您需要完整的代码,我也可以将其发布。这是运行 ws 客户端的主要功能中的代码comms := make(chan os.Signal, 1)signal.Notify(comms, os.Interrupt, syscall.SIGTERM)ctx := context.Background()ctx, cancel := context.WithCancel(ctx)var wg sync.WaitGroupwg.Add(1)go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)wg.Add(1)go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)<- commscancel()wg.Wait()这是客户端如何运行 go 例程的代码func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {    defer wg.Done()    for {        select {        case <- ctx.Done():            log.Println("closing public socket")            socket.Close()            return        default:            socket.OnTextMessage = func(message string, socket Socket) {                log.Println(message)                pubJsonDecoder(message, testing)                //tradesParser(message);            }        }    }}func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {    defer wg.Done()    for {        select {        case <- ctx.Done():            log.Println("closing private socket")            socket.Close()            return        default:            socket.OnTextMessage = func(message string, socket Socket) {                log.Println(message)            }        }    }}关于为什么 Idle Wake Ups 如此之高的任何想法?我应该使用多线程而不是并发吗?在此先感谢您的帮助!
查看完整描述

2 回答

?
慕运维8079593

TA贡献1876条经验 获得超5个赞

你在这里浪费 CPU(多余的循环):


  for {

       // ...

        default:

        // High CPU usage here.

        }

    }

尝试这样的事情:


 func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {

    defer wg.Done()

    defer socket.Close()


    socket.OnTextMessage = func(message string, socket Socket) {

        log.Println(message)

        pubJsonDecoder(message, testing)

        //tradesParser(message);

    }


    <-ctx.Done()

    log.Println("closing public socket")

}


func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {

    defer wg.Done()

    defer socket.Close()


    socket.OnTextMessage = func(message string, socket Socket) {

        log.Println(message)

    }


    <-ctx.Done()

    log.Println("closing private socket")

}

这也可能有帮助:

https ://github.com/gorilla/websocket/blob/master/examples/chat/client.go


查看完整回答
反对 回复 2022-12-05
?
慕后森

TA贡献1802条经验 获得超5个赞

看起来你可能有几个微调器。在 for - select 语句的默认情况下,您正在为 OnTextMessage() 分配处理函数。如果没有其他案例准备就绪,则默认案例始终执行。因为在默认情况下没有任何阻塞,所以 for 循环就会失控。两个像这样旋转的 goroutines 可能会挂住 2 个核心。Websockets 是网络 IO,那些 goroutines 很可能并行运行。这就是您看到 200% 利用率的原因。


查看 gorilla/websocket 库。我不会说它比任何其他 websocket 库更好或更差,我对它有很多经验。


https://github.com/gorilla/websocket


下面是我多次使用的实现。它的设置方式是注册在收到特定消息时触发的处理函数。假设消息中的一个值是“type”:“start-job”,websocket 服务器将调用您分配给“start-job”websocket 消息的处理程序。感觉就像为 http 路由器编写端点。


包服务器ws


上下文.go


package serverws


import (

    "errors"

    "fmt"

    "strings"

    "sync"

)


// ConnContext is the connection context to track a connected websocket user

type ConnContext struct {

    specialKey  string

    supportGzip string

    UserID      string

    mu         sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.

}


// HashKeyAsCtx returns a ConnContext based on the hash provided

func HashKeyAsCtx(hashKey string) (*ConnContext, error) {

    values := strings.Split(hashKey, ":")

    if len(values) != 3 {

        return nil, errors.New("Invalid Key received: " + hashKey)

    }

    return &ConnContext{values[0], values[1], values[2], sync.Mutex{}}, nil

}


// AsHashKey returns the hash key for a given connection context ConnContext

func (ctx *ConnContext) AsHashKey() string {

    return strings.Join([]string{ctx.specialKey, ctx.supportGzip, ctx.UserID}, ":")

}


// String returns a string of the hash of a given connection context ConnContext

func (ctx *ConnContext) String() string {

    return fmt.Sprint("specialkey: ", ctx.specialKey, " gzip ", ctx.supportGzip, " auth ", ctx.UserID)

}

wshandler.go


package serverws


import (

    "encoding/json"

    "errors"

    "fmt"

    "net/http"

    "strings"

    "sync"

    "time"


    "github.com/gorilla/websocket"

    "github.com/rs/zerolog/log"

)


var (

    receiveFunctionMap = make(map[string]ReceiveObjectFunc)

    ctxHashMap         sync.Map

)


// ReceiveObjectFunc is a function signature for a websocket request handler

type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})


// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with)

type WebSocketHandler struct {

    wsupgrader websocket.Upgrader

}


// WebSocketMessage that is sent over a websocket.   Messages must have a conversation type so the server and the client JS know

// what is being discussed and what signals to raise on the server and the client.

// The "Notification" message instructs the client to display an alert popup.

type WebSocketMessage struct {

    MessageType string      `json:"type"`

    Message     interface{} `json:"message"`

}


// NewWebSocketHandler sets up a new websocket.

func NewWebSocketHandler() *WebSocketHandler {

    wsh := new(WebSocketHandler)

    wsh.wsupgrader = websocket.Upgrader{

        ReadBufferSize:  4096,

        WriteBufferSize: 4096,

    }

    return wsh


}


// RegisterMessageType sets up an event bus for a message type.   When messages arrive from the client that match messageTypeName,

// the function you wrote to handle that message is then called.

func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {

    receiveFunctionMap[messageTypeName] = f

}


// onMessage triggers when the underlying websocket has received a message.

func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {

    //  Handling text messages or binary messages. Binary is usually some gzip text.

    if msgType == websocket.TextMessage {

        wsh.processIncomingTextMsg(conn, ctx, msg)

    }

    if msgType == websocket.BinaryMessage {


    }

}


// onOpen triggers when the underlying websocket has established a connection.

func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {

    //user, err := gothic.GetFromSession("ID", r)

    user := "TestUser"

    if err := r.ParseForm(); err != nil {

        return nil, errors.New("parameter check error")

    }


    specialKey := r.FormValue("specialKey")

    supportGzip := r.FormValue("support_gzip")


    if user != "" && err == nil {

        ctx = &ConnContext{specialKey, supportGzip, user, sync.Mutex{}}

    } else {

        ctx = &ConnContext{specialKey, supportGzip, "", sync.Mutex{}}

    }


    keyString := ctx.AsHashKey()


    if oldConn, ok := ctxHashMap.Load(keyString); ok {

        wsh.onClose(oldConn.(*websocket.Conn), ctx)

        oldConn.(*websocket.Conn).Close()

    }

    ctxHashMap.Store(keyString, conn)

    return ctx, nil

}


// onClose triggers when the underlying websocket has been closed down

func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {

    //log.Info().Msg(("client close itself as " + ctx.String()))

    wsh.closeConnWithCtx(ctx)

}


// onError triggers when a websocket connection breaks

func (wsh *WebSocketHandler) onError(errMsg string) {

    //log.Error().Msg(errMsg)

}


// HandleConn happens when a user connects to us at the listening point.  We ask

// the user to authenticate and then send the required HTTP Upgrade return code.

func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {


    user := ""

    if r.URL.Path == "/websocket" {

        user = "TestUser" // authenticate however you want

        if user == "" {

            fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))

            return

        }

    }

    // don't do this.  You need to check the origin, but this is here as a place holder

    wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {

        return true

    }


    conn, err := wsh.wsupgrader.Upgrade(w, r, nil)

    if err != nil {

        log.Error().Msg("Failed to set websocket upgrade: " + err.Error())

        return

    }

    defer conn.Close()


    ctx, err := wsh.onOpen(conn, r)

    if err != nil {

        log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)

        if user != "" {

            ctx.UserID = user

        }

        return

    }


    if user != "" {

        ctx.UserID = user

    }

    conn.SetPingHandler(func(message string) error {

        conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))

        return nil

    })


    // Message pump for the underlying websocket connection

    for {

        t, msg, err := conn.ReadMessage()

        if err != nil {

            // Read errors are when the user closes the tab. Ignore.

            wsh.onClose(conn, ctx)

            return

        }


        switch t {

        case websocket.TextMessage, websocket.BinaryMessage:

            wsh.onMessage(conn, ctx, msg, t)

        case websocket.CloseMessage:

            wsh.onClose(conn, ctx)

            return

        case websocket.PingMessage:

        case websocket.PongMessage:

        }


    }


}


func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {

    keyString := ctx.AsHashKey()

    ctxHashMap.Delete(keyString)

}


func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {

    //log.Debug().Msg("CLIENT SAID " + string(msg))

    data := WebSocketMessage{}


    // try to turn this into data

    err := json.Unmarshal(msg, &data)


    // And try to get at the data underneath

    var raw = make(map[string]interface{})

    terr := json.Unmarshal(msg, &raw)


    if err == nil {

        // What kind of message is this?

        if receiveFunctionMap[data.MessageType] != nil {

            // We'll try to cast this message and call the handler for it

            if terr == nil {

                if v, ok := raw["message"].(map[string]interface{}); ok {

                    receiveFunctionMap[data.MessageType](conn, ctx, v)

                } else {

                    log.Debug().Msg("Nonsense sent over the websocket.")

                }

            } else {

                log.Debug().Msg("Nonsense sent over the websocket.")

            }

        }

    } else {

        // Received garbage from the transmitter.

    }

}


// SendJSONToSocket sends a specific message to a specific websocket

func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {

    fields := strings.Split(socketID, ":")

    message, _ := json.Marshal(msg)


    ctxHashMap.Range(func(key interface{}, value interface{}) bool {

        if ctx, err := HashKeyAsCtx(key.(string)); err != nil {

            wsh.onError(err.Error())

        } else {

            if ctx.specialKey == fields[0] {

                ctx.mu.Lock()

                if value != nil {

                    err = value.(*websocket.Conn).WriteMessage(websocket.TextMessage, message)

                }

                ctx.mu.Unlock()

            }

            if err != nil {

                ctx.mu.Lock() // We'll lock here even though we're going to destroy this

                wsh.onClose(value.(*websocket.Conn), ctx)

                value.(*websocket.Conn).Close()

                ctxHashMap.Delete(key) // Remove the websocket immediately

                //wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())

            }

        }

        return true

    })

}

包 wsocket


类型.go


package wsocket




// Acknowledgement is for ACKing simple messages and sending errors

type Acknowledgement struct {

    ResponseID string `json:"responseId"`

    Status     string `json:"status"`

    IPAddress  string `json:"ipaddress"`

    ErrorText  string `json:"errortext"`

}



wsocket.go


package wsocket


import (

    "fmt"

    server "project/serverws"

    "project/utils"

    "sync"

    "time"


    "github.com/gin-gonic/gin"

    "github.com/gorilla/websocket"

    // "github.com/mitchellh/mapstructure"

    "github.com/inconshreveable/log15"

)

var (

    WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages

    WebSocketLocation string

    Log               log15.Logger = log15.New("package", "wsocket"

)


func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {


    WebSocket = socket

    WebSocketLocation = "example.mydomain.com"

    //WebSocketLocation = "example.mydomain.com"

    r.GET("/websocket", func(c *gin.Context) {

        socket.HandleConn(c.Writer, c.Request)


    })


socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {


        response := Acknowledgement{

            ResponseID: "Hello",

            Status:     fmt.Sprintf("OK/%v", ctx.AuthID),

            IPAddress:  conn.RemoteAddr().String(),

        }

        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in

        socket.SendJSONToSocket(ctx.AsHashKey(), &response)

    })


socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {


        response := Acknowledgement{

            ResponseID: "starting_job",

            Status:     fmt.Sprintf("%s is being dialed.", data["did"]),

            IPAddress:  conn.RemoteAddr().String(),

        }

        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.

        socket.SendJSONToSocket(ctx.AsHashKey(), &response)


    })

此实现用于 Web 应用程序。这是 JavaScript 客户端的简化版本。您可以使用此实现处理许多并发连接,并且您所做的所有通信都是定义对象/结构,这些对象/结构包含与下面的开关中的案例匹配的 responseID,它基本上是一个长的开关语句,将其序列化并将其发送到另一端,另一方会确认。我有一些版本在几个生产环境中运行。


网络套接字.js


$(() => {


    function wsMessage(object) {

        switch (object.responseId) {

            case "Hello": // HELLO! :-)

                console.log("Heartbeat received, we're connected.");

                break;


            case "Notification":

                if (object.errortext != "") {

                    $.notify({

                        // options

                        message: '<center><B><i class="fas fa-exclamation-triangle"></i>&nbsp;&nbsp;' + object.errortext + '</B></center>',

                    }, {

                        // settings

                        type: 'danger',

                        offset: 50,

                        placement: {

                            align: 'center',

                        }

                       

                    });


                } else {

                    $.notify({

                        // options

                        message: '<center><B>' + object.status + '</B></center>',

                    }, {

                        // settings

                        type: 'success',

                        offset: 50,

                        placement: {

                            align: 'center',

                        }

                    });

                }

                break;

           

        }


    }




    $(document).ready(function () {


        function heartbeat() {

            if (!websocket) return;

            if (websocket.readyState !== 1) return;

            websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");

            setTimeout(heartbeat, 24000);

        }

        //TODO: CHANGE TO WSS once tls is enabled.

        function wireUpWebsocket() {

            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');


            websocket.onopen = function (event) {

                console.log("Websocket connected.");


                heartbeat();

                //if it exists

                if (typeof (wsReady) !== 'undefined') {

                    //execute it

                    wsReady();

                }


            };


            websocket.onerror = function (event) {

                console.log("WEBSOCKET ERROR " + event.data);


            };


            websocket.onmessage = function (event) {

                wsMessage(JSON.parse(event.data));

            };


            websocket.onclose = function () {

                // Don't close!

                // Replace key

                console.log("WEBSOCKET CLOSED");

                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);

                websocketreconnects++;

                if (websocketreconnects > 30) { // Too much, time to bounce

                    // location.reload(); Don't reload the page anymore, just re-connect.

                }

                setTimeout(function () { wireUpWebsocket(); }, 3000);

            };

        }


        wireUpWebsocket();

    });


});


function getCookie(name) {

    var value = "; " + document.cookie;

    var parts = value.split("; " + name + "=");

    if (parts.length == 2) return parts.pop().split(";").shift();

}


function setCookie(cname, cvalue, exdays) {

    var d = new Date();

    d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));

    var expires = "expires=" + d.toUTCString();

    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";

}

在无限循环中一遍又一遍地分配处理函数肯定是行不通的。


https://github.com/gorilla/websocket


查看完整回答
反对 回复 2022-12-05
  • 2 回答
  • 0 关注
  • 116 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号