亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Go net/http 在高負載下泄漏內存

Go net/http 在高負載下泄漏內存

Go
森欄 2023-08-14 16:32:32
我正在開發一個使用該net/http包調用客戶端 URL 的 API。根據用戶國家/操作系統,goroutines 中的每個請求(POST 調用)會同時調用 1 到 8 個 URL。該應用程序以大約 1000-1500 個請求的低 qps 運行,但將應用程序擴展到 3k 請求時,內存會突然增加,即使僅調用 1 個客戶端 URL,應用程序也會在幾分鐘后停止響應(響應時間遠高于 50 秒) )。我正在使用 Go 本機net/http包和gorilla/mux路由器。關于這個問題的其他問題說要關閉響應正文,但我已經使用了        req, err := http.NewRequest("POST", "client_post_url", bytes.NewBuffer(requestBody))        req.Header.Set("Content-Type", "application/json")        req.Header.Set("Connection", "Keep-Alive")        response, err := common.Client.Do(req)        status := 0        if err != nil {//handle and return}        defer response.Body.Close() //used with/without io.Copy        status = response.StatusCode        body, _ := ioutil.ReadAll(response.Body)        _, err = io.Copy(ioutil.Discard, response.Body)我需要重用連接,因此我已經在 init 方法中初始化了 http 客戶端和傳輸全局變量,如下所示。    common.Transport = &http.Transport{        TLSClientConfig: &tls.Config{            InsecureSkipVerify: true,        },        DialContext: (&net.Dialer{            //Timeout: time.Duration(300) * time.Millisecond,            KeepAlive: 30 * time.Second,        }).DialContext,        //ForceAttemptHTTP2:     true,        DisableKeepAlives: false,        //MaxIdleConns:      0,        //IdleConnTimeout:   0,        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,        //ExpectContinueTimeout: 1 * time.Second,    }    common.Client = &http.Client{        Timeout:   time.Duration(300) * time.Millisecond,        Transport: common.Transport,    }我讀過使用 keep-alive 會導致內存泄漏,我嘗試了一些組合來根據請求禁用 keep-alive/close 請求標志。但似乎沒有任何作用。另外,如果我不進行任何 http 調用并time.Sleep(300 * time.Millisecond)在同時調用每個 url 的 goroutine 中使用,則應用程序可以正常工作而不會出現任何泄漏。所以我確信這與 client/http 包在高負載下連接未釋放或未正確使用有關。我應該采取什么方法來實現這一目標?創建自定義服務器和自定義處理程序類型來接受請求和路由請求是否會像幾篇文章中的 C10K 方法中提到的那樣工作?如果需要,我可以分享包含所有詳細信息的示例代碼。上面只是補充了我覺得問題所在的部分。
查看完整描述

2 回答

?
慕無忌1623718

TA貢獻1744條經驗 獲得超4個贊

這段代碼沒有泄露。


為了演示,讓我們稍微更新一下它,以便該帖子可以重現。


主程序


package main


import (

    "bytes"

    "crypto/tls"

    _ "expvar"

    "fmt"

    "io"

    "io/ioutil"

    "log"

    "math/rand"

    "net"

    "net/http"

    _ "net/http/pprof"

    "os"

    "runtime"

    "strconv"

    "sync"

    "time"


    "github.com/gorilla/mux"

)


var (

    //http client

    Client *http.Client


    //http Transport

    Transport *http.Transport

)



func init() {


    go http.ListenAndServe("localhost:6060", nil)


    //Get Any command line argument passed

    args := os.Args[1:]

    numCPU := runtime.NumCPU()

    if len(args) > 1 {

        numCPU, _ = strconv.Atoi(args[0])

    }


    Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            //Timeout: time.Duration() * time.Millisecond,

            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    Client = &http.Client{

        // Timeout:   time.Duration(300) * time.Millisecond,

        Transport: Transport,

    }


    runtime.GOMAXPROCS(numCPU)


    rand.Seed(time.Now().UTC().UnixNano())

}


func main() {


    router := mux.NewRouter().StrictSlash(true)

    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        _, _ = fmt.Fprintf(w, "Hello!!!")

    })


    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {

        vars := mux.Vars(r)


        prepareRequest(w, r, vars["name"])


    }).Methods("POST", "GET")


    // Register pprof handlers

    // router.HandleFunc("/debug/pprof/", pprof.Index)

    // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

    // router.HandleFunc("/debug/pprof/profile", pprof.Profile)

    // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

    // router.HandleFunc("/debug/pprof/trace", pprof.Trace)


    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")


    srv := &http.Server{

        Addr: "localhost:8080",

        /*ReadTimeout:  500 * time.Millisecond,

          WriteTimeout: 500 * time.Millisecond,

          IdleTimeout:  10 * time.Second,*/

        Handler: routerMiddleWare,

    }


    log.Fatal(srv.ListenAndServe())

}


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    // go func() {

    //  make(chan []byte) <- make([]byte, 10024)

    // }()


    //other part of the code and call to goroutine

    var urls []string

    urls = append(urls,

        "http://localhost:7000/",

        "http://localhost:7000/",

    )

    results, s, c := callUrls(urls)

    finalCall(w, results, s, c)


}


type Response struct {

    Status int

    Url    string

    Body   string

}


func callUrls(urls []string) ([]*Response, []string, []string) {

    var wg sync.WaitGroup

    wg.Add(len(urls))

    ch := make(chan func() (*Response, string, string), len(urls))

    for _, url := range urls {

        go func(url string) {

            //decide if request is valid for client to make http call using country/os

            isValid := true //assuming url to be called

            if isValid {

                //make post call

                //request body have many more paramter, just sample included.

                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.

                req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))

                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"

                    }

                    return

                }

                req.Header.Set("Content-Type", "application/json")

                req.Header.Set("Connection", "Keep-Alive")

                //req.Close = true


                response, err := Client.Do(req)


                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"

                    }

                    return

                }


                defer response.Body.Close()

                body, _ := ioutil.ReadAll(response.Body)

                io.Copy(ioutil.Discard, response.Body)


                //Close the body, forced this

                //Also tried without defer, and only wothout following line

                response.Body.Close()


                //do something with response body replace a few string etc.

                //and return

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"

                }


            } else {

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"

                }

            }


        }(url)

    }

    wg.Wait()

    var (

        results []*Response

        msg     []string

        status  []string

    )

    for {

        r, x, y := (<-ch)()

        if r != nil {


            results = append(results, r)

            msg = append(msg, x)

            status = append(status, y)

        }

        if len(results) == len(urls) {

            return results, msg, status

        }


    }

}


func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {

    fmt.Println("response", "response body", results, msg, status)

}

k/main.go


package main


import "net/http"


func main() {

    y := make([]byte, 100)

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        w.WriteHeader(http.StatusOK)

        w.Write(y)

    })

    http.ListenAndServe(":7000", nil)

}

安裝額外的可視化工具,并用來ab模擬一些負載,它就可以完成直觀的演示。


go get -u github.com/divan/expvarmon

go run main.go &

go run k/main.go &

ab -n 50000 -c 2500 http://localhost:8080/y

# in a different window, for live preview

expvarmon -ports=6060 -i 500ms

此時您會讀取 的輸出expvarmon,如果它是實時的,您會看到類似的內容

https://img1.sycdn.imooc.com//64d9e6de00018d3106520402.jpg

你可以看到那些東西在揮舞,GC 正在積極工作。

應用程序已加載,內存正在消耗,等待服務器釋放其 conn 并等待 gc 清理它們

https://img1.sycdn.imooc.com//64d9e6ec0001faeb06520244.jpg

您可以看到memstats.Allocmemstats.HeapAlloc,memstats.HeapInuse現在減少了,正如 GC 完成其工作時所預期的那樣,并且不存在泄漏。

如果你要在運行go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap后立即檢查ab

https://img1.sycdn.imooc.com//64d9e7020001618725601236.jpg

它表明該應用程序正在使用177Mb內存。


其中大部分102Mb正在被使用net/http.Transport.getConn。


你的經紀人正在負責1Mb,剩下的就是需要做的各種事情。


如果您在服務器和 gc 發布后截取屏幕截圖,您會看到一個更小的圖表。這里就不演示了。


現在讓我們生成一個泄漏并再次使用這兩個工具查看它。


在代碼中取消注釋,



func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    go func() {

        make(chan []byte) <- make([]byte, 10024)

    }()

//...

重新啟動應用程序(按q,expvarmon但不是必需的)


go get -u github.com/divan/expvarmon

go run main.go &

go run k/main.go &

ab -n 50000 -c 2500 http://localhost:8080/y

# in a different window, for live preview

expvarmon -ports=6060 -i 500ms

表明

https://img1.sycdn.imooc.com//64d9e7150001104609060339.jpg

https://img1.sycdn.imooc.com//64d9e71e000125dd09040360.jpg

expvarmon可以看到相同的行為,只有數字發生了變化,并且在靜止狀態下,在被 gced 后,它仍然消耗大量內存,比作為比較點的 void golang http 服務器要多得多。

再次,截取堆的屏幕截圖,它顯示您的處理程序現在正在消耗大部分內存 ~ 450Mb,注意箭頭,它顯示有 for452mb分配10kb4.50Mbof 96b。它們分別對應于[]byte被推送到的切片chan []byte。

https://img1.sycdn.imooc.com//64d9e72c0001294b25600941.jpg

最后,您可以檢查堆棧跟蹤以查找死 goroutine,從而泄漏內存,打開http://localhost:6060/debug/pprof/goroutine?debug=1


goroutine profile: total 50012


50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281

#   0x76b85c    main.prepareRequest.func1+0x4c  /home/mh-cbon/gow/src/test/oom/main.go:101


4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281

#   0x42b685    internal/poll.runtime_pollWait+0x55     /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182

#   0x4c3a3a    internal/poll.(*pollDesc).wait+0x9a     /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87

// more...

它告訴我們程序正在托管goroutine,然后它按文件位置分組列出它們,其中第一個數字是本示例第一組中50 012正在運行的實例的計數。50 000接下來是導致 goroutine 存在的堆棧跟蹤。

您可以看到有很多系統問題,就您的情況而言,您不必太擔心。

您必須尋找那些您認為如果您的程序按您認為應該的方式運行則不應運行的程序。

然而,總體而言,您的代碼并不令人滿意,并且可以并且可能應該通過對其分配和總體設計概念進行徹底審查來改進。

** 這是對原始源代碼所做的更改的摘要。

  • 它添加了一個新程序k/main.go來充當后端服務器。

  • 它添加了_ "expvar"導入語句

  • init它啟動 pprof 在階段期間注冊的 std api HTTP 服務器實例go http.ListenAndServe("localhost:6060", nil)

  • 禁用客戶端超時Timeout:   time.Duration(300) * time.Millisecond,,否則負載測試不會返回200s

  • 服務器地址設置為Addr: "localhost:8080",

  • urls其中創建的值設置prepareRequest為 len=2 的靜態列表

  • req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(它添加了對{"body":"param"}的錯誤檢查)))

  • 它禁用錯誤檢查io.Copy(ioutil.Discard, response.Body)


查看完整回答
反對 回復 2023-08-14
?
largeQ

TA貢獻2039條經驗 獲得超8個贊

我已經通過用 替換net/httppackage解決了這個問題fasthttp。早些時候我沒有使用它,因為我無法在 fasthttp 客戶端上找到超時方法,但我發現DoTimeoutfasthttp 客戶端確實有一種方法,可以在指定的持續時間后使請求超時。

這里是更新的代碼:

vars.go中 ClientFastHttp *fasthttp.Client

主程序

package main


import (

    "./common"

    "crypto/tls"

    "fmt"

    "github.com/gorilla/mux"

    "github.com/valyala/fasthttp"

    "log"

    "math/rand"

    "net"

    "net/http"

    "net/http/pprof"

    "os"

    "runtime"

    "strconv"

    "sync"

    "time"

)


func init() {


    //Get Any command line argument passed

    args := os.Args[1:]

    numCPU := runtime.NumCPU()

    if len(args) > 1 {

        numCPU, _ = strconv.Atoi(args[0])

    }


    common.Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            //Timeout: time.Duration() * time.Millisecond,

            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    common.Client = &http.Client{

        Timeout:   time.Duration(300) * time.Millisecond,

        Transport: common.Transport,

    }


    runtime.GOMAXPROCS(numCPU)


    rand.Seed(time.Now().UTC().UnixNano())

}


func main() {


    router := mux.NewRouter().StrictSlash(true)

    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        _, _ = fmt.Fprintf(w, "Hello!!!")

    })


    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {

        vars := mux.Vars(r)


        prepareRequest(w, r, vars["name"])


    }).Methods("POST")


    // Register pprof handlers

    router.HandleFunc("/debug/pprof/", pprof.Index)

    router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

    router.HandleFunc("/debug/pprof/profile", pprof.Profile)

    router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

    router.HandleFunc("/debug/pprof/trace", pprof.Trace)


    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")


    srv := &http.Server{

        Addr: "0.0.0.0:" + "80",

        /*ReadTimeout:  500 * time.Millisecond,

        WriteTimeout: 500 * time.Millisecond,

        IdleTimeout:  10 * time.Second,*/

        Handler: routerMiddleWare,

    }


    log.Fatal(srv.ListenAndServe())

}


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    //other part of the code and call to goroutine

    var urls []string

    results, s, c := callUrls(urls)

    finalCall(w, results, s, c)


}


type Response struct {

    Status int

    Url    string

    Body   string

}


func callUrls(urls []string) ([]*Response, []string, []string) {

    var wg sync.WaitGroup

    wg.Add(len(urls))

    ch := make(chan func() (*Response, string, string), len(urls))

    for _, url := range urls {

        go func(url string) {

            //decide if request is valid for client to make http call using country/os

            isValid := true //assuming url to be called

            if isValid {

                //make post call

                //request body have many more paramter, just sample included.

                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.

                req := fasthttp.AcquireRequest()

                req.SetRequestURI(url)

                req.Header.Set("Content-Type", "application/json")

                req.Header.Set("Connection", "Keep-Alive")

                req.Header.SetMethod("POST")

                req.SetBody([]byte(`{"body":"param"}`))


                resp := fasthttp.AcquireResponse()


                defer fasthttp.ReleaseRequest(req)   // <- do not forget to release

                defer fasthttp.ReleaseResponse(resp) // <- do not forget to release


                //err := clientFastHttp.Do(req, response)

                //endregion

                t := time.Duration(300)


                err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond)


                body := resp.Body()


                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, "error", "500"

                    }

                    return

                }


                /*defer response.Body.Close()

                body, _ := ioutil.ReadAll(response.Body)

                _, err = io.Copy(ioutil.Discard, response.Body)


                //Close the body, forced this

                //Also tried without defer, and only wothout following line

                response.Body.Close()*/


                //do something with response body replace a few string etc.

                //and return

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"

                }


            } else {

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"

                }

            }


        }(url)

    }

    wg.Wait()

    var (

        results []*Response

        msg     []string

        status  []string

    )

    for {

        r, x, y := (<-ch)()

        if r != nil {


            results = append(results, r)

            msg = append(msg, x)

            status = append(status, y)

        }

        if len(results) == len(urls) {

            return results, msg, status

        }


    }

}


func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {

    fmt.Println("response", "response body", results, msg, status)

}


查看完整回答
反對 回復 2023-08-14
  • 2 回答
  • 0 關注
  • 221 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號