3 回答
TA贡献1804条经验 获得超3个赞
您必须在循环后放置:wg.Wait()for
for condition {
wg.Add(1)
go func() {
// a concurrent job here
wg.Done()
}()
}
wg.Wait()
注意:工作本身应具有并发性质。
这是我测试的解决方案 - 按顺序从输入文件中读取,然后执行并发任务,最后按顺序写入输出文件,请尝试以下操作:n
package main
import (
"bufio"
"fmt"
"log"
"os"
"runtime"
"sort"
"strings"
"sync"
)
type sortQueue struct {
index int
data []string
}
func main() {
n := runtime.NumCPU()
a := make(chan sortQueue, n)
b := make(chan sortQueue, n)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go parSort(a, b, &wg)
}
go func() {
file, err := os.Open("data.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
i := 0
for scanner.Scan() {
a <- sortQueue{index: i, data: strings.Fields(scanner.Text())}
i++
}
close(a)
err = scanner.Err()
if err != nil {
log.Fatal(err)
}
}()
fres, err := os.Create("resdef.txt")
if err != nil {
log.Fatal(err)
}
defer fres.Close()
go func() {
wg.Wait()
close(b)
}()
writeToFile(fres, b, n)
}
func writeToFile(f *os.File, b chan sortQueue, n int) {
m := make(map[int][]string, n)
order := 0
for v := range b {
m[v.index] = v.data
var slice []string
exist := true
for exist {
slice, exist = m[order]
if exist {
delete(m, order)
order++
s := strings.Join(slice, " ")
fmt.Println(s)
_, err := f.WriteString(s + "\n")
if err != nil {
log.Fatal(err)
}
}
}
}
}
func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {
defer wg.Done()
for q := range a {
sort.Slice(q.data, func(i, j int) bool { return q.data[i] < q.data[j] })
b <- q
}
}
data.txt文件:
1 2 0 3
a 1 b d 0 c
aa cc bb
输出:
0 1 2 3
0 1 a b c d
aa bb cc
TA贡献1906条经验 获得超3个赞
您没有并行化任何内容,因为对于每个调用,您都有匹配的调用。这是一对一的:你生成一个Go例程,然后立即阻止主要的Go例程,等待新生成的例程完成。wg.Add(1)wg.Wait()
a 的要点是等待许多事情完成,只需调用一次,即可生成所有 Go 例程。WaitGroupwg.Wait()
但是,除了将呼叫固定为 ,还需要控制对扫描仪的并发访问。一种方法可能是使用一个通道,让扫描仪向等待的 Go 例程发出文本行:wg.Wait
lines := make(chan string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
writeToFile(fres, words)
}(line)
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines <- scanner.Text()
}
close(lines)
请注意,这可能会导致文件中出现乱码输出,因为您有许多并发的 Go 例程同时写入其结果。您可以通过第二个通道控制输出:
lines := make(chan string)
out := make(chan []string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
out <- words
}(line)
}
}()
go func() {
for words := range out {
writeToFile(fres, words)
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines <- scanner.Text()
}
close(lines)
close(out)
此时,您可以重构为“读取器”、“处理器”和“写入器”,它们形成通过通道进行通信的管道。
读取器和写入器使用单个 go 例程来防止对资源的并发访问,而处理器生成许多 go 例程(当前未绑定)以跨多个处理器“扇出”工作:
package main
import (
"bufio"
"os"
"strings"
)
func main() {
lines := reader()
out := processor(lines)
writer(out)
}
func reader() chan<- string {
lines := make(chan string)
file, err := os.Open("data.txt")
checkerr(err)
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines <- scanner.Text()
}
close(lines)
}()
return lines
}
func processor(lines chan<- string) chan []string {
out := make(chan []string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
out <- words
}(line)
}
close(out)
}()
return out
}
func writer(out chan<- []string) {
fres, err := os.Create("resdef.txt")
checkerr(err)
for words := range out {
writeToFile(fres, words)
}
}
TA贡献1846条经验 获得超7个赞
正如其他答案所说,通过等待每次循环迭代,您将并发性限制为1(无并发)。有很多方法可以解决这个问题,但什么是正确的完全取决于什么需要时间,而这个问题还没有表现出来。并发不会神奇地使事情变得更快;它只是让事情同时发生,这只会让事情变得更快,如果花费大量时间的事情可以同时发生。WaitGroup
据推测,在您的代码中,需要很长时间的事情是排序。如果是这种情况,您可以执行如下操作:
results := make(chan []string)
for scanner.Scan() {
wg.Add(1)
go func(line string) {
words := strings.Fields(line)
shellsort(words)
result <- words
}(scanner.Text())
}
go func() {
wg.Wait()
close(results)
}()
for words := range results {
writeToFile(fres, words)
}
这会将 移动到应有的位置,并避免并发使用扫描程序和写入器。这应该比串行处理更快,如果排序花费了大量的处理时间。Wait
- 3 回答
- 0 关注
- 133 浏览
添加回答
举报
