为了账号安全,请及时绑定邮箱和手机立即绑定

如何并行化递归函数

如何并行化递归函数

Go
慕妹3146593 2022-08-01 11:00:39
我试图在 Go 中并行化递归问题,我不确定最好的方法是什么。我有一个递归函数,它的工作原理如下:func recFunc(input string) (result []string) {    for subInput := range getSubInputs(input) {        subOutput := recFunc(subInput)        result = result.append(result, subOutput...)    }    result = result.append(result, getOutput(input)...)}func main() {    output := recFunc("some_input")    ...}因此,该函数调用自身时间(其中N在某个级别为0),生成自己的输出并返回列表中的所有内容。N现在我想让这个函数并行运行。但我不确定最干净的方法来做到这一点。我的想法:有一个“结果”通道,所有函数调用都向该通道发送其结果。在 main 函数中收集结果。有一个等待组,用于确定何时收集所有结果。问题:我需要等待等待组并并行收集所有结果。我可以为此启动一个单独的 go 函数,但是我该如何退出这个单独的 go 函数呢?func recFunc(input string) (result []string, outputChannel chan []string, waitGroup &sync.WaitGroup) {    defer waitGroup.Done()    waitGroup.Add(len(getSubInputs(input))    for subInput := range getSubInputs(input) {        go recFunc(subInput)    }    outputChannel <-getOutput(input)}func main() {    outputChannel := make(chan []string)    waitGroup := sync.WaitGroup{}    waitGroup.Add(1)    go recFunc("some_input", outputChannel, &waitGroup)    result := []string{}    go func() {       nextResult := <- outputChannel       result = append(result, nextResult ...)    }    waitGroup.Wait()}也许有更好的方法来做到这一点?或者,我如何确保收集结果的匿名 go 函数在完成时被截断?
查看完整描述

2 回答

?
小唯快跑啊

TA贡献1863条经验 获得超2个赞

博士;

  • 递归算法应该对昂贵的资源(网络连接、goroutines、堆栈空间等)有限制。

  • 应支持取消 - 以确保在不再需要结果时可以快速清理昂贵的操作

  • 分支遍历应支持错误报告;这允许错误冒泡堆栈并返回部分结果,而不会使整个递归遍历失败。


对于同步结果 ( 无论是否使用递归 - 建议使用通道。此外,对于具有许多 goroutine 的长时间运行的作业,请提供取消方法(上下文。上下文)以帮助清理。

由于递归可能导致资源呈指数级消耗,因此设置限制非常重要(请参阅有界并行性)。

以下是我经常用于异步任务的设计模式:

  • 始终支持采取上下文。取消的上下文

  • 任务所需的工作人员数

  • 返回 a 的结果和 a(将只返回一个错误或chanchan errornil)

var (

    workers = 10

    ctx     = context.TODO() // use request context here - otherwise context.Background()

    input   = "abc"

)


resultC, errC := recJob(ctx, workers, input) // returns results & `error` channels


// asynchronous results - so read that channel first in the event of partial results ...

for r := range resultC {

    fmt.Println(r)

}


// ... then check for any errors

if err := <-errC; err != nil {

    log.Fatal(err)

}

递归:

由于递归可以快速水平扩展,因此需要一种一致的方式来填充有限的工人列表,同时还要确保当工人被释放时,他们能够快速从其他(过度工作)工人那里接手工作。

与其创建经理层,不如采用合作的同事对等系统:

  • 每个工作线程共享一个输入通道

  • 在输入上递归之前 () 检查是否有任何其他工作线程处于空闲状态subIinputs

    • 如果是这样,请委派给该工作人员

    • 如果不是,则当前工作线程继续递归该分支

有了这个算法,有限的工人数量很快就会被工作所淹没。任何提前完成分支的工人 - 将很快从另一个工人那里获得子分支。最终,所有工作线程都将用完子分支,此时所有工作线程都将空闲(阻止),递归任务可以完成。

要实现这一目标,需要进行一些认真的协调。允许工作线程写入输入通道有助于通过委派进行这种对等协调。“递归深度”用于跟踪所有工作线程的所有分支何时耗尽。WaitGroup

(为了包括上下文支持和错误链接 - 我更新了您的函数以采用a并返回可选):getSubInputsctxerror

func recFunc(ctx context.Context, input string, in chan string, out chan<- string, rwg *sync.WaitGroup) error {


    defer rwg.Done() // decrement recursion count when a depth of recursion has completed


    subInputs, err := getSubInputs(ctx, input)

    if err != nil {

        return err

    }


    for subInput := range subInputs { 

        rwg.Add(1) // about to recurse (or delegate recursion)


        select {

        case in <- subInput:

            // delegated - to another goroutine


        case <-ctx.Done():

            // context canceled...


            // but first we need to undo the earlier `rwg.Add(1)`

            // as this work item was never delegated or handled by this worker

            rwg.Done()

            return ctx.Err()


        default:

            // noone available to delegate - so this worker will need to recurse this item themselves

            err = recFunc(ctx, subInput, in, out, rwg)

            if err != nil {

                return err

            }

        }


        select {

        case <-ctx.Done():

            // always check context when doing anything potentially blocking (in this case writing to `out`)

            // context canceled

            return ctx.Err()


        case out <- subInput:

        }

    }


    return nil

}

连接工件:

recJob创建:

  • 输入和输出通道 - 由所有工人共享

  • “递归”检测所有工作线程何时空闲WaitGroup

    • 然后可以安全地关闭“输出”通道

  • 所有工作线程的错误通道

  • 通过将初始输入写入输入通道来启动递归工作负载

func recJob(ctx context.Context, workers int, input string) (resultsC <-chan string, errC <-chan error) {


    // RW channels

    out := make(chan string)

    eC := make(chan error, 1)


    // R-only channels returned to caller

    resultsC, errC = out, eC


    // create workers + waitgroup logic

    go func() {


        var err error // error that will be returned to call via error channel


        defer func() {

            close(out)

            eC <- err

            close(eC)

        }()


        var wg sync.WaitGroup

        wg.Add(1)

        in := make(chan string) // input channel: shared by all workers (to read from and also to write to when they need to delegate)


        workerErrC := createWorkers(ctx, workers, in, out, &wg)


        // get the ball rolling, pass input job to one of the workers

        // Note: must be done *after* workers are created - otherwise deadlock

        in <- input


        errCount := 0


        // wait for all worker error codes to return

        for err2 := range workerErrC {

            if err2 != nil {

                log.Println("worker error:", err2)

                errCount++

            }

        }


        // all workers have completed

        if errCount > 0 {

            err = fmt.Errorf("PARTIAL RESULT: %d of %d workers encountered errors", errCount, workers)

            return

        }


        log.Printf("All %d workers have FINISHED\n", workers)

    }()


    return

}

最后,创建工作线程:


func createWorkers(ctx context.Context, workers int, in chan string, out chan<- string, rwg *sync.WaitGroup) (errC <-chan error) {


    eC := make(chan error) // RW-version

    errC = eC              // RO-version (returned to caller)


    // track the completeness of the workers - so we know when to wrap up

    var wg sync.WaitGroup

    wg.Add(workers)


    for i := 0; i < workers; i++ {

        i := i

        go func() {

            defer wg.Done()


            var err error


            // ensure the current worker's return code gets returned

            // via the common workers' error-channel

            defer func() {

                if err != nil {

                    log.Printf("worker #%3d ERRORED: %s\n", i+1, err)

                } else {

                    log.Printf("worker #%3d FINISHED.\n", i+1)

                }

                eC <- err

            }()


            log.Printf("worker #%3d STARTED successfully\n", i+1)


            // worker scans for input

            for input := range in {


                err = recFunc(ctx, input, in, out, rwg)

                if err != nil {

                    log.Printf("worker #%3d recurseManagers ERROR: %s\n", i+1, err)

                    return

                }

            }


        }()

    }


    go func() {

        rwg.Wait() // wait for all recursion to finish

        close(in)  // safe to close input channel as all workers are blocked (i.e. no new inputs)

        wg.Wait()  // now wait for all workers to return

        close(eC)  // finally, signal to caller we're truly done by closing workers' error-channel

    }()


    return

}


查看完整回答
反对 回复 2022-08-01
?
元芳怎么了

TA贡献1798条经验 获得超7个赞

我可以为此启动一个单独的 go 函数,但是我该如何退出这个单独的 go 函数呢?


您可以在单独的 go-routine 中通过输出通道。在这种情况下,当通道关闭时,go-routine将安全退出range


go func() {

   for nextResult := range outputChannel {

     result = append(result, nextResult ...)

   }

}

因此,现在我们需要注意的是,在作为递归函数调用的一部分生成的所有go-routine都成功存在之后,通道被关闭。


为此,您可以在所有 go 例程中使用共享等待组,并在主函数中等待该等待组,就像您已经在做的那样。等待结束后,关闭输出通道,以便其他 go-routine 也安全退出


func recFunc(input string, outputChannel chan, wg &sync.WaitGroup) {

    defer wg.Done()

    for subInput := range getSubInputs(input) {

        wg.Add(1)

        go recFunc(subInput)

    }

    outputChannel <-getOutput(input)

}


func main() {

    outputChannel := make(chan []string)

    waitGroup := sync.WaitGroup{}


    waitGroup.Add(1)

    go recFunc("some_input", outputChannel, &waitGroup)


    result := []string{}

    go func() {

     for nextResult := range outputChannel {

      result = append(result, nextResult ...)

     }

    }

    waitGroup.Wait()

    close(outputChannel)        

}

PS:如果你想有界并行度来限制指数增长,请查看这个


查看完整回答
反对 回复 2022-08-01
  • 2 回答
  • 0 关注
  • 176 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号