server_events/v2/hub.go
2025-07-09 23:28:48 +02:00

100 lines
2.6 KiB
Go

package serverevents
import (
gocontext "context"
"fmt"
di "git.apihub24.de/admin/generic-di"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"sync"
"time"
)
func init() {
di.Injectable(newHub)
}
type IEventHub interface {
SetContextLifetime(duration time.Duration)
OnConnected(connectionId string, conn *websocket.Conn)
OnDisconnected(connectionId string)
Dispatch(event Event, callerCtx IContext)
Event(event Event, id string)
}
type eventHub struct {
mu sync.Mutex
contextLifetime time.Duration
contexts map[string]IContext
registration IEventHandlerRegistration
parser IMessageParser
}
func newHub() IEventHub {
return &eventHub{
mu: sync.Mutex{},
contextLifetime: 500 * time.Second,
contexts: make(map[string]IContext),
registration: di.Inject[IEventHandlerRegistration](),
parser: di.Inject[IMessageParser](),
}
}
func (hub *eventHub) Dispatch(event Event, callerCtx IContext) {
// dispatch Event to Handler
handler, getHandlerErr := hub.registration.GetHandler(event.Type)
if getHandlerErr == nil && handler.CanExecute(callerCtx) {
handler.Handle(callerCtx, event.Data)
}
if event.IsBackendOnly || event.Filter == nil {
// only dispatch Event on Backend or we have no Context Filter
// if you want to dispatch to all Clients use Filter that returns true
return
}
writeCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Second*10)
defer cancel()
for _, ctx := range hub.contexts {
if event.Filter(ctx) {
writeErr := wsjson.Write(writeCtx, ctx.GetConnection(), event)
if writeErr != nil {
println(fmt.Sprintf("[Error on write to Websocket]: %v", writeErr))
}
}
}
}
func (hub *eventHub) SetContextLifetime(duration time.Duration) {
hub.contextLifetime = duration
}
func (hub *eventHub) Event(event Event, id string) {
callerCtx, ok := hub.contexts[id]
if !ok {
println(fmt.Sprintf("[Error on select caller context]: missing context connectionId: %s", id))
return
}
// Event from Client not send back!
event.IsBackendOnly = true
hub.Dispatch(event, callerCtx)
}
func (hub *eventHub) OnConnected(connectionId string, conn *websocket.Conn) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.contexts[connectionId] = di.Inject[IContext](connectionId)
hub.contexts[connectionId].SetId(connectionId)
hub.contexts[connectionId].SetConnection(conn)
}
func (hub *eventHub) OnDisconnected(connectionId string) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.contexts[connectionId].CleanupIn(hub.contextLifetime)
delete(hub.contexts, connectionId)
}