
Adding a callback instead of using the default implementation


Go
紅顏莎娜 2022-07-04 16:49:50
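I'm using the following code, and it works as expected. The user adds a new entry to testers in the config (it is hard-coded for now, but it will come from a config file), which returns a list of TAPs that need to be checked, and the program runs them in parallel via HTTP calls.

There is one more use case I need to support: the user will also provide a function/method/callback that implements the call via http/curl/websocket/whatever they need (instead of the check() function), and that function will return the response, whether it is 200/400/500.

For example, say the user implements two functions/callbacks in addition to the configured list of taps. The program should execute those functions the same way it executes the testers list, and the functions will call other sites such as "http://www.yahoo.com" and https://www.bing.com with curl or http (just to demonstrate the difference), or some may even implement the check method to return the result of a sub-process execution.

How can I do this in a clean way?

package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    err  error
    name string
    res  http.Response
    url  string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("Fetching %s %s \n", p.name, p.url)
    // theres really no need for NewTap
    nt := NewTap(p.name, p.url, p.timeout)
    res, err := nt.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // need to close body
    res.Body.Close()
    return testerResponse{name: p.name, res: *res, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }

Update

I tried the following, https://play.golang.org/p/cRPPzke27dZ, but I'm not sure how to call the custom handlers' check() method so that their data is collected in parallel calls, the same way as for the testers config.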

2 Answers

婷婷同學_


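Update 5 (accepted answer)

*Since you are interested in this question, you may also be interested in this one. See here for more on how to run each job with an automatically cancelled timeout.*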


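To answer your question about how you would add arbitrary functions:

I don't know what type you want to return, but you can do whatever you need.

There are about a million different ways to do this; this is just one example: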


package main

import (
    "encoding/json"
    "fmt"

    "github.com/gammazero/workerpool"
)

var (
    numWorkers = 10
)

type MyReturnType struct {
    Name string
    Data interface{}
}

// wrapJob adapts a function that returns a value into the func() that
// the pool expects, sending the returned value to the results channel.
func wrapJob(rc chan MyReturnType, f func() MyReturnType) func() {
    return func() {
        rc <- f()
    }
}

func main() {
    // create results chan and worker pool
    // should prob make your results channel typed to what you need
    jobs := []func() MyReturnType{
        func() MyReturnType {
            // whatever you want to do here
            return MyReturnType{Name: "job1", Data: map[string]string{"Whatever": "You want"}}
        },
        func() MyReturnType {
            // whatever you want to do here
            // do a curl or a kubectl or whatever you want
            resultFromCurl := "i am a result"
            return MyReturnType{Name: "job2", Data: resultFromCurl}
        },
    }

    results := make(chan MyReturnType, len(jobs))
    pool := workerpool.New(numWorkers)

    for _, job := range jobs {
        j := job
        pool.Submit(wrapJob(results, j))
    }

    // Wait for all jobs to finish
    pool.StopWait()

    // Close results chan
    close(results)

    // Iterate over results, printing to console
    for res := range results {
        prettyPrint(res)
    }
}

func prettyPrint(i interface{}) {
    prettyJSON, err := json.MarshalIndent(i, "", "    ")
    if err != nil {
        fmt.Printf("Error : %s \n", err.Error())
        return // nothing useful to print if marshalling failed
    }
    fmt.Printf("MyReturnType %s\n", string(prettyJSON))
}

Which returns:

// MyReturnType {
//     "Name": "job2",
//     "Data": "i am a result"
// }
// MyReturnType {
//     "Name": "job1",
//     "Data": {
//         "Whatever": "You want"
//     }
// }
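As one of those many other ways, the same idea can be sketched with only the standard library. This is an assumption-laden illustration rather than a replacement for workerpool: Result mirrors MyReturnType from the example above, and runAll is a hypothetical helper that fans jobs out to a fixed number of goroutines and collects whatever they return.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// Result mirrors MyReturnType from the example above.
type Result struct {
    Name string
    Data interface{}
}

// runAll (hypothetical helper) runs every job on at most numWorkers
// goroutines and returns the results sorted by name.
func runAll(numWorkers int, jobs []func() Result) []Result {
    if numWorkers < 1 {
        numWorkers = 1
    }
    jobCh := make(chan func() Result)
    resCh := make(chan Result, len(jobs)) // buffered: workers never block on send

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobCh {
                resCh <- job()
            }
        }()
    }

    for _, job := range jobs {
        jobCh <- job
    }
    close(jobCh) // no more jobs: workers leave their range loops
    wg.Wait()
    close(resCh)

    results := make([]Result, 0, len(jobs))
    for r := range resCh {
        results = append(results, r)
    }
    // Completion order is not deterministic, so sort for stable output.
    sort.Slice(results, func(i, j int) bool { return results[i].Name < results[j].Name })
    return results
}

func main() {
    jobs := []func() Result{
        func() Result { return Result{Name: "job1", Data: map[string]string{"Whatever": "You want"}} },
        func() Result { return Result{Name: "job2", Data: "i am a result"} },
    }
    for _, r := range runAll(10, jobs) {
        fmt.Printf("%s: %v\n", r.Name, r.Data)
    }
}
```

The trade-off is that you maintain the pool yourself, whereas workerpool also queues jobs beyond the worker limit for you.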

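Update 4

After researching this for a few hours, I would recommend using something like workerpool, which you can find here. Honestly, using workerpool seems to make the most sense here. It looks production-ready and is used by a few fairly big names (see the README in their repo).

I wrote a small example that shows how to use workerpool: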


package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/gammazero/workerpool"
)

var (
    numWorkers = 10
    urls       = []string{"yahoo.com", "example.com", "google.com"}
)

func main() {
    // create results chan and worker pool
    // should prob make your results channel typed to what you need
    results := make(chan interface{}, len(urls))
    pool := workerpool.New(numWorkers)

    // Create jobs by iterating over urls
    for i, u := range urls {
        url := u
        jobNum := i

        // Create job
        f := func() {
            start := time.Now()
            c := &http.Client{}
            r, e := c.Get("http://" + url)
            if e != nil {
                // r is nil when the request fails, so report the error and stop here
                results <- fmt.Sprintf("job '%d' to '%s' failed: %s\n", jobNum, url, e.Error())
                return
            }
            r.Body.Close()
            took := time.Since(start).Milliseconds()
            o := fmt.Sprintf("completed job '%d' to '%s' in '%dms' with status code '%d'\n", jobNum, url, took, r.StatusCode)
            results <- o
        }

        // Add job to workerpool
        pool.Submit(f)
    }

    // Wait for all jobs to finish
    pool.StopWait()

    // Close results chan
    close(results)

    // Iterate over results, printing to console
    for res := range results {
        fmt.Print(res.(string))
    }
}

Which outputs:

// completed job '1' to 'example.com' in '81ms' with status code '200'
// completed job '2' to 'google.com' in '249ms' with status code '200'
// completed job '0' to 'yahoo.com' in '816ms' with status code '200'

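Update 3

I went ahead and wrote a worker pool library (with help from workerpool), because I also wanted to dig deeper into channels and concurrency design.

You can find the repo here, and the code below.

How to use it: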


pool := New(3)

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://google.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To google.com %d\n", r.StatusCode)
})

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://yahoo.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To yahoo.com %d\n", r.StatusCode)
})

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://example.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To example.com %d\n", r.StatusCode)
})

pool.Seal()

Worker pool code (puddle)

package puddle

// Only the packages the code below actually uses are imported;
// unused imports are a compile error in Go.
import (
    "container/list"
    "sync"
    "time"
)

const (
    idleTimeout = time.Second * 2
)

// New creates a new puddle (aka worker pool)
func New(maxWorkers int) Puddle {
    // There must be at least one worker
    if maxWorkers < 1 {
        maxWorkers = 1
    }

    p := &puddle{
        maxWorkers: maxWorkers,
        jobs:       make(chan func(), 1),
        workers:    make(chan func()),
        killswitch: make(chan struct{}),
    }

    // Start accepting/working jobs as they come in
    go p.serve()

    return p
}

// Puddle knows how to interact with worker pools
type Puddle interface {
    Job(f func())
    Seal()
}

// puddle is a worker pool that holds workers, tasks, and misc metadata
type puddle struct {
    maxWorkers int
    jobs       chan func()
    workers    chan func()
    killswitch chan struct{}
    queue      List
    once       sync.Once
    stopped    int32
    waiting    int32
    wait       bool
}

// Job submits a new task to the worker pool
func (p *puddle) Job(f func()) {
    if f != nil {
        p.jobs <- f
    }
}

// Seal stops worker pool and waits for queued tasks to complete
func (p *puddle) Seal() {
    p.stop(true)
}

func (p *puddle) stop(wait bool) {
    p.once.Do(func() {
        p.wait = wait
        // Close task queue and wait for currently running tasks to finish
        close(p.jobs)
    })
    <-p.killswitch
}

func (p *puddle) killWorkerIfIdle() bool {
    select {
    case p.workers <- nil:
        // Kill worker
        return true
    default:
        // No ready workers
        return false
    }
}

// process puts new jobs onto the queue, and removes jobs from the queue as workers become available.
// Returns false if puddle is stopped.
func (p *puddle) process() bool {
    select {
    case task, ok := <-p.jobs:
        if !ok {
            return false
        }
        p.queue.PushBack(task)
    case p.workers <- p.queue.Front().Value.(func()):
        // Give task to ready worker
        p.queue.PopFront()
    }
    return true
}

func (p *puddle) serve() {
    defer close(p.killswitch)
    timeout := time.NewTimer(idleTimeout)
    var workerCount int
    var idle bool

Serving:
    for {
        if p.queue.Len() != 0 {
            if !p.process() {
                break Serving
            }
            continue
        }

        select {
        case job, ok := <-p.jobs:
            if !ok {
                break Serving
            }

            // Give a task to our workers
            select {
            case p.workers <- job:
            default:
                // If we are not maxed on workers, create a new one
                if workerCount < p.maxWorkers {
                    go startJob(job, p.workers)
                    workerCount++
                } else {
                    // Place a task on the back of the queue
                    p.queue.PushBack(job)
                }
            }
            idle = false
        case <-timeout.C:
            // Timed out waiting for work to arrive.  Kill a ready worker if
            // pool has been idle for a whole timeout.
            if idle && workerCount > 0 {
                if p.killWorkerIfIdle() {
                    workerCount--
                }
            }
            idle = true
            timeout.Reset(idleTimeout)
        }
    }

    // Allow queued jobs to complete
    if p.wait {
        p.work()
    }

    // Stop all workers before shutting down
    for workerCount > 0 {
        p.workers <- nil
        workerCount--
    }

    timeout.Stop()
}

// work removes each task from the waiting queue and gives it to
// workers until queue is empty.
func (p *puddle) work() {
    for p.queue.Len() != 0 {
        // A worker is ready, so give task to worker.
        p.workers <- p.queue.PopFront()
    }
}

// startJob runs initial task, then starts a worker waiting for more.
func startJob(job func(), workerQueue chan func()) {
    job()
    go worker(workerQueue)
}

// worker executes tasks and stops when it receives a nil task.
func worker(queue chan func()) {
    for job := range queue {
        if job == nil {
            return
        }
        job()
    }
}

// List wraps `container/list`
type List struct {
    list.List
}

// PopFront removes then returns first element in list as func()
func (l *List) PopFront() func() {
    f := l.Front()
    l.Remove(f)
    return f.Value.(func())
}

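Update 2

Since you asked how to use the code, this is how you would do it.

I turned worker into its own package and wrote another repo to show how to use that package.

worker package

How to use the worker package

The worker package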

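package worker

import "fmt"

// JobResponse is what a Callback returns. The fields are exported so that
// callbacks defined in other packages can fill them in.
type JobResponse struct {
    Err  error
    Name string
    Res  int
    URL  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// Do runs every job on a pool of maxWorkers goroutines and prints each result.
func Do(jobs []Job, maxWorkers int) {
    jobsPool := make(chan Job, len(jobs))
    resultsPool := make(chan JobResponse, len(jobs))

    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPool, resultsPool)
    }

    makeJobs(jobsPool, jobs)
    close(jobsPool) // no more jobs: lets the workers exit once the queue drains
    getResults(resultsPool, jobs)
}

// worker runs the callback of every job it receives and forwards the response.
func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

// makeJobs fills the jobs channel.
func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

// getResults reads one response per job and prints it.
func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        job := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.Name, job.URL, job.Res)
        if job.Err != nil {
            status = job.Err.Error() + "\n"
        }
        fmt.Print(status)
    }
}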

How to use the worker package

package main

import (
    "github.com/oze4/worker"
)

func main() {
    jobs := []worker.Job{
        AddedByUser{name: "1"},
        AddedByUser{name: "2"},
        AddedByUser{name: "3"},
        AddedByUser{name: "4"},
        AddedByUser{name: "5"},
        AddedByUser{name: "6"},
    }

    worker.Do(jobs, 5)
}

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() worker.JobResponse {
    // User added func/callback goes here
    return worker.JobResponse{}
}

Update

I renamed a few things, which hopefully makes it a bit clearer.

These are the basics you need:


package main

import (
    "fmt"
)

func main() {
    fmt.Println("Hello, playground")
}

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// worker runs the callback of every job it receives and forwards the response.
func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

// makeJobs fills the jobs channel.
func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

// getResults reads one response per job and prints it.
func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        j := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", j.name, j.url, j.res)
        if j.err != nil {
            status = j.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

As long as a type satisfies the Job interface, I can pass it to worker, makeJobs, and getResults:


type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}
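If the user only wants to hand over a plain function rather than declare a new type for every callback, a small adapter can satisfy Job on their behalf. The sketch below is an assumption, not part of the answer's code: FuncJob is a hypothetical name, and the URLs and status codes are hard-coded stand-ins for real curl/http/websocket calls.

```go
package main

import "fmt"

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// FuncJob (hypothetical adapter) wraps a plain function so that it
// satisfies the Job interface.
type FuncJob struct {
    name string
    fn   func() JobResponse
}

func (f FuncJob) Name() string { return f.name }

// Callback runs the wrapped function and fills in the job name if the
// function left it empty.
func (f FuncJob) Callback() JobResponse {
    r := f.fn()
    if r.name == "" {
        r.name = f.name
    }
    return r
}

func main() {
    jobs := []Job{
        FuncJob{name: "yahoo via curl", fn: func() JobResponse {
            // A real callback would shell out to curl here.
            return JobResponse{res: 200, url: "http://www.yahoo.com"}
        }},
        FuncJob{name: "bing via http", fn: func() JobResponse {
            // A real callback would use net/http here.
            return JobResponse{res: 500, url: "https://www.bing.com"}
        }},
    }

    for _, j := range jobs {
        r := j.Callback()
        fmt.Printf("'%s' to '%s' returned status '%d'\n", r.name, r.url, r.res)
    }
}
```

This follows the same pattern as http.HandlerFunc in the standard library: the interface stays the contract, and the adapter lets callers pass closures.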

Like so:


package main

import (
    "fmt"
)

func main() {
    jobsPool := make(chan Job, len(testers))
    resultsPool := make(chan JobResponse, len(testers))

    maxWorkers := 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPool, resultsPool)
    }

    makeJobs(jobsPool, testers)
    getResults(resultsPool, testers)
}

var testers = []Job{
    AddedByUser{name: "abu"}, // Using different types in Job
    Tap{name: "tap"},         // Using different types in Job
}

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

type Tap struct {
    name string
}

func (t Tap) Name() string {
    return t.name
}

func (t Tap) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// worker runs the callback of every job it receives and forwards the response.
func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

// makeJobs fills the jobs channel.
func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

// getResults reads one response per job and prints it.
func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        job := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res)
        if job.err != nil {
            status = job.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

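Original answer

[Adding this answer because the OP and I have been talking outside of this thread]

There were a few mistakes in your code, but in the end all you had to do was take the advice people were giving you. You just needed to connect the dots. I suggest troubleshooting your code and trying to fully understand what the problem is. Honestly, that is the only way to learn.

The biggest issues I can remember were:

- Your HT interface needed to be modified so that the Check(...) signature matches each method
    - Otherwise, those structs (Tap, Tap1, Tap2) don't satisfy the HT interface and therefore don't implement HT
- The parameter type in the funcs worker(...), makeJobs(...), and getResults(...) was changed from []Tap to []HT
- You weren't aggregating all of the Taps into a single slice
    - The only reason we can use all the different Taps as HT is because they all implement HT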
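One way to catch the first issue at compile time is a blank-identifier assertion per type. The sketch below uses trimmed-down, assumed versions of HT, Tap and Tap2 (the fields and the fixed status codes are placeholders); if a Check signature drifts away from the interface, the var block stops compiling.

```go
package main

import "fmt"

type testerResponse struct {
    name string
    res  int
}

type HT interface {
    Name() string
    Check() testerResponse
}

type Tap struct{ name string }

func (p *Tap) Name() string { return p.name }
func (p *Tap) Check() testerResponse {
    return testerResponse{name: p.name, res: 200}
}

type Tap2 struct{ name string }

func (p *Tap2) Name() string { return p.name }
func (p *Tap2) Check() testerResponse {
    return testerResponse{name: p.name, res: 404}
}

// Compile-time checks: these lines fail to build if *Tap or *Tap2 ever
// stops implementing HT (for example, if Check returned
// (*testerResponse, error) while the interface expects testerResponse).
var (
    _ HT = (*Tap)(nil)
    _ HT = (*Tap2)(nil)
)

func main() {
    // Because both types implement HT, they fit in one slice.
    testers := []HT{&Tap{name: "tap"}, &Tap2{name: "tap2"}}
    for _, t := range testers {
        fmt.Println(t.Name(), t.Check().res)
    }
}
```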

Are you looking for something like this?


https://play.golang.org/p/zLmKOKAnX4C


package main

import (
    "fmt"
    "net/http"

    // "os/exec"
    "time"
)

type HT interface {
    Name() string
    Check() testerResponse
}

type testerResponse struct {
    err  error
    name string
    //res  http.Response
    res int
    url string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("[job][Tap1] Fetching %s %s \n", p.name, p.url)
    p.client = &http.Client{Timeout: p.timeout}
    res, err := p.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // need to close body
    res.Body.Close()
    return testerResponse{name: p.name, res: res.StatusCode, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// ---- CUSTOM CHECKS-------------
// ---- 1. NEW specific function -------------

type Tap2 struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap2) Check() testerResponse {
    // Do some request here.....
    fmt.Printf("[job][Tap2] Fetching %s %s \n", p.name, p.url)
    return testerResponse{res: 200, url: p.url, name: p.name}
}

func (p *Tap2) Name() string {
    return "yahoo custom check"
}

// ---- 2. NEW specific function which is not running http

type Tap3 struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap3) Check() testerResponse {
    // Do some request here....
    fmt.Printf("[job][Tap3] Fetching %s %s \n", p.name, p.url)
    return testerResponse{res: 200, url: p.url, name: p.name}
}

func (p *Tap3) Name() string {
    return "custom check2"
}

// makeJobs fills up our jobs channel
func makeJobs(jch chan<- HT, jobs []HT) {
    for _, t := range jobs {
        jch <- t
    }
}

// getResults reads one result per job from the results channel, prints
// it, and returns all of the results
func getResults(tr <-chan testerResponse, jobs []HT) []testerResponse {
    var rts []testerResponse
    var r testerResponse
    for range jobs {
        r = <-tr
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
        rts = append(rts, r)
    }
    return rts
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

var (
    testers1 = []Tap{
        {
            name:    "First Tap1",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "Second Tap1",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
    }

    testers2 = []Tap2{
        {
            name: "First Tap2",
            url:  "http://1.tap2.com",
        },
        {
            name: "Second Tap2",
            url:  "http://2.tap2.com",
        },
    }

    testers3 = []Tap3{
        {
            name: "First Tap3",
            url:  "http://1.tap3.com",
        },
        {
            name: "Second Tap3",
            url:  "http://2.tap3.com",
        },
    }
)

func main() {
    // Aggregate all testers into one slice.
    // Index into each slice instead of taking the address of the range
    // variable: before Go 1.22 that variable is shared across iterations,
    // so every appended pointer would end up at the last element (the
    // sample output below, with only "Second" entries, shows that effect).
    var testers []HT
    for i := range testers1 {
        testers = append(testers, &testers1[i])
    }
    for i := range testers2 {
        testers = append(testers, &testers2[i])
    }
    for i := range testers3 {
        testers = append(testers, &testers3[i])
    }

    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan HT, buffer)                // Jobs will be of type `HT`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    maxWorkers := 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPipe, resultsPipe)
    }

    makeJobs(jobsPipe, testers)
    getResults(resultsPipe, testers)
    //fmt.Println("at the end",tr)
}

Which outputs:

// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
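Every line in this output names a "Second" tap and none names a "First" one. That pattern is what you get when a loop appends the address of its range variable under Go versions before 1.22, where the variable is shared by all iterations, so each stored pointer ends up referring to the last element. A minimal sketch of the index-based alternative, which behaves the same on every Go version, follows; pointersByIndex is a hypothetical helper and Tap is reduced to a single field.

```go
package main

import "fmt"

type Tap struct{ name string }

// pointersByIndex (hypothetical helper) returns pointers to the slice
// elements themselves rather than to a loop variable.
func pointersByIndex(taps []Tap) []*Tap {
    out := make([]*Tap, 0, len(taps))
    for i := range taps {
        out = append(out, &taps[i])
    }
    return out
}

func main() {
    taps := []Tap{{name: "First Tap1"}, {name: "Second Tap1"}}
    for _, p := range pointersByIndex(taps) {
        fmt.Println(p.name)
    }
    // Prints:
    // First Tap1
    // Second Tap1
}
```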


qq_遁去的一_1


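As I understand it, you want your workers to accept other testers.

After looking at your code, it seems you have all the pieces in the right place and only need a few small changes here: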


// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- HT, taps []HT) {
    for _, t := range taps {
        jobs <- t
    }
}

// getResults reads one result per job from the results channel and
// prints it
func getResults(tr <-chan testerResponse, taps []HT) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

As you can see, your job queue can now accept any type that implements the HT interface, so if you want a new kind of job, say Tap2, you only need:


type Tap2 struct{...}

func (p *Tap2) Check() testerResponse {...}

func (p *Tap2) Name() string {...}

Now you can push both Tap and Tap2 onto the same job queue, because the job queue accepts any type that implements HT.
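A compact, network-free sketch of that idea is shown below. The types are simplified assumptions (the status codes are hard-coded instead of coming from real requests), and runChecks is a hypothetical helper that combines the makeJobs/worker/getResults steps into one function.

```go
package main

import (
    "fmt"
    "sort"
)

type testerResponse struct {
    name string
    res  int
}

type HT interface {
    Name() string
    Check() testerResponse
}

// Tap stands in for the default HTTP check.
type Tap struct{ name string }

func (p *Tap) Name() string { return p.name }
func (p *Tap) Check() testerResponse {
    return testerResponse{name: p.name, res: 200}
}

// Tap2 stands in for a user-supplied custom check.
type Tap2 struct{ name string }

func (p *Tap2) Name() string { return p.name }
func (p *Tap2) Check() testerResponse {
    return testerResponse{name: p.name, res: 500}
}

func worker(jobs <-chan HT, results chan<- testerResponse) {
    for j := range jobs {
        results <- j.Check()
    }
}

// runChecks (hypothetical helper) pushes every tester onto one queue,
// lets maxWorkers goroutines process it, and returns results sorted by name.
func runChecks(testers []HT, maxWorkers int) []testerResponse {
    jobs := make(chan HT, len(testers))
    results := make(chan testerResponse, len(testers))

    for i := 0; i < maxWorkers; i++ {
        go worker(jobs, results)
    }
    for _, t := range testers {
        jobs <- t
    }
    close(jobs) // workers exit once the queue is drained

    out := make([]testerResponse, 0, len(testers))
    for range testers {
        out = append(out, <-results)
    }
    sort.Slice(out, func(i, j int) bool { return out[i].name < out[j].name })
    return out
}

func main() {
    testers := []HT{&Tap{name: "default check"}, &Tap2{name: "custom check"}}
    for _, r := range runChecks(testers, 2) {
        fmt.Printf("'%s' returned status '%d'\n", r.name, r.res)
    }
}
```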

