Adding a callback instead of using the default implementation

Go
红颜莎娜 2022-07-04 16:49:50
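I am using the following code, which works as expected. The user adds a new entry to testers in the configuration (it is hard-coded for now, but it will come from a config file); the entry returns a list of TAPs that need to be checked, and they are run in parallel via http calls.

There is one more use case I need to support: the user will also provide a function/method/callback that implements the call via http/curl/websocket/whatever they need (instead of the check() function), and that function will return the response, whether it is 200/400/500.

For example, say the user implements two functions/callbacks in addition to the configured tap list. The program will execute these functions the same way as the testers list, and they will call other sites such as "http://www.yahoo.com" and https://www.bing.com with curl or http (just to demonstrate the difference). Some may even implement the check method to return the result of running a child process.

How can I do this in a clean way?

package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    err  error
    name string
    res  http.Response
    url  string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("Fetching %s %s \n", p.name, p.url)
    // theres really no need for NewTap
    nt := NewTap(p.name, p.url, p.timeout)
    res, err := nt.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }
    // need to close body
    res.Body.Close()
    return testerResponse{name: p.name, res: *res, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
}

Update

I tried the following, https://play.golang.org/p/cRPPzke27dZ, but I am not sure how to call the check() method of the custom handlers so that their data is fetched in parallel calls, the same way as for the configured testers.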

2 Answers

婷婷同学_

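Update 5 (accepted answer)

*Since you were interested in this question, you may also be interested in this one. For more on how to run each job with an auto-cancelling timeout, see here.*

To answer your question about how you would add arbitrary functions:

I don't know what type you want to return, but you can do whatever you want.

There are about a million different ways to do this; this is just one example: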


package main

import (
    "encoding/json"
    "fmt"

    "github.com/gammazero/workerpool"
)

var (
    numWorkers = 10
)

// MyReturnType is what every job returns; shape it to whatever you need
type MyReturnType struct {
    Name string
    Data interface{}
}

// wrapJob turns a job that returns a value into the plain func() the pool
// expects, sending the job's return value on rc
func wrapJob(rc chan MyReturnType, f func() MyReturnType) func() {
    return func() {
        rc <- f()
    }
}

func main() {
    // the jobs: any function that returns MyReturnType
    jobs := []func() MyReturnType{
        func() MyReturnType {
            // whatever you want to do here
            return MyReturnType{Name: "job1", Data: map[string]string{"Whatever": "You want"}}
        },
        func() MyReturnType {
            // whatever you want to do here
            // do a curl or a kubectl or whatever you want
            resultFromCurl := "i am a result"
            return MyReturnType{Name: "job2", Data: resultFromCurl}
        },
    }

    // create results chan and worker pool
    // (buffered with one slot per job so no job blocks on send)
    results := make(chan MyReturnType, len(jobs))
    pool := workerpool.New(numWorkers)

    for _, job := range jobs {
        j := job
        pool.Submit(wrapJob(results, j))
    }

    // Wait for all jobs to finish
    pool.StopWait()

    // Close results chan
    close(results)

    // Iterate over results, printing to console
    for res := range results {
        prettyPrint(res)
    }
}

func prettyPrint(i interface{}) {
    prettyJSON, err := json.MarshalIndent(i, "", "    ")
    if err != nil {
        fmt.Printf("Error : %s \n", err.Error())
        return
    }
    fmt.Printf("MyReturnType %s\n", string(prettyJSON))
}

Returns:

// MyReturnType {
//     "Name": "job2",
//     "Data": "i am a result"
// }
// MyReturnType {
//     "Name": "job1",
//     "Data": {
//         "Whatever": "You want"
//     }
// }

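Update 4

After researching this for a few hours, I would suggest using something like workerpool, which you can find here. Honestly, using workerpool seems to make the most sense here. It looks production-ready and is used by a few fairly big names (see the readme in their repo).

I wrote a small example that shows how to use workerpool: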


package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/gammazero/workerpool"
)

var (
    numWorkers = 10
    urls       = []string{"yahoo.com", "example.com", "google.com"}
)

func main() {
    // create results chan and worker pool
    // should prob make your results channel typed to what you need
    results := make(chan interface{}, len(urls))
    pool := workerpool.New(numWorkers)

    // Create jobs by iterating over urls
    for i, u := range urls {
        url := u
        jobNum := i

        // Create job
        f := func() {
            start := time.Now()
            c := &http.Client{}
            r, e := c.Get("http://" + url)
            if e != nil {
                // r is nil when the request fails, so report the error and stop
                results <- fmt.Sprintf("job '%d' to '%s' failed: %s\n", jobNum, url, e.Error())
                return
            }
            // only the status code is needed, so release the body
            defer r.Body.Close()
            took := time.Since(start).Milliseconds()
            o := fmt.Sprintf("completed job '%d' to '%s' in '%dms' with status code '%d'\n", jobNum, url, took, r.StatusCode)
            results <- o
        }

        // Add job to workerpool
        pool.Submit(f)
    }

    // Wait for all jobs to finish
    pool.StopWait()

    // Close results chan
    close(results)

    // Iterate over results, printing to console
    for res := range results {
        fmt.Print(res.(string))
    }
}

Which outputs:

// completed job '1' to 'example.com' in '81ms' with status code '200'
// completed job '2' to 'google.com' in '249ms' with status code '200'
// completed job '0' to 'yahoo.com' in '816ms' with status code '200'

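Update 3

I went ahead and wrote a worker pool library (with the help of workerpool), since I also wanted to dig deeper into channels and concurrency design.

You can find the repo here, and the code below.

How to use: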


pool := New(3)

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://google.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To google.com %d\n", r.StatusCode)
})

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://yahoo.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To yahoo.com %d\n", r.StatusCode)
})

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://example.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To example.com %d\n", r.StatusCode)
})

// Stop accepting jobs and wait for the queued ones to finish
pool.Seal()

Worker pool code (puddle)

package puddle

import (
    "container/list"
    "sync"
    "time"
)

const (
    idleTimeout = time.Second * 2
)

// New creates a new puddle (aka worker pool)
func New(maxWorkers int) Puddle {
    // There must be at least one worker
    if maxWorkers < 1 {
        maxWorkers = 1
    }

    p := &puddle{
        maxWorkers: maxWorkers,
        jobs:       make(chan func(), 1),
        workers:    make(chan func()),
        killswitch: make(chan struct{}),
    }

    // Start accepting/working jobs as they come in
    go p.serve()

    return p
}

// Puddle knows how to interact with worker pools
type Puddle interface {
    Job(f func())
    Seal()
}

// puddle is a worker pool that holds workers, tasks, and misc metadata
type puddle struct {
    maxWorkers int
    jobs       chan func()
    workers    chan func()
    killswitch chan struct{}
    queue      List
    once       sync.Once
    stopped    int32
    waiting    int32
    wait       bool
}

// Job submits a new task to the worker pool
func (p *puddle) Job(f func()) {
    if f != nil {
        p.jobs <- f
    }
}

// Seal stops worker pool and waits for queued tasks to complete
func (p *puddle) Seal() {
    p.stop(true)
}

func (p *puddle) stop(wait bool) {
    p.once.Do(func() {
        p.wait = wait
        // Close task queue and wait for currently running tasks to finish
        close(p.jobs)
    })
    <-p.killswitch
}

func (p *puddle) killWorkerIfIdle() bool {
    select {
    case p.workers <- nil:
        // Kill worker
        return true
    default:
        // No ready workers
        return false
    }
}

// process puts new jobs onto the queue, and removes jobs from the queue as workers become available.
// Returns false if puddle is stopped.
func (p *puddle) process() bool {
    select {
    case task, ok := <-p.jobs:
        if !ok {
            return false
        }
        p.queue.PushBack(task)
    case p.workers <- p.queue.Front().Value.(func()):
        // Give task to ready worker
        p.queue.PopFront()
    }
    return true
}

func (p *puddle) serve() {
    defer close(p.killswitch)
    timeout := time.NewTimer(idleTimeout)
    var workerCount int
    var idle bool

Serving:
    for {
        if p.queue.Len() != 0 {
            if !p.process() {
                break Serving
            }
            continue
        }

        select {
        case job, ok := <-p.jobs:
            if !ok {
                break Serving
            }

            // Give a task to our workers
            select {
            case p.workers <- job:
            default:
                // If we are not maxed on workers, create a new one
                if workerCount < p.maxWorkers {
                    go startJob(job, p.workers)
                    workerCount++
                } else {
                    // Place a task on the back of the queue
                    p.queue.PushBack(job)
                }
            }
            idle = false
        case <-timeout.C:
            // Timed out waiting for work to arrive.  Kill a ready worker if
            // pool has been idle for a whole timeout.
            if idle && workerCount > 0 {
                if p.killWorkerIfIdle() {
                    workerCount--
                }
            }
            idle = true
            timeout.Reset(idleTimeout)
        }
    }

    // Allow queued jobs to complete
    if p.wait {
        p.work()
    }

    // Stop all workers before shutting down
    for workerCount > 0 {
        p.workers <- nil
        workerCount--
    }

    timeout.Stop()
}

// work removes each task from the waiting queue and gives it to
// workers until queue is empty.
func (p *puddle) work() {
    for p.queue.Len() != 0 {
        // A worker is ready, so give task to worker.
        p.workers <- p.queue.PopFront()
    }
}

// startJob runs initial task, then starts a worker waiting for more.
func startJob(job func(), workerQueue chan func()) {
    job()
    go worker(workerQueue)
}

// worker executes tasks and stops when it receives a nil task.
func worker(queue chan func()) {
    for job := range queue {
        if job == nil {
            return
        }
        job()
    }
}

// List wraps `container/list`
type List struct {
    list.List
}

// PopFront removes then returns first element in list as func()
func (l *List) PopFront() func() {
    f := l.Front()
    l.Remove(f)
    return f.Value.(func())
}

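Update 2

Since you asked how to use the code, this is how you would do it.

I turned worker into its own package and wrote another repo to show how to use that package.

worker package

How to use the worker package

worker package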

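package worker

import "fmt"

// JobResponse is what every Job reports back. The fields are unexported,
// so only code inside this package can read or set them.
type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

// Job is anything that has a name and a callback to run
type Job interface {
    Name() string
    Callback() JobResponse
}

// Do runs all jobs on maxWorkers goroutines and prints each result
func Do(jobs []Job, maxWorkers int) {
    jobsPool := make(chan Job, len(jobs))
    resultsPool := make(chan JobResponse, len(jobs))

    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPool, resultsPool)
    }

    makeJobs(jobsPool, jobs)
    // No more jobs will be sent, so let the workers exit once the queue drains
    close(jobsPool)
    getResults(resultsPool, jobs)
}

// worker runs the callback of every job it receives and sends back the response
func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

// makeJobs fills up the jobs channel
func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

// getResults reads one response per job and prints it
func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        job := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res)
        if job.err != nil {
            status = job.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}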

How to use the worker package

package main

import (
    "github.com/oze4/worker"
)

func main() {
    jobs := []worker.Job{
        AddedByUser{name: "1"},
        AddedByUser{name: "2"},
        AddedByUser{name: "3"},
        AddedByUser{name: "4"},
        AddedByUser{name: "5"},
        AddedByUser{name: "6"},
    }

    worker.Do(jobs, 5)
}

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() worker.JobResponse {
    // User added func/callback goes here
    return worker.JobResponse{}
}

Update

I renamed a few things to hopefully make it a bit clearer.

These are the basics of what you need:


package main

import (
    "fmt"
)

func main() {
    fmt.Println("Hello, playground")
}

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// worker runs the callback of every job it receives and sends back the response
func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

// makeJobs fills up the jobs channel
func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

// getResults reads one response per job and prints it
func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        j := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", j.name, j.url, j.res)
        if j.err != nil {
            status = j.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

As long as a type satisfies the Job interface, I can pass it to worker, makeJobs, and getResults:

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

Like this:

package main

import (
    "fmt"
)

func main() {
    jobsPool := make(chan Job, len(testers))
    resultsPool := make(chan JobResponse, len(testers))

    maxWorkers := 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPool, resultsPool)
    }

    makeJobs(jobsPool, testers)
    // No more jobs will be sent, so let the workers exit once the queue drains
    close(jobsPool)
    getResults(resultsPool, testers)
}

var testers = []Job{
    AddedByUser{name: "abu"}, // Using different types in Job
    Tap{name: "tap"},         // Using different types in Job
}

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

type Tap struct {
    name string
}

func (t Tap) Name() string {
    return t.name
}

func (t Tap) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// worker runs the callback of every job it receives and sends back the response
func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

// makeJobs fills up the jobs channel
func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

// getResults reads one response per job and prints it
func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        job := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res)
        if job.err != nil {
            status = job.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}
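
Since the question asks about user-supplied callbacks, it may help to see that the Job interface above can also be satisfied by a plain function, without declaring a new struct per callback. The sketch below assumes the same Job and JobResponse shapes as in this update; FuncJob and runAll are hypothetical names introduced only for illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// JobResponse and Job mirror the types used in the answer.
type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// FuncJob is a hypothetical adapter: it pairs a name with any function that
// returns a JobResponse, so a user callback can be queued like any other Job.
type FuncJob struct {
    name string
    fn   func() JobResponse
}

func (f FuncJob) Name() string { return f.name }

// Callback runs the wrapped function and stamps the job name on the response.
func (f FuncJob) Callback() JobResponse {
    r := f.fn()
    r.name = f.name
    return r
}

// runAll runs every job in its own goroutine and returns the responses in
// the same order as the input.
func runAll(jobs []Job) []JobResponse {
    out := make([]JobResponse, len(jobs))
    var wg sync.WaitGroup
    for i, j := range jobs {
        wg.Add(1)
        go func(i int, j Job) {
            defer wg.Done()
            out[i] = j.Callback()
        }(i, j)
    }
    wg.Wait()
    return out
}

func main() {
    jobs := []Job{
        FuncJob{name: "always-ok", fn: func() JobResponse {
            return JobResponse{res: 200, url: "n/a"}
        }},
        FuncJob{name: "always-fails", fn: func() JobResponse {
            return JobResponse{res: 500, url: "n/a"}
        }},
    }
    for _, r := range runAll(jobs) {
        fmt.Printf("%s (%s) -> %d, err=%v\n", r.name, r.url, r.res, r.err)
    }
}
```

The function body is where an http, curl, websocket, or subprocess call would go; the pool only ever sees a Job.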

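Original answer

[Adding this answer because the OP and I have been talking outside of this thread]

There are several errors in your code, but in the end all you need to do is take the advice people have given you. You just need to connect the dots. I suggest troubleshooting your code and trying to fully understand what the problem is. Honestly, that is the only way to learn.

The biggest issues I can remember are:

The HT interface needs to be modified so that the Check(...) signature matches each method

Otherwise, these structs (Tap, Tap1, Tap2) do not satisfy the HT interface and therefore do not implement HT

The parameter types in the funcs worker(...), makeJobs(...), and getResults(...) need to change from []Tap to []HT

You were not aggregating all of the Taps into a single slice

The only reason we can use all of the different Taps as HT is that they all implement HT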
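
The first two points can be checked by the compiler. The sketch below uses simplified stand-ins for the HT, Tap, and Tap2 types (only a name field, no HTTP client) together with blank-identifier assertions, a common Go idiom; checkAll is a hypothetical helper added for illustration.

```go
package main

import "fmt"

type testerResponse struct {
    name string
    res  int
}

// HT must declare Check exactly as the concrete types implement it.
type HT interface {
    Name() string
    Check() testerResponse
}

type Tap struct{ name string }

func (p *Tap) Name() string          { return p.name }
func (p *Tap) Check() testerResponse { return testerResponse{name: p.name, res: 200} }

type Tap2 struct{ name string }

func (p *Tap2) Name() string          { return p.name }
func (p *Tap2) Check() testerResponse { return testerResponse{name: p.name, res: 404} }

// Compile-time assertions: the build fails on these lines if a method is
// missing or its signature no longer matches HT. The methods use pointer
// receivers, so it is *Tap and *Tap2 that implement HT, not Tap and Tap2.
var (
    _ HT = (*Tap)(nil)
    _ HT = (*Tap2)(nil)
)

// checkAll (hypothetical helper) runs every tester through the interface.
func checkAll(testers []HT) []testerResponse {
    out := make([]testerResponse, 0, len(testers))
    for _, t := range testers {
        out = append(out, t.Check())
    }
    return out
}

func main() {
    // Different concrete types aggregated into one []HT slice.
    testers := []HT{&Tap{name: "tap"}, &Tap2{name: "tap2"}}
    for _, r := range checkAll(testers) {
        fmt.Println(r.name, r.res)
    }
}
```

Changing Check in the interface back to `Check() (*testerResponse, error)`, as in the question, makes both assertion lines fail to compile, which is the mismatch described in the list.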

Are you looking for something like this?


https://play.golang.org/p/zLmKOKAnX4C


package main

import (
    "fmt"
    "net/http"

    // "os/exec"
    "time"
)

type HT interface {
    Name() string
    Check() testerResponse
}

type testerResponse struct {
    err  error
    name string
    //res  http.Response
    res int
    url string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("[job][Tap1] Fetching %s %s \n", p.name, p.url)
    p.client = &http.Client{Timeout: p.timeout}
    res, err := p.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // need to close body
    res.Body.Close()
    return testerResponse{name: p.name, res: res.StatusCode, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// ---- CUSTOM CHECKS-------------
// ---- 1. NEW specific function -------------

type Tap2 struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap2) Check() testerResponse {
    // Do some request here.....
    fmt.Printf("[job][Tap2] Fetching %s %s \n", p.name, p.url)
    return testerResponse{res: 200, url: p.url, name: p.name}
}

func (p *Tap2) Name() string {
    return "yahoo custom check"
}

// ---- 2. NEW specific function which is not running http

type Tap3 struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap3) Check() testerResponse {
    // Do some request here....
    fmt.Printf("[job][Tap3] Fetching %s %s \n", p.name, p.url)
    return testerResponse{res: 200, url: p.url, name: p.name}
}

func (p *Tap3) Name() string {
    return "custom check2"
}

// makeJobs fills up our jobs channel
func makeJobs(jch chan<- HT, jobs []HT) {
    for _, t := range jobs {
        jch <- t
    }
}

// getResults reads one response per job from the results channel,
// prints it, and returns all of the responses
func getResults(tr <-chan testerResponse, jobs []HT) []testerResponse {
    var rts []testerResponse
    var r testerResponse
    for range jobs {
        r = <-tr
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
        rts = append(rts, r)
    }
    return rts
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

var (
    testers1 = []Tap{
        {
            name:    "First Tap1",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "Second Tap1",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
    }

    testers2 = []Tap2{
        {
            name: "First Tap2",
            url:  "http://1.tap2.com",
        },
        {
            name: "Second Tap2",
            url:  "http://2.tap2.com",
        },
    }

    testers3 = []Tap3{
        {
            name: "First Tap3",
            url:  "http://1.tap3.com",
        },
        {
            name: "Second Tap3",
            url:  "http://2.tap3.com",
        },
    }
)

func main() {
    // Aggregate all testers into one slice.
    // Use &testers1[i] rather than &t1 from `for _, t1 := range testers1`:
    // before Go 1.22 the loop variable is reused, so every &t1 would point
    // at the last element.
    var testers []HT
    for i := range testers1 {
        testers = append(testers, &testers1[i])
    }
    for i := range testers2 {
        testers = append(testers, &testers2[i])
    }
    for i := range testers3 {
        testers = append(testers, &testers3[i])
    }

    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan HT, buffer)                // Jobs will be of type `HT`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    maxWorkers := 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPipe, resultsPipe)
    }

    makeJobs(jobsPipe, testers)
    // No more jobs will be sent, so let the workers exit once the queue drains
    close(jobsPipe)
    getResults(resultsPipe, testers)
    //fmt.Println("at the end",tr)
}

Which outputs the following (from a run that appended the addresses of the range loop variables, &t1, &t2, and &t3; before Go 1.22 each loop reuses one variable, so only the "Second" entries appear):

// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
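
The listing above contains only "Second" entries even though six different testers were configured. One likely cause, assuming the program was built with a Go release older than 1.22, is taking the address of a range loop variable: that variable is reused on every iteration, so all the stored pointers end up referring to the last element. A minimal sketch of a version-independent way to collect the pointers follows; pointersTo is a hypothetical helper and Tap is reduced to a single field.

```go
package main

import "fmt"

type Tap struct{ name string }

// pointersTo returns a pointer to each element of taps. Indexing the slice
// gives every pointer its own target on any Go version, unlike taking the
// address of the range loop variable.
func pointersTo(taps []Tap) []*Tap {
    out := make([]*Tap, 0, len(taps))
    for i := range taps {
        out = append(out, &taps[i])
    }
    return out
}

func main() {
    taps := []Tap{{name: "First Tap1"}, {name: "Second Tap1"}}
    for _, p := range pointersTo(taps) {
        fmt.Println(p.name)
    }
    // Prints:
    // First Tap1
    // Second Tap1
}
```

Because the pointers refer to the slice elements themselves, a change made through a pointer is visible in the original slice as well.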


2022-07-04

qq_遁去的一_1

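As I understand it, you want your workers to accept other testers as well.

After looking at your code, it seems you have all the pieces in the right place and only need a few small changes here: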


// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- HT, taps []HT) {
    for _, t := range taps {
        jobs <- t
    }
}

// getResults reads one response per job from the results channel
// and prints it
func getResults(tr <-chan testerResponse, taps []HT) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

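Now, as you can see, your job queue can accept any type that implements the HT interface, so if you want a new kind of job, say Tap2, you only need:

type Tap2 struct{...}

func (p *Tap2) Check() testerResponse {...}

func (p *Tap2) Name() string {...}

Now you can push both Tap and Tap2 to the same job queue, because the job queue accepts any type that implements HT.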


2022-07-04