100 lines
2.6 KiB
Go
100 lines
2.6 KiB
Go
package serverevents
|
|
|
|
import (
|
|
gocontext "context"
|
|
"fmt"
|
|
di "git.apihub24.de/admin/generic-di"
|
|
"github.com/coder/websocket"
|
|
"github.com/coder/websocket/wsjson"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
func init() {
|
|
di.Injectable(newHub)
|
|
}
|
|
|
|
type IEventHub interface {
|
|
SetContextLifetime(duration time.Duration)
|
|
OnConnected(connectionId string, conn *websocket.Conn)
|
|
OnDisconnected(connectionId string)
|
|
Dispatch(event Event, callerCtx IContext)
|
|
Event(event Event, id string)
|
|
}
|
|
|
|
type eventHub struct {
|
|
mu sync.Mutex
|
|
contextLifetime time.Duration
|
|
contexts map[string]IContext
|
|
registration IEventHandlerRegistration
|
|
parser IMessageParser
|
|
}
|
|
|
|
func newHub() IEventHub {
|
|
return &eventHub{
|
|
mu: sync.Mutex{},
|
|
contextLifetime: 500 * time.Second,
|
|
contexts: make(map[string]IContext),
|
|
registration: di.Inject[IEventHandlerRegistration](),
|
|
parser: di.Inject[IMessageParser](),
|
|
}
|
|
}
|
|
|
|
func (hub *eventHub) Dispatch(event Event, callerCtx IContext) {
|
|
// dispatch Event to Handler
|
|
handler, getHandlerErr := hub.registration.GetHandler(event.Type)
|
|
if getHandlerErr == nil && handler.CanExecute(callerCtx) {
|
|
handler.Handle(callerCtx, event.Data)
|
|
}
|
|
|
|
if event.IsBackendOnly || event.Filter == nil {
|
|
// only dispatch Event on Backend or we have no Context Filter
|
|
// if you want to dispatch to all Clients use Filter that returns true
|
|
return
|
|
}
|
|
|
|
writeCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Second*10)
|
|
defer cancel()
|
|
for _, ctx := range hub.contexts {
|
|
if event.Filter(ctx) {
|
|
writeErr := wsjson.Write(writeCtx, ctx.GetConnection(), event)
|
|
if writeErr != nil {
|
|
println(fmt.Sprintf("[Error on write to Websocket]: %v", writeErr))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (hub *eventHub) SetContextLifetime(duration time.Duration) {
|
|
hub.contextLifetime = duration
|
|
}
|
|
|
|
func (hub *eventHub) Event(event Event, id string) {
|
|
callerCtx, ok := hub.contexts[id]
|
|
if !ok {
|
|
println(fmt.Sprintf("[Error on select caller context]: missing context connectionId: %s", id))
|
|
return
|
|
}
|
|
|
|
// Event from Client not send back!
|
|
event.IsBackendOnly = true
|
|
hub.Dispatch(event, callerCtx)
|
|
}
|
|
|
|
func (hub *eventHub) OnConnected(connectionId string, conn *websocket.Conn) {
|
|
hub.mu.Lock()
|
|
defer hub.mu.Unlock()
|
|
|
|
hub.contexts[connectionId] = di.Inject[IContext](connectionId)
|
|
hub.contexts[connectionId].SetId(connectionId)
|
|
hub.contexts[connectionId].SetConnection(conn)
|
|
}
|
|
|
|
func (hub *eventHub) OnDisconnected(connectionId string) {
|
|
hub.mu.Lock()
|
|
defer hub.mu.Unlock()
|
|
|
|
hub.contexts[connectionId].CleanupIn(hub.contextLifetime)
|
|
delete(hub.contexts, connectionId)
|
|
}
|