亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

去 PubSub 沒有互斥鎖?

去 PubSub 沒有互斥鎖?

Go
當年話下 2022-10-24 09:35:31
我將在網站后端實現通知系統,每次頁面訪問都會為用戶訂閱頁面上顯示的一些數據,當系統發生變化時,他會收到通知。例如,有人正在查看包含新聞文章的頁面,當發布新文章時,我想通知用戶,以便他可以通過 js 或重新加載頁面來獲取這些新文章。手動或自動。為了實現這一點,我將以發布/訂閱的方式使用頻道。例如,會有一個“新聞”頻道。創建新文章時,該頻道將收到該文章的 id。當用戶打開一個頁面并訂閱“新聞”頻道(可能通過 websocket)時,必須有一個此新聞頻道的訂閱者列表,他將作為訂閱者添加到該列表中以得到通知。就像是:type Channel struct {   ingres <-chan int // news article id   subs [] chan<- int   mx sync.Mutex}這些中的每一個都將有一個 goroutine 將進入的內容分發到 subs 列表中。現在我看到的問題,可能是過早的優化,是會有很多頻道和很多來來往往的訂戶。這意味著會有很多帶有互斥鎖的世界末日事件。例如,如果有 10 000 個用戶在線,訂閱了新聞頻道,goroutine 將必須發送 10k 個通知,而 subs 切片將被鎖定,因此新訂閱者將不得不等待互斥鎖解鎖?,F在將其乘以 100 個通道,我認為我們遇到了問題。因此,我正在尋找一種方法來添加和刪除訂閱者,而不會阻止其他訂閱者被添加或刪除,或者可能只是在可接受的時間內全面接收通知。“等待所有子程序接收”問題可以通過 goroutine 解決每個子程序的超時問題,因此在收到 id 后,將創建 10k 個 goroutines 并且可以立即解鎖互斥鎖。但是,它仍然可以添加多個渠道。
查看完整描述

1 回答

?
慕容森

TA貢獻1853條經驗 獲得超18個贊

根據鏈接的評論,我想出了這段代碼:


package notif


import (

    "context"

    "sync"

    "time"

    "unsafe"

)


type Client struct {

    recv   chan interface{}

    ch     *Channel

    o      sync.Once

    ctx    context.Context

    cancel context.CancelFunc

}


// will be nil if this client is write-only

func (c *Client) Listen() <-chan interface{} {

    return c.recv

}


func (c *Client) Close() {

    select {

    case <-c.ctx.Done():

    case c.ch.unsubscribe <- c:

    }

}


func (c *Client) Done() <-chan struct{} {

    return c.ctx.Done()

}


func (c *Client) doClose() {

    c.o.Do(func() {

        c.cancel()

        if c.recv != nil {

            close(c.recv)

        }

    })

}


func (c *Client) send(msg interface{}) {

    // write-only clients will not handle any messages

    if c.recv == nil {

        return

    }

    t := time.NewTimer(c.ch.sc)

    select {

    case <-c.ctx.Done():

    case c.recv <- msg:

    case <-t.C:

        // time out/slow consumer, close the connection

        c.Close()

    }

}


func (c *Client) Broadcast(payload interface{}) bool {

    select {

    case <-c.ctx.Done():

        return false

    default:

        c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}

        return true

    }

}


type envelope struct {

    Message interface{}

    Sender  uintptr

}


// leech is channel-blocking so goroutine should be called internally to make it non-blocking

// this is to ensure proper order of leeched messages.

func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {

    return &Channel{

        name:        name,

        ingres:      make(chan interface{}, 1000),

        subscribe:   make(chan *Client, 1000),

        unsubscribe: make(chan *Client, 1000),

        aud:         make(map[*Client]struct{}, 1000),

        ctx:         ctx,

        sc:          slowConsumer,

        empty:       emptyCh,

        leech:       leech,

    }

}


type Channel struct {

    name        string

    ingres      chan interface{}

    subscribe   chan *Client

    unsubscribe chan *Client

    aud         map[*Client]struct{}

    ctx         context.Context

    sc          time.Duration

    empty       chan string

    leech       func(interface{})

}


func (ch *Channel) Id() string {

    return ch.name

}


// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode

// in which case the client will not be disconnected for being slow reader.

func (ch *Channel) Subscribe(writeOnly ...bool) *Client {

    c := &Client{

        ch: ch,

    }

    if len(writeOnly) == 0 || writeOnly[0] == false {

        c.recv = make(chan interface{})

    }

    c.ctx, c.cancel = context.WithCancel(ch.ctx)

    ch.subscribe <- c

    return c

}


func (ch *Channel) Broadcast() chan<- interface{} {

    return ch.ingres

}


// returns once context is cancelled

func (ch *Channel) Start() {

    for {

        select {

        case <-ch.ctx.Done():

            for cl := range ch.aud {

                delete(ch.aud, cl)

                cl.doClose()

            }

            return

        case cl := <-ch.subscribe:

            ch.aud[cl] = struct{}{}


        case cl := <-ch.unsubscribe:

            delete(ch.aud, cl)

            cl.doClose()

            if len(ch.aud) == 0 {

                ch.signalEmpty()

            }


        case msg := <-ch.ingres:

            e, ok := msg.(*envelope)

            if ok {

                msg = e.Message

            }

            for cl := range ch.aud {

                if ok == false || uintptr(unsafe.Pointer(cl)) != e.Sender {

                    go cl.send(e.Message)

                }

            }

            if ch.leech != nil {

                ch.leech(msg)

            }

        }

    }

}


func (ch *Channel) signalEmpty() {

    if ch.empty == nil {

        return

    }


    select {

    case ch.empty <- ch.name:

    default:

    }

}


type subscribeRequest struct {

    name string

    recv chan *Client

    wo   bool

}


type broadcastRequest struct {

    name string

    recv chan *Channel

}


type brokeredChannel struct {

    ch     *Channel

    cancel context.CancelFunc

}


type brokerLeech interface {

    Match(string) func(interface{})

}


func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {

    return &Broker{

        chans:     make(map[string]*brokeredChannel, 100),

        subscribe: make(chan *subscribeRequest, 10),

        broadcast: make(chan *broadcastRequest, 10),

        ctx:       ctx,

        sc:        slowConsumer,

        empty:     make(chan string, 10),

        leech:     leech,

    }

}


type Broker struct {

    chans     map[string]*brokeredChannel

    subscribe chan *subscribeRequest

    broadcast chan *broadcastRequest

    ctx       context.Context

    sc        time.Duration

    empty     chan string

    leech     brokerLeech

}


// returns once context is cancelled

func (b *Broker) Start() {

    for {

        select {

        case <-b.ctx.Done():

            return

        case req := <-b.subscribe:

            ch, ok := b.chans[req.name]

            if ok == false {

                ctx, cancel := context.WithCancel(b.ctx)

                var l func(interface{})

                if b.leech != nil {

                    l = b.leech.Match(req.name)

                }

                ch = &brokeredChannel{

                    ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),

                    cancel: cancel,

                }

                b.chans[req.name] = ch

                go ch.ch.Start()

            }

            req.recv <- ch.ch.Subscribe(req.wo)


        case req := <-b.broadcast:

            ch, ok := b.chans[req.name]

            if ok == false {

                ctx, cancel := context.WithCancel(b.ctx)

                var l func(interface{})

                if b.leech != nil {

                    l = b.leech.Match(req.name)

                }

                ch = &brokeredChannel{

                    ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),

                    cancel: cancel,

                }

                b.chans[req.name] = ch

                go ch.ch.Start()

            }

            req.recv <- ch.ch


        case name := <-b.empty:

            if ch, ok := b.chans[name]; ok {

                ch.cancel()

                delete(b.chans, name)

            }

        }

    }

}


// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode

// in which case the client will not be disconnected for being slow reader.

func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {

    req := &subscribeRequest{

        name: name,

        recv: make(chan *Client),

        wo:   len(writeOnly) > 0 && writeOnly[0] == true,

    }

    b.subscribe <- req

    c := <-req.recv

    close(req.recv)

    return c

}


func (b *Broker) Broadcast(name string) chan<- interface{} {

    req := &broadcastRequest{name: name, recv: make(chan *Channel)}

    b.broadcast <- req

    ch := <-req.recv

    close(req.recv)

    return ch.ingres

}



查看完整回答
反對 回復 2022-10-24
  • 1 回答
  • 0 關注
  • 101 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號