亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何確保在 goroutines 中啟動的 goroutines 彼此同步?

如何確保在 goroutines 中啟動的 goroutines 彼此同步?

Go
SMILET 2022-10-10 19:30:33
這是我第一次使用 Go 的并發特性,我正在深入研究。我想對 API 進行并發調用。該請求基于我想要收到的帖子的標簽(可以有 1 <= N 個標簽)。響應正文如下所示:{    "posts": [        {            "id": 1,            "author": "Name",            "authorId": 1,            "likes": num_likes,            "popularity": popularity_decimal,            "reads": num_reads,            "tags": [ "tag1", "tag2" ]        },        ...    ]}我的計劃是將一堆通道菊花鏈在一起,并產生一些從這些通道讀取和或寫入的 goroutine:- for each tag, add it to a tagsChannel inside a goroutine- use that tagsChannel inside another goroutine to make concurrent GET requests to the endpoint- for each response of that request, pass the underlying slice of posts into another goroutine- for each individual post inside the slice of posts, add the post to a postChannel- inside another goroutine, iterate over postChannel and insert each post into a data structure這是我到目前為止所擁有的:func (srv *server) Get() {    // Using red-black tree prevents any duplicates, fast insertion    // and retrieval times, and is sorted already on ID.    rbt := tree.NewWithIntComparator()    // concurrent approach    tagChan := make(chan string)                       // tags -> tagChan    postChan := make(chan models.Post)                 // tagChan -> GET -> post -> postChan    errChan := make(chan error)                        // for synchronizing errors across goroutines    wg := &sync.WaitGroup{}                            // for synchronizing goroutines    wg.Add(4)    // create a go func to synchronize our wait groups    // once all goroutines are finished, we can close our errChan    go func() {        wg.Wait()        close(errChan)    }()    go insertTags(tags, tagChan, wg)    go fetch(postChan, tagChan, errChan, wg)    go addPostToTree(rbt, postChan, wg)    for err := range errChan {        if err != nil {            srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)        }    }}
查看完整描述

1 回答

?
米脂

TA貢獻1836條經驗 獲得超3個贊

首先,整個設計相當復雜。說到最后我的想法。

您的代碼中有兩個問題:

  1. posts通道永遠不會關閉,因此addPostToTree可能永遠不會存在循環,從而導致一個 waitGroup 永遠不會減少(在您的情況下,程序掛起)。程序有可能無限期地等待死鎖(認為其他 goroutine 會釋放它,但所有 goroutine 都卡住了)。
    解決方法:您可以關閉postChan頻道。但是怎么做?始終建議制作人始終關閉頻道,但您有多個制作人。所以最好的選擇是,等待所有生產者完成,然后關閉通道。為了等待所有生產者完成,您需要創建另一個 waitGroup 并使用它來跟蹤子例程。

代碼:

// fetch completes a GET request to the endpoint

func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {

    postsWG := &sync.WaitGroup{}

    for tag := range tags {

        ep, err := formURL(tag)

        if err != nil {

            errs <- err

        }

        postsWG.Add(1) // QUESTION should I use a separate wait group here?

        go func() {

            resp, err := http.Get(ep.String())

            if err != nil {

                errs <- err

            }

            container := models.PostContainer{}

            err = json.NewDecoder(resp.Body).Decode(&container)

            defer resp.Body.Close()

            go insertPosts(posts, container.Posts, postsWG)

        }()

    }


    defer func() {

        postsWG.Wait()

        close(posts)

        group.Done()

    }()

}

現在,我們還有另一個問題,主要的 waitGroup 應該使用3而不是初始化4。這是因為主例程只增加了 3 個例程wg.Add(3),因此它必須只跟蹤這些例程。對于子例程,我們使用不同的 waitGroup,因此這不再是父例程的頭疼問題。

代碼:


errChan := make(chan error)                        // for synchronizing errors across goroutines

    wg := &sync.WaitGroup{}                            // for synchronizing goroutines

    wg.Add(3)

    // create a go func to synchronize our wait groups

    // once all goroutines are finished, we can close our errChan

TLDR——


復雜設計 - 由于主等待組在一個地方啟動,但每個 goroutine 都在根據需要修改這個 waitGroup。因此,沒有單一的所有者,這使得調試和維護超級復雜(+ 不能確保它沒有錯誤)。

我建議將其分解并為每個子例程設置單獨的跟蹤器。這樣,正在運行更多例程的調用者只能專注于跟蹤其子 goroutine。然后,該例程將僅在其完成后才通知其父 waitGroup(及其子程序完成,而不是讓子程序直接通知祖父母)。


另外,fetch在進行 HTTP 調用并獲得響應后的方法中,為什么要創建另一個 goroutine 來處理這些數據?無論哪種方式,這個 goroutine 在數據插入發生之前都無法退出,也不會執行數據處理發生的其他操作。據我了解,第二個 goroutine 是多余的。


group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?

            go insertPosts(posts, container.Posts, group)

            defer group.Done()


查看完整回答
反對 回復 2022-10-10
  • 1 回答
  • 0 關注
  • 98 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號