为了账号安全,请及时绑定邮箱和手机立即绑定

与超时并行运行循环

与超时并行运行循环

Go
翻过高山走不出你 2022-07-04 16:44:07
我需要运行请求,parallel而不是一个接一个,而是超时。现在我可以去吗?这是我需要并行运行的特定代码,这里的技巧也是使用超时,即根据超时等待所有请求并在所有完成后获得响应。    for _, test := range testers {        checker := NewTap(test.name, test.url, test.timeout)        res, err := checker.Check()        if err != nil {            fmt.Println(err)        }        fmt.Println(res.name)        fmt.Println(res.res.StatusCode)    }这是所有代码(工作代码) https://play.golang.org/p/cXnJJ6PW_CFpackage mainimport (    `fmt`    `net/http`    `time`)type HT interface {    Name() string    Check() (*testerResponse, error)}type testerResponse struct {    name string    res  http.Response}type Tap struct {    url     string    name    string    timeout time.Duration    client  *http.Client}func NewTap(name, url string, timeout time.Duration) *Tap {    return &Tap{        url:    url,        name:   name,        client: &http.Client{Timeout: timeout},    }}func (p *Tap) Check() (*testerResponse, error) {    response := &testerResponse{}    req, err := http.NewRequest("GET", p.url, nil)    if err != nil {        return nil, err    }    res, e := p.client.Do(req)    response.name = p.name    response.res = *res    if err != nil {        return response, e    }    return response, e}func (p *Tap) Name() string {    return p.name}func main() {    var checkers []HT    testers := []Tap{        {            name:    "first call",            url:     "http://stackoverflow.com",            timeout: time.Second * 20,        },        {            name:    "second call",            url:     "http://www.example.com",            timeout: time.Second * 10,        },    }    for _, test := range testers {        checker := NewTap(test.name, test.url, test.timeout)        res, err := checker.Check()        if err != nil {            fmt.Println(err)        }        fmt.Println(res.name)        fmt.Println(res.res.StatusCode)        checkers = append(checkers, checker)    }}
查看完整描述

2 回答

?
千万里不及你

TA贡献1784条经验 获得超9个赞

Go 中一种流行的并发模式是使用工作池。


一个基本的工作池使用两个通道;一个用于放置工作,另一个用于读取结果。在这种情况下,我们的工作频道将是 type Tap,我们的结果频道将是 type testerResponse。


工作人员

从作业通道中获取工作并将结果放在结果通道上。


// worker defines our worker func. as long as there is a job in the

// "queue" we continue to pick up  the "next" job

func worker(jobs <-chan Tap, results chan<- testerResponse) {

    for n := range jobs {

        results <- n.Check()

    }

}

工作

要添加工作,我们需要迭代我们的testers并将它们放在我们的工作频道上。


// makeJobs fills up our jobs channel

func makeJobs(jobs chan<- Tap, taps []Tap) {

    for _, t := range taps {

        jobs <- t

    }

}

结果

为了读取结果,我们需要遍历它们。


// getResults takes a job from our worker pool and gets the result

func getResults(tr <-chan testerResponse, taps []Tap) {

    for range taps {

        r := <- tr

        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)

        if r.err != nil {

            status = fmt.Sprintf(r.err.Error())

        }

        fmt.Println(status)

    }

}

最后是我们的主要功能。


func main() {

    // Make buffered channels

    buffer := len(testers)

    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`

    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`


    // Create worker pool

    // Max workers default is 5

    // maxWorkers := 5

    // for i := 0; i < maxWorkers; i++ {

    //  go worker(jobsPipe, resultsPipe)

    // }


    // the loop above is the same as doing:

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    // ^^ this creates 5 workers..


    makeJobs(jobsPipe, testers)

    getResults(resultsPipe, testers)

}

把它们放在一起

我将“第二次调用”的超时更改为一毫秒,以显示超时的工作原理。


package main


import (

    "fmt"

    "net/http"

    "time"

)


type HT interface {

    Name() string

    Check() (*testerResponse, error)

}


type testerResponse struct {

    err  error

    name string

    res  http.Response

    url  string

}


type Tap struct {

    url     string

    name    string

    timeout time.Duration

    client  *http.Client

}


func NewTap(name, url string, timeout time.Duration) *Tap {

    return &Tap{

        url:    url,

        name:   name,

        client: &http.Client{Timeout: timeout},

    }

}


func (p *Tap) Check() testerResponse {

    fmt.Printf("Fetching %s %s \n", p.name, p.url)

    // theres really no need for NewTap

    nt := NewTap(p.name, p.url, p.timeout)

    res, err := nt.client.Get(p.url)

    if err != nil {

        return testerResponse{err: err}

    }


    // need to close body

    res.Body.Close()

    return testerResponse{name: p.name, res: *res, url: p.url}

}


func (p *Tap) Name() string {

    return p.name

}


// makeJobs fills up our jobs channel

func makeJobs(jobs chan<- Tap, taps []Tap) {

    for _, t := range taps {

        jobs <- t

    }

}


// getResults takes a job from our jobs channel, gets the result, and

// places it on the results channel

func getResults(tr <-chan testerResponse, taps []Tap) {

    for range taps {

        r := <-tr

        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)

        if r.err != nil {

            status = fmt.Sprintf(r.err.Error())

        }

        fmt.Printf(status)

    }

}


// worker defines our worker func. as long as there is a job in the

// "queue" we continue to pick up  the "next" job

func worker(jobs <-chan Tap, results chan<- testerResponse) {

    for n := range jobs {

        results <- n.Check()

    }

}


var (

    testers = []Tap{

        {

            name:    "1",

            url:     "http://google.com",

            timeout: time.Second * 20,

        },

        {

            name:    "2",

            url:     "http://www.yahoo.com",

            timeout: time.Second * 10,

        },

        {

            name:    "3",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "4",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

        {

            name:    "5",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "6",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

        {

            name:    "7",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "8",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

        {

            name:    "9",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "10",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

        {

            name:    "11",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "12",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

        {

            name:    "13",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "14",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

    }

)


func main() {

    // Make buffered channels

    buffer := len(testers)

    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`

    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`


    // Create worker pool

    // Max workers default is 5

    // maxWorkers := 5

    // for i := 0; i < maxWorkers; i++ {

    //  go worker(jobsPipe, resultsPipe)

    // }


    // the loop above is the same as doing:

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    go worker(jobsPipe, resultsPipe)

    // ^^ this creates 5 workers..


    makeJobs(jobsPipe, testers)

    getResults(resultsPipe, testers)

}


哪个输出:


// Fetching http://stackoverflow.com 

// Fetching http://www.example.com 

// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)

// 'first call' to 'http://stackoverflow.com' was fetched with status '200'


查看完整回答
反对 回复 2022-07-04
?
qq_笑_17

TA贡献1818条经验 获得超7个赞

在 Golang 中可以通过不同的方式实现并行。这是一种带有等待组、互斥锁和无限 goroutine 的幼稚方法,不推荐使用。我认为使用通道是进行并行的首选方式。


package main


import (

    "fmt"

    "net/http"

    "sync"

    "time"

)


type HT interface {

    Name() string

    Check() (*testerResponse, error)

}


type testerResponse struct {

    name string

    res  http.Response

}


type Tap struct {

    url     string

    name    string

    timeout time.Duration

    client  *http.Client

}


func NewTap(name, url string, timeout time.Duration) *Tap {

    return &Tap{

        url:  url,

        name: name,

        client: &http.Client{

            Timeout: timeout,

        },

    }

}


func (p *Tap) Check() (*testerResponse, error) {

    response := &testerResponse{}

    req, err := http.NewRequest("GET", p.url, nil)

    if err != nil {

        return nil, err

    }

    res, e := p.client.Do(req)

    if e != nil {

        return response, e

    }

    response.name = p.name

    response.res = *res


    return response, e

}


func (p *Tap) Name() string {

    return p.name

}


func main() {


    var checkers []HT

    wg := sync.WaitGroup{}

    locker := sync.Mutex{}


    testers := []Tap{

        {

            name:    "first call",

            url:     "http://google.com",

            timeout: time.Second * 20,

        },

        {

            name:    "second call",

            url:     "http://www.example.com",

            timeout: time.Millisecond * 100,

        },

    }


    for _, test := range testers {

        wg.Add(1)

        go func(tst Tap) {

            defer wg.Done()

            checker := NewTap(tst.name, tst.url, tst.timeout)

            res, err := checker.Check()

            if err != nil {

                fmt.Println(err)

            }

            fmt.Println(res.name)

            fmt.Println(res.res.StatusCode)

            locker.Lock()

            defer locker.Unlock()

            checkers = append(checkers, checker)

        }(test)

    }


    wg.Wait()

}



查看完整回答
反对 回复 2022-07-04
  • 2 回答
  • 0 关注
  • 118 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号