WebSocket Hub:Go 版 Actor 模型

gortex 的 WebSocket Hub 是一個永遠跑在獨立 goroutine 的事件迴圈,所有對 clients map 的存取都走 channel。這是 Go 版的 actor 模型,用通訊取代鎖。

前置:A4 goroutine/channel。

Hub.Run:單 goroutine 事件迴圈

A4 講的 actor 心法,這裡是實作。Hub.Run() 是一個永遠跑在獨立 goroutine 的 for { select },所有狀態(clients map、messageTypes)只有它一個人摸:

func (h *Hub) Run() {
    for {
        select {
        case req := <-h.register:    h.registerClient(req.client); req.result <- true
        case req := <-h.unregister:  h.unregisterClient(req.client); close(req.done)
        case op := <-h.broadcast:    h.broadcastMessage(op.msg)
        case req := <-h.clientCount: req.response <- len(h.clients)
        case req := <-h.metricsReq:  req.response <- h.snapshotMetrics()
        case <-h.shutdown:           h.runShutdown(); return
        }
    }
}

clients 就是個純 map[*Client]bool,沒有任何鎖,因為只有 Run 這個 goroutine 會碰它。

用 channel 取代鎖

外部要做任何事,都送一個 request 進 channel,而不是直接動 map。要連線數就送一個帶 response chan 的請求,等回覆:

func (h *Hub) GetConnectedClients() int {
    req := clientRequest{response: make(chan int)}
    select {
    case h.clientCount <- req:
        return <-req.response
    case <-h.shutdown:
        return 0
    }
}

RegisterClient 更講究:它 block 到 hub 真的記錄完才回(result chan 緩衝 1),所以呼叫端不用「sleep 然後祈禱」。註冊與關機互斥,client 的 send channel 保證只被關一次。把同步點集中到 channel,狀態就不需要鎖。

注意跨 goroutine 的計數器(totalConnectionsmessagesSent…)仍用 atomic.Int64,這些任何 goroutine 都會讀,用 atomic 才安全;只有單一 goroutine 摸的 messageTypes map 才免鎖。連 actor 內部也是按資料性質選工具,正好對照 B5。

慢消費者:select default 丟棄

actor 模型最大的風險是慢消費者拖垮整個 loop。gortex 的對策是「滿了就丟」。廣播時對每個 client:

select {
case client.send <- message:
    h.messagesSent.Add(1)
default:
    h.forcedDisconnects.Add(1) // 送不進去(buffer 滿)
    h.logger.Warn("client send full, closing", zap.String("client_id", client.ID))
    go h.removeClient(client)  // 直接踢掉慢的
}

Broadcast 本身也是 fire-and-forget:broadcast channel(緩衝 256)滿了就丟掉這則並 droppedBroadcasts++。原則是慢消費者永遠不能 block 生產者;而丟棄與強制斷線都記進 metrics,不會變成只活在 log 裡的隱形事件。

優雅關機

關機要讓在途訊息送完,又不能堵住別人。shutdownOnce sync.Once 保證 close(h.shutdown) 只關一次。runShutdown 在 hub goroutine 裡跑:先對所有 client 送 1001(Going Away)close frame,接著是關鍵:用 serveDuringGrace(500ms) 在這段寬限期內繼續服務 channel,而不是裸 time.Sleep,所以併發中的 Register / GetMetrics 呼叫不會被凍住。空 hub 直接跳過寬限、即刻關閉;寬限結束才關掉所有 client 的 send channel。

hub 還有兩個安全預設(細節留 B8):每則訊息 64 KiB 上限擋 oversize-message DoS,以及 Authorizer,私訊的 Target 在 Authorizer 執行前就解析好,所以授權策略看得到最終收件者。

重點整理

  • Hub.Run() 是單 goroutine 事件迴圈;clients map 只有它碰,所以免鎖,這就是 Go 版 actor。
  • 外部用「送 request + 等 response chan」跟 hub 互動;RegisterClient 同步回報,不用 sleep。
  • 跨 goroutine 計數器仍用 atomic,單 goroutine 的 map 才免鎖(工具對了才省)。
  • 慢消費者「滿了就丟 + 強制斷線」,永不 block 生產者;丟棄與斷線都進 metrics。
  • 優雅關機用 sync.Once + 寬限期內持續服務 channel(非 sleep),空 hub 即刻關。

原始碼:yshengliao/gortex


大綱 Sheng,內文 Claude 協助 · 環境 Go 1.25(gortex go.mod)· 本系列為事後回填整理 · 列入 20260613 blog 翻新計劃,新漆未乾。