1 回答

TA貢獻1887條經驗 獲得超5個贊
這種行為的原因在於 Go 的調度程序(這個問題的精簡版本也發在了 golang-nuts 上)。上面的 goroutine 都在同一時間點開始執行(計時結果說明了這一點;此外,檢查 startTime 變量的內存地址也證明時間對象並沒有被“回收”重用),但它們一旦執行到 http.Get() 就會被調度器掛起。計時之所以逐個遞增,是因為 http.Get() 形成了瓶頸,使所生成的 goroutine 無法全部真正並發執行。看起來這裡使用了某種 FIFO 隊列。
推薦閱讀:
解釋 Golang I/O 多路復用 netpoller 模型
隊列、公平性和 Go 調度程序
在研究等待組的大小時,我發現某些取值下的計時更加一致(而不是逐個遞增)。因此我想知道等待組大小對總耗時和單個請求耗時有什麼影響。我把上面的代碼重構成了一個程序:它在給定範圍內針對每個等待組大小重複進行多次實驗,並將每次運行的總耗時和單個請求耗時保存到 SQLite 數據庫中。生成的數據集可以很方便地在 Jupyter Notebook 等工具中使用。遺憾的是,在當前設置下,我只能發出大約 40K 個請求,之後就會被限流。如果您感興趣,但不想自己等待數據生成(完整運行需要很長時間),可以在我的 GitHub 上找到一些數據集。有趣的是,在等待組較小時,並發/順序耗時比急劇下降;在結果末尾可以看到連接開始被限流,那次運行是我手動中止的。
並發運行時間/順序運行時間與等待組大小的關係:
不同等待組大小下各個請求的耗時圖。
package main
import (
	"database/sql"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
// Experiment configuration shared by the whole program.
const (
	// REQUESTS is the size of a single run. Every run performs this many GET
	// requests twice: once concurrently and once sequentially.
	REQUESTS int = 100
	// URL is the resource fetched by every GET request (e.g. a file on a CDN).
	URL string = "SET_YOUR_OWN"
	// DBNAME is the SQLite file name; the file is placed next to the executable.
	DBNAME string = "netRand.db"
	// WGMIN is the first waitgroup size of the sweep (inclusive).
	WGMIN int = 1
	// WGMAX is the end of the waitgroup-size sweep (exclusive).
	WGMAX int = 101
	// NREPEAT is how many runs are recorded for each waitgroup size.
	NREPEAT int = 10
)
//// types

// timingResult is a container for the results of one run (a concurrent pass
// followed by a sequential pass), collected before persisting them to the DB.
type timingResult struct {
// WaitgroupSize is the batch size handed to concurrentRequests: the (nominal)
// number of goroutines it launches between wg.Wait() barriers.
WaitgroupSize int
// ConcurrentTimingsMs holds the per-request durations in ms of the concurrent
// pass, in the order they were received on the result channel.
ConcurrentTimingsMs [REQUESTS]int64
// ConcurrentTotalMs is the elapsed time in ms of the whole concurrent pass.
ConcurrentTotalMs int64
// SequentialTimingsMs holds the per-request durations in ms of the sequential
// pass, in call order.
SequentialTimingsMs [REQUESTS]int64
// SequentialTotalMs is the elapsed time in ms of the whole sequential pass.
SequentialTotalMs int64
}
//// main

// main sweeps every waitgroup size in [WGMIN, WGMAX). For each size it records
// NREPEAT runs in the database and prints a progress report after each one.
func main() {
	db := setupDb()
	defer db.Close()

	for size := WGMIN; size < WGMAX; size++ {
		for run := 1; run <= NREPEAT; run++ {
			persistTimings(requestTimes(size), db)

			fmt.Printf("\n======== %v of %v ============\n", run, NREPEAT)
			fmt.Printf("current waitgroup size: %v\n", size)
			fmt.Printf("max waitgroup size: %v\n", WGMAX-1)
		}
	}
}
// requestTimes performs one run. It first issues REQUESTS requests
// concurrently in batches of waitgroupSize, then REQUESTS requests
// sequentially. It returns both sets of per-request timings and both totals.
func requestTimes(waitgroupSize int) timingResult {
	result := timingResult{WaitgroupSize: waitgroupSize}

	// The concurrent pass runs first, then the sequential pass.
	result.ConcurrentTimingsMs, result.ConcurrentTotalMs = concurrentRequests(waitgroupSize)
	result.SequentialTimingsMs, result.SequentialTotalMs = sequentialRequests()

	return result
}
// persistTimings stores one run. It writes the summary row first, then looks
// up that row's id and attaches the concurrent and sequential per-request
// timings to it.
func persistTimings(timings timingResult, db *sql.DB) {
	// The runs row has to exist before its id can key the timing rows.
	persistRun(timings, db)
	runID := getCurrentRunId(db)

	persistConcurrentTimings(runID, timings, db)
	persistSequentialTimings(runID, timings, db)
}
// concurrentRequests issues REQUESTS GET requests to URL from goroutines.
//
// Goroutines are launched in batches of waitgroupSize. After each full batch
// it blocks on wg.Wait() until that batch finishes, so at most waitgroupSize
// requests are in flight at once. The total is always REQUESTS, independent
// of waitgroupSize; a trailing partial batch is awaited at the end. A
// waitgroupSize below 1 is treated as 1 instead of dividing by zero.
//
// It returns the per-request durations in milliseconds, in the order they
// were received on the channel (completion order). It also returns the total
// elapsed time of the pass in milliseconds.
func concurrentRequests(waitgroupSize int) ([REQUESTS]int64, int64) {
	if waitgroupSize < 1 {
		waitgroupSize = 1
	}
	start := time.Now()
	var wg sync.WaitGroup
	var timings [REQUESTS]int64
	// Buffered to REQUESTS so no goroutine ever blocks on its send.
	ch := make(chan int64, REQUESTS)
	for i := range timings {
		wg.Add(1)
		go func() {
			defer wg.Done()
			doGetChannel(URL, ch)
		}()
		// Barrier once a batch is full. Testing i+1 (not i) makes every batch
		// exactly waitgroupSize goroutines. With i alone, the first batch held
		// a single request and every later boundary shifted by one.
		if (i+1)%waitgroupSize == 0 {
			wg.Wait()
		}
	}
	// Await the trailing partial batch, if any.
	wg.Wait()
	close(ch)
	count := 0
	for ret := range ch {
		timings[count] = ret
		count++
	}
	return timings, time.Since(start).Milliseconds()
}
// doGetChannel issues a GET request to address and measures how long http.Get
// takes to return, i.e. until the response headers are received. It sends
// that duration in milliseconds on channel. A request error terminates the
// program via log.Fatalln.
//
// The duration is measured with time.Since, which uses the monotonic clock.
// It therefore cannot go negative if the wall clock is adjusted mid-request,
// and it is rounded the same way as doGetReturn's sequential measurements.
//
// The response body is drained and closed after the measurement. Leaving it
// open leaks the connection and prevents the HTTP transport from reusing it.
func doGetChannel(address string, channel chan int64) {
	start := time.Now()
	resp, err := http.Get(address)
	if err != nil {
		log.Fatalln(err)
	}
	delta := time.Since(start).Milliseconds()
	// Drained outside the timed section, so the value is still time-to-response.
	_, _ = io.Copy(io.Discard, resp.Body) // best effort: a failed drain only prevents reuse
	resp.Body.Close()
	channel <- delta
}
// sequentialRequests performs REQUESTS GET requests to URL one after another.
// It returns each request's duration in milliseconds, in call order, and the
// total elapsed time of the pass in milliseconds.
func sequentialRequests() ([REQUESTS]int64, int64) {
	began := time.Now()
	var durations [REQUESTS]int64
	for n := 0; n < REQUESTS; n++ {
		durations[n] = doGetReturn(URL)
	}
	elapsed := time.Since(began).Milliseconds()
	return durations, elapsed
}
// doGetReturn issues a GET request to address and returns how long http.Get
// took to return, i.e. until the response headers were received, in
// milliseconds. The duration is measured with the monotonic clock via
// time.Since. A request error terminates the program via log.Fatalln.
//
// The response body is drained and closed after timing. Leaving it open leaks
// the connection and stops the transport from reusing it for the next
// sequential request.
func doGetReturn(address string) int64 {
	start := time.Now()
	resp, err := http.Get(address)
	if err != nil {
		log.Fatalln(err)
	}
	duration := time.Since(start).Milliseconds()
	// Drained outside the timed section, so the value is still time-to-response.
	_, _ = io.Copy(io.Discard, resp.Body) // best effort: a failed drain only prevents reuse
	resp.Body.Close()
	return duration
}
//// DB

// setupDb opens the SQLite database file located next to the executable. It
// makes sure the three tables used by this program exist:
//
//	runs                one row per run (waitgroup size, totals, ratio)
//	sequential_timings  per-request timings of the sequential pass (run -> runs.run_id)
//	concurrent_timings  per-request timings of the concurrent pass (run -> runs.run_id)
//
// Any error opening the database or creating a table terminates the program
// via log.Fatalln. The caller is responsible for closing the returned handle.
func setupDb() *sql.DB {
	const createRuns string = `
CREATE TABLE IF NOT EXISTS runs (
run_id INTEGER NOT NULL PRIMARY KEY,
time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
waitgroup_size INTEGER,
concurrent_total_ms INTEGER,
sequential_total_ms INTEGER,
concurrent_sequential_ratio REAL
);`
	const createSequentialTimings string = `
CREATE TABLE IF NOT EXISTS sequential_timings (
run INTEGER,
call_number INTEGER,
timing_ms INTEGER,
FOREIGN KEY(run) REFERENCES runs(run_id)
);`
	const createConcurrentTimings string = `
CREATE TABLE IF NOT EXISTS concurrent_timings (
run INTEGER,
channel_position INTEGER,
timing_ms INTEGER,
FOREIGN KEY(run) REFERENCES runs(run_id)
);`

	db, err := sql.Open("sqlite3", getConnectionString(DBNAME))
	if err != nil {
		log.Fatalln(err)
	}

	// The parent table is created first, then the two child tables.
	schema := []string{createRuns, createSequentialTimings, createConcurrentTimings}
	for _, ddl := range schema {
		if _, err := db.Exec(ddl); err != nil {
			log.Fatalln(err)
		}
	}
	return db
}
// getConnectionString returns the path of a file named dbName inside the
// directory that contains the currently running executable. The separator is
// chosen per platform. The path is used directly as the sqlite3 connection
// string. It panics if the executable's path cannot be determined.
func getConnectionString(dbName string) string {
	exePath, err := os.Executable()
	if err != nil {
		panic(err)
	}

	sep := "/"
	if runtime.GOOS == "windows" {
		sep = "\\"
	}
	return filepath.Dir(exePath) + sep + dbName
}
// persistRun inserts one row describing timings into the runs table, inside
// its own transaction. The row includes the concurrent/sequential total-time
// ratio. Any database error terminates the program via log.Fatalln.
//
// The statement is prepared on the transaction and closed when the function
// returns. Previously, a db-level prepared statement was leaked on every call,
// and the deferred Close applied to a different, freshly created statement.
// The ratio is computed in float64 to avoid float32 rounding. It is stored as
// NULL when the sequential total is 0 ms, where the quotient would be
// +Inf/NaN.
func persistRun(timings timingResult, db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Fatalln(err)
	}
	insertRun, err := tx.Prepare(`INSERT INTO runs(
waitgroup_size,
sequential_total_ms,
concurrent_total_ms,
concurrent_sequential_ratio)
VALUES(?, ?, ?, ?)`)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertRun.Close()

	var ratio interface{} // a nil value is written as SQL NULL
	if timings.SequentialTotalMs != 0 {
		ratio = float64(timings.ConcurrentTotalMs) / float64(timings.SequentialTotalMs)
	}

	if _, err := insertRun.Exec(
		timings.WaitgroupSize,
		timings.SequentialTotalMs,
		timings.ConcurrentTotalMs,
		ratio,
	); err != nil {
		log.Fatalln(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatalln(err)
	}
}
// getCurrentRunId returns MAX(run_id) from the runs table. The caller uses it
// as the id of the run it has just inserted. That is only reliable while this
// process is the sole writer; NOTE(review): the sql.Result.LastInsertId of the
// insert would avoid that assumption.
//
// Any query or scan error terminates the program via log.Fatalln. This
// includes an empty table, where MAX returns NULL, which cannot be scanned
// into an int. QueryRow closes its rows itself and reports iteration errors
// through Scan. The previous rows.Next loop never checked rows.Err(), so such
// an error silently produced run id 0.
func getCurrentRunId(db *sql.DB) int {
	var runID int
	if err := db.QueryRow("SELECT MAX(run_id) FROM runs").Scan(&runID); err != nil {
		log.Fatalln(err)
	}
	return runID
}
// persistConcurrentTimings writes one concurrent_timings row per entry of
// timings.ConcurrentTimingsMs, all in a single transaction. Each row holds the
// run id, the entry's channel position (its index) and its duration in ms. Any
// database error terminates the program via log.Fatalln.
//
// The insert is prepared once on the transaction and closed on return.
// Previously, a db-level statement leaked on every call and was re-bound with
// tx.Stmt for every row.
func persistConcurrentTimings(runId int, timings timingResult, db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Fatalln(err)
	}
	insertTiming, err := tx.Prepare(`INSERT INTO concurrent_timings(
run,
channel_position,
timing_ms)
VALUES(?, ?, ?)`)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertTiming.Close()

	for i, timing := range timings.ConcurrentTimingsMs {
		if _, err := insertTiming.Exec(runId, i, timing); err != nil {
			log.Fatalln(err)
		}
	}
	if err := tx.Commit(); err != nil {
		log.Fatalln(err)
	}
}
// persistSequentialTimings writes one sequential_timings row per entry of
// timings.SequentialTimingsMs, all in a single transaction. Each row holds the
// run id, the call number (its index) and its duration in ms. Any database
// error terminates the program via log.Fatalln.
//
// The insert is prepared once on the transaction and closed on return.
// Previously, a db-level statement leaked on every call and was re-bound with
// tx.Stmt for every row.
func persistSequentialTimings(runId int, timings timingResult, db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Fatalln(err)
	}
	insertTiming, err := tx.Prepare(`INSERT INTO sequential_timings(
run,
call_number,
timing_ms)
VALUES(?, ?, ?)`)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertTiming.Close()

	for i, timing := range timings.SequentialTimingsMs {
		if _, err := insertTiming.Exec(runId, i, timing); err != nil {
			log.Fatalln(err)
		}
	}
	if err := tx.Commit(); err != nil {
		log.Fatalln(err)
	}
}
- 1 回答
- 0 關注
- 150 瀏覽
添加回答
舉報