package serverevents import ( gocontext "context" "fmt" di "git.apihub24.de/admin/generic-di" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" "sync" "time" ) func init() { di.Injectable(newHub) } type IEventHub interface { SetContextLifetime(duration time.Duration) OnConnected(connectionId string, conn *websocket.Conn) OnDisconnected(connectionId string) Dispatch(event Event, callerCtx IContext) Event(event Event, id string) } type eventHub struct { mu sync.Mutex contextLifetime time.Duration contexts map[string]IContext registration IEventHandlerRegistration parser IMessageParser } func newHub() IEventHub { return &eventHub{ mu: sync.Mutex{}, contextLifetime: 500 * time.Second, contexts: make(map[string]IContext), registration: di.Inject[IEventHandlerRegistration](), parser: di.Inject[IMessageParser](), } } func (hub *eventHub) Dispatch(event Event, callerCtx IContext) { // dispatch Event to Handler handler, getHandlerErr := hub.registration.GetHandler(event.Type) if getHandlerErr == nil && handler.CanExecute(callerCtx) { handler.Handle(callerCtx, event.Data) } if event.IsBackendOnly || event.Filter == nil { // only dispatch Event on Backend or we have no Context Filter // if you want to dispatch to all Clients use Filter that returns true return } writeCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Second*10) defer cancel() for _, ctx := range hub.contexts { if event.Filter(ctx) { writeErr := wsjson.Write(writeCtx, ctx.GetConnection(), event) if writeErr != nil { println(fmt.Sprintf("[Error on write to Websocket]: %v", writeErr)) } } } } func (hub *eventHub) SetContextLifetime(duration time.Duration) { hub.contextLifetime = duration } func (hub *eventHub) Event(event Event, id string) { callerCtx, ok := hub.contexts[id] if !ok { println(fmt.Sprintf("[Error on select caller context]: missing context connectionId: %s", id)) return } // Event from Client not send back! event.IsBackendOnly = true hub.Dispatch(event, callerCtx) } func (hub *eventHub) OnConnected(connectionId string, conn *websocket.Conn) { hub.mu.Lock() defer hub.mu.Unlock() hub.contexts[connectionId] = di.Inject[IContext](connectionId) hub.contexts[connectionId].SetId(connectionId) hub.contexts[connectionId].SetConnection(conn) } func (hub *eventHub) OnDisconnected(connectionId string) { hub.mu.Lock() defer hub.mu.Unlock() hub.contexts[connectionId].CleanupIn(hub.contextLifetime) delete(hub.contexts, connectionId) }