WebSocket Hub:Go 版 Actor 模型
WebSocket Hub:Go 版 Actor 模型
gortex 的 WebSocket Hub 是一個永遠跑在獨立 goroutine 的事件迴圈,所有對 clients map 的存取都走 channel。這是 Go 版的 actor 模型,用通訊取代鎖。
前置:A4 goroutine/channel。
Hub.Run:單 goroutine 事件迴圈
A4 講的 actor 心法,這裡是實作。Hub.Run() 是一個永遠跑在獨立 goroutine 的 for { select },所有狀態(clients map、messageTypes)只有它一個人摸:
func (h *Hub) Run() {
for {
select {
case req := <-h.register: h.registerClient(req.client); req.result <- true
case req := <-h.unregister: h.unregisterClient(req.client); close(req.done)
case op := <-h.broadcast: h.broadcastMessage(op.msg)
case req := <-h.clientCount: req.response <- len(h.clients)
case req := <-h.metricsReq: req.response <- h.snapshotMetrics()
case <-h.shutdown: h.runShutdown(); return
}
}
}
clients 就是個純 map[*Client]bool,沒有任何鎖,因為只有 Run 這個 goroutine 會碰它。
用 channel 取代鎖
外部要做任何事,都送一個 request 進 channel,而不是直接動 map。要連線數就送一個帶 response chan 的請求,等回覆:
func (h *Hub) GetConnectedClients() int {
req := clientRequest{response: make(chan int)}
select {
case h.clientCount <- req:
return <-req.response
case <-h.shutdown:
return 0
}
}
RegisterClient 更講究:它 block 到 hub 真的記錄完才回(result chan 緩衝 1),所以呼叫端不用「sleep 然後祈禱」。註冊與關機互斥,client 的 send channel 保證只被關一次。把同步點集中到 channel,狀態就不需要鎖。
注意跨 goroutine 的計數器(totalConnections、messagesSent…)仍用 atomic.Int64,這些任何 goroutine 都會讀,用 atomic 才安全;只有單一 goroutine 摸的 messageTypes map 才免鎖。連 actor 內部也是按資料性質選工具,正好對照 B5。
慢消費者:select default 丟棄
actor 模型最大的風險是慢消費者拖垮整個 loop。gortex 的對策是「滿了就丟」。廣播時對每個 client:
select {
case client.send <- message:
h.messagesSent.Add(1)
default:
h.forcedDisconnects.Add(1) // 送不進去(buffer 滿)
h.logger.Warn("client send full, closing", zap.String("client_id", client.ID))
go h.removeClient(client) // 直接踢掉慢的
}
Broadcast 本身也是 fire-and-forget:broadcast channel(緩衝 256)滿了就丟掉這則並 droppedBroadcasts++。原則是慢消費者永遠不能 block 生產者;而丟棄與強制斷線都記進 metrics,不會變成只活在 log 裡的隱形事件。
優雅關機
關機要讓在途訊息送完,又不能堵住別人。shutdownOnce sync.Once 保證 close(h.shutdown) 只關一次。runShutdown 在 hub goroutine 裡跑:先對所有 client 送 1001(Going Away)close frame,接著是關鍵:用 serveDuringGrace(500ms) 在這段寬限期內繼續服務 channel,而不是裸 time.Sleep,所以併發中的 Register / GetMetrics 呼叫不會被凍住。空 hub 直接跳過寬限、即刻關閉;寬限結束才關掉所有 client 的 send channel。
hub 還有兩個安全預設(細節留 B8):每則訊息 64 KiB 上限擋 oversize-message DoS,以及 Authorizer,私訊的 Target 在 Authorizer 執行前就解析好,所以授權策略看得到最終收件者。
重點整理
Hub.Run()是單 goroutine 事件迴圈;clientsmap 只有它碰,所以免鎖,這就是 Go 版 actor。- 外部用「送 request + 等 response chan」跟 hub 互動;
RegisterClient同步回報,不用 sleep。 - 跨 goroutine 計數器仍用
atomic,單 goroutine 的 map 才免鎖(工具對了才省)。 - 慢消費者「滿了就丟 + 強制斷線」,永不 block 生產者;丟棄與斷線都進 metrics。
- 優雅關機用
sync.Once+ 寬限期內持續服務 channel(非 sleep),空 hub 即刻關。
原始碼:yshengliao/gortex。
大綱 Sheng,內文 Claude 協助 · 環境 Go 1.25(gortex go.mod)· 本系列為事後回填整理 · 列入 20260613 blog 翻新計劃,新漆未乾。