为了账号安全,请及时绑定邮箱和手机立即绑定

并发运行速度不快

并发运行速度不快

Go
阿晨1998 2022-09-12 16:05:42
我已经写了一个代码,试图使用并发,但它无助于更快的运行。我该如何改进?package mainimport (    "bufio"    "fmt"    "os"    "strings"    "sync")var wg sync.WaitGroupfunc checkerr(e error) {    if e != nil {        fmt.Println(e)    }}func readFile() {    file, err := os.Open("data.txt")    checkerr(err)    fres, err := os.Create("resdef.txt")    checkerr(err)    defer file.Close()    defer fres.Close()    scanner := bufio.NewScanner(file)    for scanner.Scan() {        wg.Add(1)        go func() {            words := strings.Fields(scanner.Text())            shellsort(words)            writeToFile(fres, words)            wg.Done()        }()        wg.Wait()    }}func shellsort(words []string) {    for inc := len(words) / 2; inc > 0; inc = (inc + 1) * 5 / 11 {        for i := inc; i < len(words); i++ {            j, temp := i, words[i]            for ; j >= inc && strings.ToLower(words[j-inc]) > strings.ToLower(temp); j -= inc {                words[j] = words[j-inc]            }            words[j] = temp        }    }}func writeToFile(f *os.File, words []string) {    datawriter := bufio.NewWriter(f)    for _, s := range words {        datawriter.WriteString(s + " ")    }    datawriter.WriteString("\n")    datawriter.Flush()}func main() {    readFile()}一切都很好,除了在没有并发的情况下,做所有事情需要相同的时间。
查看完整描述

3 回答

?
狐的传说

TA贡献1804条经验 获得超3个赞

您必须在循环后放置:wg.Wait()for


    for condition {

        wg.Add(1)

        go func() {

            // a concurrent job here

            wg.Done()

        }()       

    }

    wg.Wait()

注意:工作本身应具有并发性质。


这是我测试的解决方案 - 按顺序从输入文件中读取,然后执行并发任务,最后按顺序写入输出文件,请尝试以下操作:n


package main


import (

    "bufio"

    "fmt"

    "log"

    "os"

    "runtime"

    "sort"

    "strings"

    "sync"

)


type sortQueue struct {

    index int

    data  []string

}


func main() {

    n := runtime.NumCPU()

    a := make(chan sortQueue, n)

    b := make(chan sortQueue, n)

    var wg sync.WaitGroup

    for i := 0; i < n; i++ {

        wg.Add(1)

        go parSort(a, b, &wg)

    }

    go func() {

        file, err := os.Open("data.txt")

        if err != nil {

            log.Fatal(err)

        }

        defer file.Close()

        scanner := bufio.NewScanner(file)

        i := 0

        for scanner.Scan() {

            a <- sortQueue{index: i, data: strings.Fields(scanner.Text())}

            i++

        }

        close(a)

        err = scanner.Err()

        if err != nil {

            log.Fatal(err)

        }

    }()


    fres, err := os.Create("resdef.txt")

    if err != nil {

        log.Fatal(err)

    }

    defer fres.Close()

    go func() {

        wg.Wait()

        close(b)

    }()

    writeToFile(fres, b, n)

}


func writeToFile(f *os.File, b chan sortQueue, n int) {

    m := make(map[int][]string, n)

    order := 0

    for v := range b {

        m[v.index] = v.data

        var slice []string

        exist := true

        for exist {

            slice, exist = m[order]

            if exist {

                delete(m, order)

                order++

                s := strings.Join(slice, " ")

                fmt.Println(s)

                _, err := f.WriteString(s + "\n")

                if err != nil {

                    log.Fatal(err)

                }

            }

        }

    }

}


func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {

    defer wg.Done()

    for q := range a {

        sort.Slice(q.data, func(i, j int) bool { return q.data[i] < q.data[j] })

        b <- q

    }

}


data.txt文件:


1 2 0 3


a 1 b d 0 c 


aa cc bb


输出:


0 1 2 3


0 1 a b c d


aa bb cc


查看完整回答
反对 回复 2022-09-12
?
一只名叫tom的猫

TA贡献1906条经验 获得超3个赞

您没有并行化任何内容,因为对于每个调用,您都有匹配的调用。这是一对一的:你生成一个Go例程,然后立即阻止主要的Go例程,等待新生成的例程完成。wg.Add(1)wg.Wait()


a 的要点是等待许多事情完成,只需调用一次,即可生成所有 Go 例程。WaitGroupwg.Wait()


但是,除了将呼叫固定为 ,还需要控制对扫描仪的并发访问。一种方法可能是使用一个通道,让扫描仪向等待的 Go 例程发出文本行:wg.Wait


    lines := make(chan string)


    go func() {

        for line := range lines {

            go func(line string) {

                words := strings.Fields(line)

                shellsort(words)

                writeToFile(fres, words)

            }(line)

        }

    }()


    scanner := bufio.NewScanner(file)

    for scanner.Scan() {

        lines <- scanner.Text()

    }

    close(lines)


请注意,这可能会导致文件中出现乱码输出,因为您有许多并发的 Go 例程同时写入其结果。您可以通过第二个通道控制输出:


    lines := make(chan string)

    out := make(chan []string)


    go func() {

        for line := range lines {

            go func(line string) {

                words := strings.Fields(line)

                shellsort(words)

                out <- words

            }(line)

        }

    }()


    go func() {

        for words := range out {

            writeToFile(fres, words)

        }

    }()


    scanner := bufio.NewScanner(file)

    for scanner.Scan() {

        lines <- scanner.Text()

    }

    close(lines)

    close(out)

此时,您可以重构为“读取器”、“处理器”和“写入器”,它们形成通过通道进行通信的管道。


读取器和写入器使用单个 go 例程来防止对资源的并发访问,而处理器生成许多 go 例程(当前未绑定)以跨多个处理器“扇出”工作:


package main


import (

    "bufio"

    "os"

    "strings"

)


func main() {

    lines := reader()

    out := processor(lines)

    writer(out)

}



func reader() chan<- string {

    lines := make(chan string)


    file, err := os.Open("data.txt")

    checkerr(err)

    go func() {

        scanner := bufio.NewScanner(file)

        for scanner.Scan() {

            lines <- scanner.Text()

        }

        close(lines)

    }()


    return lines

}



func processor(lines chan<- string) chan []string {

    out := make(chan []string)


    go func() {

        for line := range lines {

            go func(line string) {

                words := strings.Fields(line)

                shellsort(words)

                out <- words

            }(line)

        }

        close(out)

    }()

    return out

}


func writer(out chan<- []string) {

    fres, err := os.Create("resdef.txt")

    checkerr(err)

    for words := range out {

        writeToFile(fres, words)

    }

}


查看完整回答
反对 回复 2022-09-12
?
喵喵时光机

TA贡献1846条经验 获得超7个赞

正如其他答案所说,通过等待每次循环迭代,您将并发性限制为1(无并发)。有很多方法可以解决这个问题,但什么是正确的完全取决于什么需要时间,而这个问题还没有表现出来。并发不会神奇地使事情变得更快;它只是让事情同时发生,这只会让事情变得更快,如果花费大量时间的事情可以同时发生。WaitGroup


据推测,在您的代码中,需要很长时间的事情是排序。如果是这种情况,您可以执行如下操作:


results := make(chan []string)

for scanner.Scan() {

    wg.Add(1)

    go func(line string) {

        words := strings.Fields(line)

        shellsort(words)

        result <- words

    }(scanner.Text())

}


go func() {

    wg.Wait()

    close(results)

}()


for words := range results {

    writeToFile(fres, words)

}

这会将 移动到应有的位置,并避免并发使用扫描程序和写入器。这应该比串行处理更快,如果排序花费了大量的处理时间。Wait


查看完整回答
反对 回复 2022-09-12
  • 3 回答
  • 0 关注
  • 133 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号