3 回答

TA貢獻1804條經驗 獲得超3個贊
您必須在循環后放置:wg.Wait()for
for condition {
wg.Add(1)
go func() {
// a concurrent job here
wg.Done()
}()
}
wg.Wait()
注意:工作本身應具有并發性質。
這是我測試的解決方案 - 按順序從輸入文件中讀取,然后執行并發任務,最后按順序寫入輸出文件,請嘗試以下操作:n
package main
import (
"bufio"
"fmt"
"log"
"os"
"runtime"
"sort"
"strings"
"sync"
)
type sortQueue struct {
index int
data []string
}
func main() {
n := runtime.NumCPU()
a := make(chan sortQueue, n)
b := make(chan sortQueue, n)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go parSort(a, b, &wg)
}
go func() {
file, err := os.Open("data.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
i := 0
for scanner.Scan() {
a <- sortQueue{index: i, data: strings.Fields(scanner.Text())}
i++
}
close(a)
err = scanner.Err()
if err != nil {
log.Fatal(err)
}
}()
fres, err := os.Create("resdef.txt")
if err != nil {
log.Fatal(err)
}
defer fres.Close()
go func() {
wg.Wait()
close(b)
}()
writeToFile(fres, b, n)
}
func writeToFile(f *os.File, b chan sortQueue, n int) {
m := make(map[int][]string, n)
order := 0
for v := range b {
m[v.index] = v.data
var slice []string
exist := true
for exist {
slice, exist = m[order]
if exist {
delete(m, order)
order++
s := strings.Join(slice, " ")
fmt.Println(s)
_, err := f.WriteString(s + "\n")
if err != nil {
log.Fatal(err)
}
}
}
}
}
func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {
defer wg.Done()
for q := range a {
sort.Slice(q.data, func(i, j int) bool { return q.data[i] < q.data[j] })
b <- q
}
}
data.txt文件:
1 2 0 3
a 1 b d 0 c
aa cc bb
輸出:
0 1 2 3
0 1 a b c d
aa bb cc

TA貢獻1906條經驗 獲得超3個贊
您沒有并行化任何內容,因為對于每個調用,您都有匹配的調用。這是一對一的:你生成一個Go例程,然后立即阻止主要的Go例程,等待新生成的例程完成。wg.Add(1)wg.Wait()
a 的要點是等待許多事情完成,只需調用一次,即可生成所有 Go 例程。WaitGroupwg.Wait()
但是,除了將呼叫固定為 ,還需要控制對掃描儀的并發訪問。一種方法可能是使用一個通道,讓掃描儀向等待的 Go 例程發出文本行:wg.Wait
lines := make(chan string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
writeToFile(fres, words)
}(line)
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines <- scanner.Text()
}
close(lines)
請注意,這可能會導致文件中出現亂碼輸出,因為您有許多并發的 Go 例程同時寫入其結果。您可以通過第二個通道控制輸出:
lines := make(chan string)
out := make(chan []string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
out <- words
}(line)
}
}()
go func() {
for words := range out {
writeToFile(fres, words)
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines <- scanner.Text()
}
close(lines)
close(out)
此時,您可以重構為“讀取器”、“處理器”和“寫入器”,它們形成通過通道進行通信的管道。
讀取器和寫入器使用單個 go 例程來防止對資源的并發訪問,而處理器生成許多 go 例程(當前未綁定)以跨多個處理器“扇出”工作:
package main
import (
"bufio"
"os"
"strings"
)
func main() {
lines := reader()
out := processor(lines)
writer(out)
}
func reader() chan<- string {
lines := make(chan string)
file, err := os.Open("data.txt")
checkerr(err)
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines <- scanner.Text()
}
close(lines)
}()
return lines
}
func processor(lines chan<- string) chan []string {
out := make(chan []string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
out <- words
}(line)
}
close(out)
}()
return out
}
func writer(out chan<- []string) {
fres, err := os.Create("resdef.txt")
checkerr(err)
for words := range out {
writeToFile(fres, words)
}
}

TA貢獻1846條經驗 獲得超7個贊
正如其他答案所說,通過等待每次循環迭代,您將并發性限制為1(無并發)。有很多方法可以解決這個問題,但什么是正確的完全取決于什么需要時間,而這個問題還沒有表現出來。并發不會神奇地使事情變得更快;它只是讓事情同時發生,這只會讓事情變得更快,如果花費大量時間的事情可以同時發生。WaitGroup
據推測,在您的代碼中,需要很長時間的事情是排序。如果是這種情況,您可以執行如下操作:
results := make(chan []string)
for scanner.Scan() {
wg.Add(1)
go func(line string) {
words := strings.Fields(line)
shellsort(words)
result <- words
}(scanner.Text())
}
go func() {
wg.Wait()
close(results)
}()
for words := range results {
writeToFile(fres, words)
}
這會將 移動到應有的位置,并避免并發使用掃描程序和寫入器。這應該比串行處理更快,如果排序花費了大量的處理時間。Wait
- 3 回答
- 0 關注
- 100 瀏覽
添加回答
舉報