1 回答

TA貢獻1859條經驗 獲得超6個贊
嘗試這個:
package main
import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"
// MapFn turns a single input element into a key/value pair.
// MapReduce invokes it once for every element of its input slice.
type MapFn func(value int) *kvPair
// ReduceFn folds two values that were emitted under the same key into one.
type ReduceFn func(left, right int) int
// kvPair is one key/value record: k is the grouping key and v is the value
// carried under that key.
type kvPair struct {
	k, v int
}
// reducePair is a unit of work for a reducer: two values (v1 and v2) that
// share the key k and still have to be combined into a single value.
type reducePair struct {
	k, v1, v2 int
}
// MapReduce applies mapFn to every element of input on nMappers goroutines
// and then combines all values that share a key, two at a time, by calling
// reduceFn on nReducers goroutines until exactly one value per key is left.
//
// The returned map holds that final value for every key produced by mapFn.
// The order in which values of one key are paired up depends on goroutine
// scheduling, so the result is only reproducible when reduceFn is
// associative and commutative.
//
// An empty input yields an empty map. For a non-empty input an error is
// returned when nMappers or nReducers is smaller than 1, because no work
// could be performed in that configuration.
//
// All goroutines started by MapReduce have exited by the time it returns.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	results := make(map[int]int)
	if len(input) == 0 {
		return results, nil
	}
	if nMappers < 1 {
		return nil, fmt.Errorf("mapreduce: nMappers must be at least 1, got %d", nMappers)
	}
	if nReducers < 1 {
		return nil, fmt.Errorf("mapreduce: nReducers must be at least 1, got %d", nReducers)
	}

	// pairs carries both mapper output and reducer output back to the
	// collector below. Every pair in flight stems from a distinct input
	// element (a reduction merges two pairs into one), so at most len(input)
	// pairs exist at any time and sends on this channel never block.
	pairs := make(chan *kvPair, len(input))
	// jobs hands pairs of same-key values to the reducers.
	jobs := make(chan *reducePair)

	// Mappers claim input indices through a shared atomic cursor, which
	// distributes the work without a feeder goroutine or an input channel.
	var cursor int64
	total := int64(len(input))
	var mappers sync.WaitGroup
	for i := 0; i < nMappers; i++ {
		mappers.Add(1)
		go func() {
			defer mappers.Done()
			for {
				idx := atomic.AddInt64(&cursor, 1) - 1
				if idx >= total {
					return
				}
				pairs <- mapFn(input[idx])
			}
		}()
	}

	var reducers sync.WaitGroup
	for i := 0; i < nReducers; i++ {
		reducers.Add(1)
		go func() {
			defer reducers.Done()
			for job := range jobs {
				pairs <- &kvPair{k: job.k, v: reduceFn(job.v1, job.v2)}
			}
		}()
	}

	// Collector: only this goroutine touches results. It expects one pair per
	// input element plus one more for every reduce job it hands out, so it
	// knows exactly when the last pair has arrived and can block on the
	// channel instead of polling.
	expected := len(input)
	for received := 0; received < expected; received++ {
		p := <-pairs
		prev, seen := results[p.k]
		if !seen {
			results[p.k] = p.v
			continue
		}
		// Two values for the same key: pull the stored one out of the map
		// and let a reducer merge them; the merged pair comes back on pairs.
		delete(results, p.k)
		expected++
		jobs <- &reducePair{k: p.k, v1: p.v, v2: prev}
	}

	// Every pair has been consumed, so all workers are idle or finished;
	// closing jobs lets the reducers return instead of leaking.
	close(jobs)
	mappers.Wait()
	reducers.Wait()
	return results, nil
}
// main is a small demo/benchmark: it runs MapReduce with the mp and rdc
// callbacks over a random permutation of 0..999999 using two mappers and two
// reducers, then prints the elapsed time and the resulting map.
func main() {
	fmt.Println("NumCPU =", runtime.NumCPU())

	// The clock starts before the input is generated, so the reported
	// duration includes building the permutation.
	start := time.Now()

	// For debugging, a short literal with repeated values can be used here
	// instead, e.g. []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}.
	values := rand.Perm(1000000)

	result, err := MapReduce(mp, rdc, values, 2, 2)
	if err != nil {
		panic(err)
	}

	// The original author noted roughly 883ms for this run.
	elapsed := time.Since(start)
	fmt.Println(elapsed)
	fmt.Println(result)
	fmt.Println("done.")
}
// mp is the demo mapper: the low three bits of input become the key and the
// remaining bits (input shifted right by three) become the value.
func mp(input int) *kvPair {
	key := input & 7
	rest := input >> 3
	return &kvPair{k: key, v: rest}
}
// rdc is the demo reducer: it shifts b left by three bits and ORs a into the
// result. OR-ing with zero changes nothing, so a == 0 needs no special case.
func rdc(a int, b int) int {
	return (b << 3) | a
}
- 1 回答
- 0 關注
- 184 瀏覽
添加回答
舉報