1 回答

TA贡献1887条经验 获得超5个赞
这种行为的原因在于 Go 的调度器(这个问题的简短版本我也发到了 golang-nuts 邮件列表)。上面的 goroutine 都在同一时间点开始执行(计时结果表明了这一点;此外,检查 startTime 变量的内存地址也证明时间对象并没有被"复用"),但它们一旦执行到 http.Get() 就会被调度器挂起。计时之所以逐个递增,是因为 http.Get() 成了瓶颈,无法让生成的所有 goroutine 真正并发执行。看起来这里使用了某种 FIFO 队列。
推荐阅读:
解释 Golang I/O 多路复用 netpoller 模型
队列、公平性和 Go 调度程序
在研究等待组(WaitGroup)大小时,我发现某些取值下的计时更加一致(而不是逐个递增)。于是我想弄清楚等待组大小对总耗时和单次请求耗时有什么影响。我把上面的代码重构成一个程序:在给定范围内,对每个等待组大小重复进行多次实验,并把每次运行的总耗时和单次耗时保存到 SQLite 数据库中。生成的数据集可以很方便地在 Jupyter Notebook 等工具中分析。遗憾的是,在当前设置下,我只能发出大约 4 万个请求,之后就会被限流。如果你感兴趣但不想等待数据(完整运行需要很长时间),可以在我的 GitHub 上找到一些现成的数据集。有趣的结果是:等待组较小时,并发/顺序耗时比急剧下降;在图的末尾可以看到连接开始受到限制,那次运行是我手动中止的。
并发运行时间/顺序运行时间的比值随等待组大小的变化:
不同等待组大小下的单次请求耗时图:
package main
import (
	"database/sql"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
///// global vars

// Experiment configuration shared by the whole program.
const (
	REQUESTS int    = 100            // requests per run; every run issues them twice (concurrently, then sequentially)
	URL      string = "SET_YOUR_OWN" // target of every GET request, e.g. some file on a CDN
	DBNAME   string = "netRand.db"   // sqlite database file name, stored next to the executable
	WGMIN    int    = 1              // first waitgroup size to try (inclusive)
	WGMAX    int    = 101            // end of the waitgroup size range (exclusive)
	NREPEAT  int    = 10             // number of runs per waitgroup size
)
//// types

// timingResult collects the measurements of a single experiment run so they
// can be persisted to the database in one go. All durations are whole
// milliseconds.
type timingResult struct {
	// WaitgroupSize is the batch size used for the concurrent half of the run.
	WaitgroupSize int

	// Concurrent half: per-request timings in channel receive order, and the
	// wall-clock duration of the whole concurrent phase.
	ConcurrentTimingsMs [REQUESTS]int64
	ConcurrentTotalMs   int64

	// Sequential half: per-request timings in call order, and the wall-clock
	// duration of the whole sequential phase.
	SequentialTimingsMs [REQUESTS]int64
	SequentialTotalMs   int64
}
//// main

// main sweeps the waitgroup sizes from WGMIN up to (but excluding) WGMAX.
// For every size it performs NREPEAT experiment runs, persisting each run
// right away and then printing a short progress report to stdout.
func main() {
	db := setupDb()
	defer db.Close()

	for size := WGMIN; size < WGMAX; size++ {
		for rep := 1; rep <= NREPEAT; rep++ {
			persistTimings(requestTimes(size), db)

			fmt.Printf("\n======== %v of %v ============\n", rep, NREPEAT)
			fmt.Printf("current waitgroup size: %v\n", size)
			fmt.Printf("max waitgroup size: %v\n", WGMAX-1)
		}
	}
}
// requestTimes runs one experiment: REQUESTS requests issued concurrently in
// batches of waitgroupSize, followed by the same number issued one after
// another. It returns both sets of timings bundled in a timingResult.
func requestTimes(waitgroupSize int) timingResult {
	result := timingResult{WaitgroupSize: waitgroupSize}
	// The concurrent phase must run first; keep this order.
	result.ConcurrentTimingsMs, result.ConcurrentTotalMs = concurrentRequests(waitgroupSize)
	result.SequentialTimingsMs, result.SequentialTotalMs = sequentialRequests()
	return result
}
// persistTimings writes one experiment run to db. It first inserts the run
// summary row, then looks up that row's id via getCurrentRunId, and finally
// stores the concurrent and the sequential per-request timings under that id.
func persistTimings(timings timingResult, db *sql.DB) {
	persistRun(timings, db)
	runID := getCurrentRunId(db)

	writers := [...]func(int, timingResult, *sql.DB){
		persistConcurrentTimings,
		persistSequentialTimings,
	}
	for _, write := range writers {
		write(runID, timings, db)
	}
}
// concurrentRequests issues REQUESTS GET requests against URL, each from its
// own goroutine, in batches: after every waitgroupSize launches it waits for
// the whole batch to finish before launching the next one. A waitgroupSize
// <= 0 means "no limit": all requests are launched before waiting.
// The total number of requests is always REQUESTS, independent of the
// batch size.
//
// It returns the per-request timings in the order the goroutines sent them
// on the result channel, and the wall-clock duration of the whole phase,
// both in milliseconds.
func concurrentRequests(waitgroupSize int) ([REQUESTS]int64, int64) {
	start := time.Now()
	var wg sync.WaitGroup
	var timings [REQUESTS]int64
	// Buffered to REQUESTS so no goroutine ever blocks on its send.
	ch := make(chan int64, REQUESTS)
	for i := range timings {
		wg.Add(1)
		go func() {
			defer wg.Done()
			doGetChannel(URL, ch)
		}()
		// i+1 goroutines have been launched so far; wait once that count is
		// a multiple of the batch size so every batch (including the first)
		// holds exactly waitgroupSize requests. The > 0 guard avoids a
		// division-by-zero panic and turns non-positive sizes into "no limit".
		if waitgroupSize > 0 && (i+1)%waitgroupSize == 0 {
			wg.Wait()
		}
	}
	// Wait for the last (possibly partial) batch.
	wg.Wait()
	close(ch)

	count := 0
	for ret := range ch {
		timings[count] = ret
		count++
	}
	return timings, time.Since(start).Milliseconds()
}
// doGetChannel performs one GET request to address and sends the duration of
// the http.Get call, in whole milliseconds, on channel. Reading the response
// body is not part of the measured time.
//
// After the timing has been sent, the response body is drained and closed.
// Leaving it open leaks the connection's file descriptor and prevents the
// HTTP transport from reusing the connection. Any request error terminates
// the program via log.Fatalln.
func doGetChannel(address string, channel chan int64) {
	start := time.Now()
	resp, err := http.Get(address)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		// Best effort: a failed drain only costs connection reuse.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	// time.Since reads the monotonic clock, so wall-clock adjustments cannot
	// skew the result (subtracting two UnixMilli readings could).
	channel <- time.Since(start).Milliseconds()
}
// sequentialRequests issues REQUESTS GET requests against URL one after
// another. It returns each request's duration in call order, and the
// wall-clock duration of the whole phase, both in milliseconds.
func sequentialRequests() ([REQUESTS]int64, int64) {
	began := time.Now()
	var results [REQUESTS]int64
	for idx := 0; idx < len(results); idx++ {
		results[idx] = doGetReturn(URL)
	}
	elapsed := time.Since(began).Milliseconds()
	return results, elapsed
}
// doGetReturn performs one GET request to address and returns the duration of
// the http.Get call in whole milliseconds. Reading the response body is not
// part of the measured time.
//
// Before returning, the response body is drained and closed. Leaving it open
// leaks the connection's file descriptor and prevents the HTTP transport from
// reusing the connection. Any request error terminates the program via
// log.Fatalln.
func doGetReturn(address string) int64 {
	start := time.Now()
	resp, err := http.Get(address)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		// Best effort: a failed drain only costs connection reuse.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	return time.Since(start).Milliseconds()
}
//// DB

// setupDb opens the sqlite3 database file DBNAME located next to the running
// executable and creates the schema if it does not exist yet:
//
//	runs (run_id primary key)
//	├── concurrent_timings.run -> runs.run_id
//	└── sequential_timings.run -> runs.run_id
//
// Any failure terminates the program via log.Fatalln.
func setupDb() *sql.DB {
	// Statements are executed in this order.
	schema := []string{
		`
CREATE TABLE IF NOT EXISTS runs (
run_id INTEGER NOT NULL PRIMARY KEY,
time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
waitgroup_size INTEGER,
concurrent_total_ms INTEGER,
sequential_total_ms INTEGER,
concurrent_sequential_ratio REAL
);`,
		`
CREATE TABLE IF NOT EXISTS sequential_timings (
run INTEGER,
call_number INTEGER,
timing_ms INTEGER,
FOREIGN KEY(run) REFERENCES runs(run_id)
);`,
		`
CREATE TABLE IF NOT EXISTS concurrent_timings (
run INTEGER,
channel_position INTEGER,
timing_ms INTEGER,
FOREIGN KEY(run) REFERENCES runs(run_id)
);`,
	}

	db, err := sql.Open("sqlite3", getConnectionString(DBNAME))
	if err != nil {
		log.Fatalln(err)
	}
	for _, ddl := range schema {
		if _, err := db.Exec(ddl); err != nil {
			log.Fatalln(err)
		}
	}
	return db
}
// getConnectionString returns the path of the database file dbName placed in
// the directory that contains the currently running executable. The path
// separator is chosen from runtime.GOOS: a backslash on Windows, a forward
// slash everywhere else. It panics if the executable path cannot be
// determined.
func getConnectionString(dbName string) string {
	exePath, err := os.Executable()
	if err != nil {
		panic(err)
	}
	sep := "/"
	if runtime.GOOS == "windows" {
		sep = "\\"
	}
	return filepath.Dir(exePath) + sep + dbName
}
// persistRun inserts one summary row into the runs table for timings. The row
// holds the waitgroup size, both phase totals, and the
// concurrent/sequential ratio. The insert runs in its own transaction.
//
// When SequentialTotalMs is 0 the ratio is undefined, so NULL is stored
// instead of a NaN/Inf value. Any database error terminates the program via
// log.Fatalln.
func persistRun(timings timingResult, db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Fatalln(err)
	}
	// Preparing on tx ties the statement to the transaction. A statement from
	// db.Prepare would stay open on the connection pool until explicitly
	// closed, leaking one statement per run.
	insertRun, err := tx.Prepare(`INSERT INTO runs(
waitgroup_size,
sequential_total_ms,
concurrent_total_ms,
concurrent_sequential_ratio)
VALUES(?, ?, ?, ?)`)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertRun.Close()

	var ratio sql.NullFloat64
	if timings.SequentialTotalMs != 0 {
		ratio = sql.NullFloat64{
			Float64: float64(timings.ConcurrentTotalMs) / float64(timings.SequentialTotalMs),
			Valid:   true,
		}
	}

	_, err = insertRun.Exec(
		timings.WaitgroupSize,
		timings.SequentialTotalMs,
		timings.ConcurrentTotalMs,
		ratio,
	)
	if err != nil {
		log.Fatalln(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatalln(err)
	}
}
// getCurrentRunId returns MAX(run_id) from the runs table. persistTimings
// calls it right after persistRun, using the result as the id of the run it
// just inserted.
//
// The query always yields exactly one row. On an empty table that row is NULL,
// which cannot be scanned into an int and is reported like any other error.
// Errors terminate the program via log.Fatalln.
//
// NOTE(review): MAX(run_id) assumes a single writer. The id from
// sql.Result.LastInsertId in persistRun would be more robust.
func getCurrentRunId(db *sql.DB) int {
	var runID int
	if err := db.QueryRow("SELECT MAX(run_id) FROM runs").Scan(&runID); err != nil {
		log.Fatalln(err)
	}
	return runID
}
// persistConcurrentTimings stores every per-request timing of the concurrent
// phase in concurrent_timings. Each row holds runId, the timing's index in
// ConcurrentTimingsMs (its position in channel receive order), and the timing
// in ms. All rows are written in a single transaction. Any database error
// terminates the program via log.Fatalln.
func persistConcurrentTimings(runId int, timings timingResult, db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Fatalln(err)
	}
	// Prepare once on the transaction and reuse the statement for every row.
	// The deferred Close releases it; a db.Prepare statement would leak, and
	// calling tx.Stmt per row re-prepares each time.
	insertTiming, err := tx.Prepare(`INSERT INTO concurrent_timings(
run,
channel_position,
timing_ms)
VALUES(?, ?, ?)`)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertTiming.Close()

	for i, timing := range timings.ConcurrentTimingsMs {
		if _, err := insertTiming.Exec(runId, i, timing); err != nil {
			log.Fatalln(err)
		}
	}
	if err := tx.Commit(); err != nil {
		log.Fatalln(err)
	}
}
// persistSequentialTimings stores every per-request timing of the sequential
// phase in sequential_timings. Each row holds runId, the call number (index in
// SequentialTimingsMs), and the timing in ms. All rows are written in a single
// transaction. Any database error terminates the program via log.Fatalln.
func persistSequentialTimings(runId int, timings timingResult, db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Fatalln(err)
	}
	// Prepare once on the transaction and reuse the statement for every row.
	// The deferred Close releases it; a db.Prepare statement would leak, and
	// calling tx.Stmt per row re-prepares each time.
	insertTiming, err := tx.Prepare(`INSERT INTO sequential_timings(
run,
call_number,
timing_ms)
VALUES(?, ?, ?)`)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertTiming.Close()

	for i, timing := range timings.SequentialTimingsMs {
		if _, err := insertTiming.Exec(runId, i, timing); err != nil {
			log.Fatalln(err)
		}
	}
	if err := tx.Commit(); err != nil {
		log.Fatalln(err)
	}
}
- 1 回答
- 0 关注
- 167 浏览
添加回答
举报