为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用通道和 goroutine 构建 Go 网络服务器?

如何使用通道和 goroutine 构建 Go 网络服务器?

Go
qq_笑_17 2022-01-04 09:48:35
我正在实现一个服务器来流式传输许多浮点数数组。我需要一些帮助来设计我的系统以实现以下目标:音频进程必须是独立的,即使没有任何请求进来也能完成它的工作。我目前的方法使 DataProcess 函数等待直到有请求。因为通道只能给1个请求提供数据,2个以上的请求怎么才能拿到我在DataProcess中准备好的数据呢?为了实际流式传输数据,请求处理程序不能等待整个 DataProcess 完成,无论如何,一旦我们完成 DataProcess 中的每个迭代,就给处理程序数据?任何答复表示赞赏。这是我目前的想法:package mainimport ("fmt""io""net/http""strconv""time")func main() {    c := AudioProcess()    handleHello := makeHello(c)    http.HandleFunc("/", handleHello)    http.ListenAndServe(":8000", nil)}func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {    return func(w http.ResponseWriter, r *http.Request) {        for item := range c { // this loop runs when channel c is closed            io.WriteString(w, item)        }    }}func AudioProcess() chan string {    c := make(chan string)    go func() {        for i := 0; i <= 10; i++ { // Iterate the audio file            c <- strconv.Itoa(i) // have my frame of samples, send to channel c            time.Sleep(time.Second)            fmt.Println("send ", i) // logging        }        close(c) // done processing, close channel c        }()        return c    }
查看完整描述

1 回答

?
偶然的你

TA贡献1841条经验 获得超3个赞

我不完全确定这是否能解决您的问题,因为我不完全了解您的用例,但是,我已经在下面提出了一个解决方案。

我已经将 Gin 用于 HTTP 路由器,因为它对我来说更舒服,但我很确定您可以调整代码以适合您的。我这样做很匆忙(抱歉),所以可能存在我不知道的问题,但如果有任何问题,请告诉我。

简而言之:

  1. 我创建了一个Manager可以处理多个Client. 它还包含一个sync.Mutex以确保在任何给定时间只有一个线程正在修改clients

  2. 有一个InitBackgroundTask()会生成一个随机float64数,并将其传递给 ALLclients中的一个Manager(如果有的话)。如果没有clients,我们就睡觉,然后继续……

  3. 索引处理程序处理添加和删除客户端。客户端通过 UUID 进行识别;

  4. 现在可能会发生 3 件事。客户端在通过<-c.Writer.CloseNotify()通道断开连接时会自动删除(因为该方法返回从而调用defer)。我们也可以float64在下一个后台任务tick中接收随机数。最后,如果我们在 20 秒内没有收到任何东西,我们也可以终止。

我在这里对您的需求做了几个假设(例如,后台任务将每 Y 分钟返回一次 X)。如果您正在寻找更细粒度的流媒体,我建议您改用 websockets(并且仍然可以使用下面的模式)。

如果您有任何问题,请告诉我。

代码:


package main


import (

    "github.com/gin-gonic/gin"

    "github.com/satori/go.uuid"

    "log"

    "math/rand"

    "net/http"

    "sync"

    "time"

)


type Client struct {

    uuid string

    out  chan float64

}


type Manager struct {

    clients map[string]*Client

    mutex   sync.Mutex

}


func NewManager() *Manager {

    m := new(Manager)

    m.clients = make(map[string]*Client)

    return m

}


func (m *Manager) AddClient(c *Client) {

    m.mutex.Lock()

    defer m.mutex.Unlock()

    log.Printf("add client: %s\n", c.uuid)

    m.clients[c.uuid] = c

}


func (m *Manager) DeleteClient(id string) {

    m.mutex.Lock()

    defer m.mutex.Unlock()

    // log.Println("delete client: %s", c.uuid)

    delete(m.clients, id)

}


func (m *Manager) InitBackgroundTask() {

    for {

        f64 := rand.Float64()

        log.Printf("active clients: %d\n", len(m.clients))

        for _, c := range m.clients {

            c.out <- f64

        }

        log.Printf("sent output (%+v), sleeping for 10s...\n", f64)

        time.Sleep(time.Second * 10)

    }

}


func main() {

    r := gin.Default()

    m := NewManager()


    go m.InitBackgroundTask()


    r.GET("/", func(c *gin.Context) {

        cl := new(Client)

        cl.uuid = uuid.NewV4().String()

        cl.out = make(chan float64)


        defer m.DeleteClient(cl.uuid)

        m.AddClient(cl)


        select {

        case <-c.Writer.CloseNotify():

            log.Printf("%s : disconnected\n", cl.uuid)

        case out := <-cl.out:

            log.Printf("%s : received %+v\n", out)

            c.JSON(http.StatusOK, gin.H{

                "output": out,

            })

        case <-time.After(time.Second * 20):

            log.Println("timed out")

        }

    })


    r.Run()

}

注意:如果您在 Chrome 上进行测试,您可能需要在 URL 的末尾附加一个随机参数,以便实际发出请求,例如?rand=001,?rand=002等等。


查看完整回答
反对 回复 2022-01-04
  • 1 回答
  • 0 关注
  • 150 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号