亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

io.Pipe() 導致 WaitGroup 卡住

io.Pipe() 導致 WaitGroup 卡住

Go
九州編程 2022-12-19 10:52:32
我正在處理一個巨大的數據文件,大約是。100 GB。這個巨大文件中的每一行都是一段 JSON 數據,我想讀取、壓縮這些數據并將其存儲在內存數據庫中。var wg sync.WaitGroupfor {    line, err := reader.ReadString('\n')    if err != nil {        break    }    go func(index int) {        wg.Add(1)        pr, pw := io.Pipe()        zw := lzw.NewWriter(pw, lzw.LSB, 8)        _, err := io.Copy(zw, strings.NewReader(line))        pw.Close()        zw.Close()        if err != nil {            fmt.Println(err.Error())        }        b, err := io.ReadAll(pr)        if err != nil {            fmt.Println(err.Error())        }        client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)        pr.Close()        wg.Done()    }(index)    if index%10000 == 0 {        fmt.Println(index)        wg.Wait()    }    index += 1}但是,此代碼在處理前 10000 行后停止。當我向下移動時wg.Add(1),zw.Close()它繼續處理該行的其余部分(但變得不穩定)。如果沒有lzw,io.Pipe()當我嘗試以未壓縮的方式存儲確切的值時,一切都可以正常工作。我不確定我是否沒有WaitGroup正確使用 the 還是有一些io.Pipe()我還不知道的與 the 相關的東西。
查看完整描述

1 回答

?
心有法竹

TA貢獻1866條經驗 獲得超5個贊

1-刪除pr, pw := io.Pipe()使代碼更簡單,因為它是多余的,

試試這個:


line, err := reader.ReadString('\n')

if err == io.EOF {

    wg.Wait()

    break

}

if err != nil {

    log.Fatal(err)

}

wg.Add(1)

go func(index int) {

    var buf bytes.Buffer

    { // lexical scoping (static scoping)

        zw := lzw.NewWriter(&buf, lzw.LSB, 8)

        n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line))

        if err != nil {

            log.Fatal(err)

        }

        if int(n) != len(line) {

            log.Fatal(n, len(line))

        }

        // It is the caller's responsibility to call Close on the WriteCloser when finished writing.

        if err = zw.Close(); err != nil {

            log.Fatal(err)

        }

    }

    ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)

    client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour)


    cancelFunc()

    wg.Done()

}(index)


if index%tenThousand == 0 {

    wg.Wait()

}

2-你需要把wg.Add(1)之前go func(index int) {:


    wg.Add(1)

    go func(index int) {

3-wg.Wait()邏輯:


if index%10000 == 0 {

        fmt.Println(index)

        wg.Wait()

    }

如果 . 最后一次迭代會發生什么index%10000 != 0。所以在這里當err == io.EOF你需要wg.Wait()讓所有的 goroutines 加入時:


if err == io.EOF {

    wg.Wait()

    fmt.Println("\n**** All done **** index =", index)

    break

}

4-您可以使用詞法范圍(靜態范圍)來限制一些變量范圍并使代碼更易于管理 - 并知道何時Close:lzw.NewWriter


{ // lexical scoping (static scoping)

    zw := lzw.NewWriter(bufio.NewWriter(&buf), lzw.LSB, 8)

    n, err := io.Copy(zw, strings.NewReader(line))

    if err != nil {

        log.Fatal(err)

    }

    if int(n) != len(line) {

        log.Fatal(n, len(line))

    }

    // It is the caller's responsibility to call Close on the WriteCloser when finished writing.

    if err = zw.Close(); err != nil {

        log.Fatal(err)

    }

}

5- 始終檢查錯誤,例如:


 if err = zw.Close(); err != nil {

    log.Fatal(err)

}

這是接近你的代碼的工作版本 - 試試這個只是為了試驗并發邏輯看看會發生什么(不推薦因為它有多余的 goroutines 并且io.Pipe- 只是工作:


package main


import (

    "bufio"

    "compress/lzw"

    "context"

    "encoding/base64"

    "fmt"

    "io"

    "log"

    "strings"

    "sync"

    "time"

)


func main() {

    index := 0

    client := &myClient{}

    reader := bufio.NewReader(file)

    // your code:

    var wg sync.WaitGroup

    for {

        index++

        line, err := reader.ReadString('\n')

        if err != nil {

            msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now())

            wg.Wait() // break waiting // if index%tenThousand != 0

            break

        }

        wg.Add(1)

        go func(i int) {

            msg <- fmt.Sprint(i, " Enter running ... ", time.Now())

            asyncReader, asyncWriter := io.Pipe() // make it async to read and write

            zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8)

            go func() { // async

                _, err := io.Copy(zipWriter, strings.NewReader(line))

                if err != nil {

                    log.Fatal(err)

                }

                _ = zipWriter.Close()

                _ = asyncWriter.Close() // for io.ReadAll

            }()

            b, err := io.ReadAll(asyncReader)

            if err != nil {

                log.Fatal(err)

            }

            client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000)

            asyncReader.Close()

            time.Sleep(1 * time.Second)

            msg <- fmt.Sprint(i, " Exit running ... ", time.Now())

            wg.Done()

        }(index)


        msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call")

        if index%tenThousand == 0 {

            wg.Wait()

            msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now())

        }

    }

    msg <- "Bye forever."


    wg.Wait()

    close(msg)

    wgMsg.Wait()

}


// just for the Go Playground:

const tenThousand = 2


type myClient struct {

}


func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {

    // fmt.Println("a =", a, ", b =", b, ", t =", t)

    if ctx.Err() != nil {

        fmt.Println(ctx.Err())

    }

}


var file, myw = io.Pipe()


func init() {

    go func() {

        for i := 1; i <= tenThousand+1; i++ {

            fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i)

        }

        myw.Close()

    }()

    wgMsg.Add(1)

    go func() {

        defer wgMsg.Done()

        for s := range msg {

            fmt.Println(s)

        }

    }()

}


var msg = make(chan string, 100)

var wgMsg sync.WaitGroup


輸出:


1 false after go call

2 true after go call

1 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001

2 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001

1 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

2 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

..2 Done waiting after go call. 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

3 false after go call

3 Enter running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

4 Done not waiting with err: EOF 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

3 Exit running ... 2009-11-10 23:00:02 +0000 UTC m=+2.000000001

Bye forever.


查看完整回答
反對 回復 2022-12-19
  • 1 回答
  • 0 關注
  • 122 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號