亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

關閉具有周期性依賴關系的通道

關閉具有周期性依賴關系的通道

Go
呼喚遠方 2022-03-02 13:30:32
我正在嘗試在 Golang 中實現類似 mapreduce 的方法。我的設計如下:地圖工作者從映射器輸入通道中拉出項目并輸出到映射器輸出通道然后由單個 goroutine 讀取映射器輸出通道。此例程維護先前看到的鍵值對的映射。如果映射器輸出的下一項具有匹配的鍵,它會將具有匹配鍵的新舊值都發送到 reduce-input 通道。reduce-input 管道將兩個值縮減為一個鍵值對,并將結果提交到同一個 map-output 通道。這導致 mapper 輸出和 reduce 輸入之間存在循環依賴關系,我現在不知道如何表示 mapper 輸出已完成(并關閉通道)。打破這種循環依賴或知道何時關閉具有這種循環行為的通道的最佳方法是什么?下面的代碼存在死鎖,map 輸出通道和 reduce 輸入通道相互等待。type MapFn func(input int) (int, int)type ReduceFn func(a int, b int) inttype kvPair struct {    k int    v int}type reducePair struct {    k  int    v1 int    v2 int}func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {    inputMapChan := make(chan int, len(input))    outputMapChan := make(chan *kvPair, len(input))    reduceInputChan := make(chan *reducePair)    outputMapMap := make(map[int]int)    go func() {        for v := range input {            inputMapChan <- v        }        close(inputMapChan)    }()    for i := 0; i < nMappers; i++ {        go func() {            for v := range inputMapChan {                k, v := mapFn(v)                outputMapChan <- &kvPair{k, v}            }        }()    }    for i := 0; i < nReducers; i++ {        go func() {            for v := range reduceInputChan {                reduceValue := reduceFn(v.v1, v.v2)                outputMapChan <- &kvPair{v.k, reduceValue}            }        }()    }    for v := range outputMapChan {        key := v.k        value := v.v        other, ok := outputMapMap[key]        if ok {            delete(outputMapMap, key)            reduceInputChan <- &reducePair{key, value, other}        } else {            outputMapMap[key] = value        }    }    return outputMapMap, nil}
查看完整描述

1 回答

?
BIG陽

TA貢獻1859條經驗 獲得超6個贊

嘗試這個:


package main


import "fmt"

import "sync"

import "sync/atomic"

import "runtime"

import "math/rand"

import "time"


type MapFn func(input int) *kvPair

type ReduceFn func(a int, b int) int


type kvPair struct {

    k int

    v int

}


type reducePair struct {

    k  int

    v1 int

    v2 int

}


func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {

    inputMapChan := make(chan int, len(input))

    outputMapChan := make(chan *kvPair, len(input))

    reduceInputChan := make(chan *reducePair)

    outputMapMap := make(map[int]int)


    wg := sync.WaitGroup{}

    wg.Add(1)

    go func() {

        defer wg.Done()

        for _, v := range input {

            inputMapChan <- v

        }

        close(inputMapChan)

    }()


    for i := 0; i < nMappers; i++ {

        wg.Add(1)

        go func() {

            defer wg.Done()

            for v := range inputMapChan {

                outputMapChan <- mapFn(v)

            }

        }()

    }


    finished := false

    go func() {

        wg.Wait()

        finished = true

    }()


    var count int64

    for i := 0; i < nReducers; i++ {

        go func() {

            for v := range reduceInputChan {

                reduceValue := reduceFn(v.v1, v.v2)

                outputMapChan <- &kvPair{v.k, reduceValue}

                atomic.AddInt64(&count, -1)

            }

        }()

    }


    wg2 := sync.WaitGroup{}

    wg2.Add(1)

    go func() {

        defer wg2.Done()

        for {

            select {

            default:

                if finished && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {

                    return

                }

                //runtime.Gosched()

            case v := <-outputMapChan:

                key := v.k

                value := v.v

                if other, ok := outputMapMap[key]; ok {

                    delete(outputMapMap, key)

                    atomic.AddInt64(&count, 1)

                    reduceInputChan <- &reducePair{key, value, other}

                } else {

                    outputMapMap[key] = value

                }

            }

        }

    }()


    wg2.Wait()

    return outputMapMap, nil

}


func main() {

    fmt.Println("NumCPU =", runtime.NumCPU())

    t := time.Now()

    a := rand.Perm(1000000)

    //a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}

    m, err := MapReduce(mp, rdc, a, 2, 2)

    if err != nil {

        panic(err)

    }

    fmt.Println(time.Since(t)) //883ms

    fmt.Println(m)

    fmt.Println("done.")

}


func mp(input int) *kvPair {

    return &kvPair{input & 7, input >> 3}

}

func rdc(a int, b int) int {

    b <<= 3

    if a != 0 {

        b |= a

    }

    return b

}



查看完整回答
反對 回復 2022-03-02
  • 1 回答
  • 0 關注
  • 184 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號