From e808b7ad3cf145d78429e050f176fbb081446712 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 6 Jul 2025 21:28:31 +0200 Subject: [PATCH] v2 final --- v2/context.go | 114 +++++++++++++++++++++++ v2/context_helper.go | 19 ++++ v2/event.go | 7 +- v2/handler.go | 6 ++ v2/implementation_test/events/greeter.go | 37 ++++++++ v2/implementation_test/events/ping.go | 19 ++++ v2/implementation_test/main.go | 25 +++++ v2/middleware.go | 100 +++++++++++++++++--- v2/parser.go | 35 +++++++ v2/registration.go | 57 ++++++++++++ 10 files changed, 402 insertions(+), 17 deletions(-) create mode 100644 v2/context.go create mode 100644 v2/context_helper.go create mode 100644 v2/handler.go create mode 100644 v2/implementation_test/events/greeter.go create mode 100644 v2/implementation_test/events/ping.go create mode 100644 v2/implementation_test/main.go create mode 100644 v2/parser.go create mode 100644 v2/registration.go diff --git a/v2/context.go b/v2/context.go new file mode 100644 index 0000000..1506281 --- /dev/null +++ b/v2/context.go @@ -0,0 +1,114 @@ +package serverevents + +import ( + di "git.apihub24.de/admin/generic-di" + "sync" + "time" +) + +func init() { + di.Injectable(newContext) +} + +type IContext interface { + SetId(id string) + GetId() string + Set(key string, data any) + Get(key string) (any, bool) + RemoveMetaData(key string) + CleanupIn(lifetime time.Duration) + Dispatch(eventName string, data any, filter func(c IContext) bool) + IsCaller(c IContext) bool +} + +type context struct { + id string + metadata map[string]any + timer *time.Timer + emitter IEventEmitter + mutex sync.RWMutex +} + +func newContext() IContext { + return &context{ + id: "", + metadata: make(map[string]any), + timer: nil, + emitter: di.Inject[IEventEmitter](), + mutex: sync.RWMutex{}, + } +} + +func (context *context) SetId(id string) { + context.mutex.Lock() + defer context.mutex.Unlock() + + if context.timer != nil { + context.timer.Stop() + context.timer = nil + } + context.id = id +} + +func (context *context) GetId() string { + return context.id +} + +func (context *context) IsCaller(c IContext) bool { + return context.id == c.GetId() +} + +func (context *context) Set(key string, data any) { + context.mutex.Lock() + defer context.mutex.Unlock() + + context.metadata[key] = data +} + +func (context *context) Get(key string) (any, bool) { + context.mutex.RLock() + defer context.mutex.RUnlock() + + v, ok := context.metadata[key] + return v, ok +} + +func (context *context) RemoveMetaData(key string) { + context.mutex.Lock() + defer context.mutex.Unlock() + + if _, ok := context.metadata[key]; ok { + delete(context.metadata, key) + } +} + +func (context *context) CleanupIn(lifetime time.Duration) { + context.mutex.Lock() + defer context.mutex.Unlock() + + if context.timer != nil { + context.timer.Stop() + } + context.timer = time.NewTimer(lifetime) + go func(currentId string) { + <-context.timer.C + + context.mutex.Lock() + defer context.mutex.Unlock() + + if context.timer != nil { + di.Destroy[IContext](context.id) + context.timer = nil + } + }(context.id) +} + +func (context *context) Dispatch(eventName string, data any, filter func(c IContext) bool) { + ev := Event{ + Type: eventName, + Data: data, + IsBackendOnly: filter == nil, + Filter: filter, + } + context.emitter.Emit(ev) +} diff --git a/v2/context_helper.go b/v2/context_helper.go new file mode 100644 index 0000000..18f3fae --- /dev/null +++ b/v2/context_helper.go @@ -0,0 +1,19 @@ +package serverevents + +func CreateMetaDataFilter[T any](c IContext, metaDataSelector func(IContext) T, vgl func(a T, b T) bool) func(context IContext) bool { + return func(context IContext) bool { + metaA := metaDataSelector(c) + metaB := metaDataSelector(context) + return vgl(metaA, metaB) + } +} + +func MetadataAs[T any](context IContext, key string) (T, bool) { + var v T + tmp, ok := context.Get(key) + if tmp == nil { + return v, false + } + res, ok := tmp.(T) + return res, ok +} diff --git a/v2/event.go b/v2/event.go index abdcb22..da3e66e 100644 --- a/v2/event.go +++ b/v2/event.go @@ -1,7 +1,8 @@ package serverevents type Event struct { - Type string `json:"type"` - Data any `json:"data"` - IsBackendOnly bool `json:"-"` + Type string `json:"type"` + Data any `json:"data"` + IsBackendOnly bool `json:"-"` + Filter func(c IContext) bool `json:"-"` } diff --git a/v2/handler.go b/v2/handler.go new file mode 100644 index 0000000..c587821 --- /dev/null +++ b/v2/handler.go @@ -0,0 +1,6 @@ +package serverevents + +type IEventHandler interface { + CanExecute(IContext) bool + Handle(IContext) +} diff --git a/v2/implementation_test/events/greeter.go b/v2/implementation_test/events/greeter.go new file mode 100644 index 0000000..c22b0da --- /dev/null +++ b/v2/implementation_test/events/greeter.go @@ -0,0 +1,37 @@ +package events + +import ( + "fmt" + serverevents "git.apihub24.de/admin/server_events/v2" +) + +func withSameUserName(context serverevents.IContext) func(context serverevents.IContext) bool { + return serverevents.CreateMetaDataFilter(context, func(c serverevents.IContext) string { + userName, _ := serverevents.MetadataAs[string](context, "UserName") + return userName + }, func(a string, b string) bool { + return a == b + }) +} + +type greeting struct { + Message string +} + +type greeterEventHandler struct{} + +func (handler greeterEventHandler) CanExecute(context serverevents.IContext) bool { + _, ok := serverevents.MetadataAs[string](context, "UserName") + return ok +} + +func (handler greeterEventHandler) Handle(context serverevents.IContext) { + userName, _ := serverevents.MetadataAs[string](context, "UserName") + context.Dispatch("greet", greeting{ + Message: fmt.Sprintf("Hello, %s", userName), + }, withSameUserName(context)) +} + +func NewGreeterEventHandler() serverevents.IEventHandler { + return &greeterEventHandler{} +} diff --git a/v2/implementation_test/events/ping.go b/v2/implementation_test/events/ping.go new file mode 100644 index 0000000..7975413 --- /dev/null +++ b/v2/implementation_test/events/ping.go @@ -0,0 +1,19 @@ +package events + +import ( + serverevents "git.apihub24.de/admin/server_events/v2" +) + +type pingEventHandler struct{} + +func NewPingEventHandler() serverevents.IEventHandler { + return &pingEventHandler{} +} + +func (p pingEventHandler) CanExecute(context serverevents.IContext) bool { + return true +} + +func (p pingEventHandler) Handle(context serverevents.IContext) { + context.Dispatch("pong", nil, context.IsCaller) +} diff --git a/v2/implementation_test/main.go b/v2/implementation_test/main.go new file mode 100644 index 0000000..683c983 --- /dev/null +++ b/v2/implementation_test/main.go @@ -0,0 +1,25 @@ +package main + +import ( + di "git.apihub24.de/admin/generic-di" + serverevents "git.apihub24.de/admin/server_events/v2" + "git.apihub24.de/admin/server_events/v2/implementation_test/events" + "log" + "net/http" + "time" +) + +func main() { + serverEventsMiddleware := di.Inject[serverevents.IMiddleware]() + eventRegistration := di.Inject[serverevents.IEventHandlerRegistration]() + + eventRegistration.Add("ping", events.NewPingEventHandler) + eventRegistration.Add("greet all", events.NewGreeterEventHandler) + + serverEventsMiddleware.Use(serverevents.MiddlewareOptions{ + Path: "/events", + ContextLifetime: 500 * time.Second, + }, nil) + + log.Fatal(http.ListenAndServe(":8080", nil)) +} diff --git a/v2/middleware.go b/v2/middleware.go index 44eb915..d73e656 100644 --- a/v2/middleware.go +++ b/v2/middleware.go @@ -1,11 +1,12 @@ package serverevents import ( - "encoding/json" "fmt" di "git.apihub24.de/admin/generic-di" "github.com/gorilla/websocket" "net/http" + "sync" + "time" ) func init() { @@ -13,19 +14,25 @@ func init() { } type MiddlewareOptions struct { - Path string + Path string + ContextLifetime time.Duration } -type IServerEventsMiddleware interface { - Use(MiddlewareOptions, *http.ServeMux) +type IMiddleware interface { + Use(options MiddlewareOptions, mux *http.ServeMux) } type serverEventsMiddleware struct { + options MiddlewareOptions upgrader websocket.Upgrader streamSubscribers map[string]bool + emitter IEventEmitter + parser IMessageParser + registration IEventHandlerRegistration + mutex sync.Mutex } -func newServerEventsMiddleware() IServerEventsMiddleware { +func newServerEventsMiddleware() IMiddleware { return &serverEventsMiddleware{ upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { @@ -33,17 +40,21 @@ func newServerEventsMiddleware() IServerEventsMiddleware { }, }, streamSubscribers: make(map[string]bool), + emitter: di.Inject[IEventEmitter](), + parser: di.Inject[IMessageParser](), + registration: di.Inject[IEventHandlerRegistration](), + mutex: sync.Mutex{}, } } func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) { + middleware.options = options middleware.selectMethod(options.Path, func(w http.ResponseWriter, r *http.Request) { middleware.handleEventStream(w, r) }, muxer) } func (middleware *serverEventsMiddleware) handleEventStream(w http.ResponseWriter, r *http.Request) { - emitter := di.Inject[IEventEmitter]() conn, err := middleware.upgrader.Upgrade(w, r, nil) if err != nil { fmt.Println("Error upgrading:", err) @@ -54,31 +65,64 @@ func (middleware *serverEventsMiddleware) handleEventStream(w http.ResponseWrite _ = conn.Close() }() id := r.URL.Query().Get("id") + context := di.Inject[IContext](id) + context.SetId(id) + // Locks the mutex before accessing 'middleware.streamSubscribers', + // to prevent race conditions when reading/writing to the map. + // This ensures that only one goroutine checks/registers at a time. + // defer can't be used here as the stream listens for messages in an infinite loop + middleware.mutex.Lock() if _, ok := middleware.streamSubscribers[id]; !ok { - subscription := emitter.OnAll(func(event Event) { - if !event.IsBackendOnly { - jsonData, jsonErr := json.Marshal(event) - if jsonErr != nil { - fmt.Println("Error make json string", jsonErr) - return + subscription := middleware.emitter.OnAll(func(ev Event) { + // trigger the Backend Handlers + handler, getHandlerErr := middleware.registration.GetHandler(ev.Type) + if getHandlerErr != nil { + println(fmt.Sprintf("no Handler found for Event %s", ev.Type)) + } else { + if handler.CanExecute(context) { + handler.Handle(context) } - _ = conn.WriteMessage(websocket.TextMessage, jsonData) } + + if ev.IsBackendOnly || ev.Filter == nil || !ev.Filter(context) { + // the Event is Backend only or there is no socket (context) Filter + // to send to all Sockets deliver context Filter with returns bool + return + } + jsonData, jsonErr := middleware.parser.ToString(ev) + if jsonErr != nil { + println(fmt.Sprintf("Error parse event %s %s", jsonErr.Error(), ev.Type)) + return + } + _ = conn.WriteMessage(websocket.TextMessage, []byte(jsonData)) }) defer func() { + // Blocks the mutex again, as this block also accesses 'middleware.streamSubscribers'. + // This is crucial to avoid race conditions when removing entries, + // while new connections may be established. + // defer cannot be used here either + middleware.mutex.Lock() subscription.Unsubscribe() delete(middleware.streamSubscribers, id) + + // starts the Cleanup Process + context.CleanupIn(middleware.options.ContextLifetime) + middleware.mutex.Unlock() }() middleware.streamSubscribers[id] = true } + middleware.mutex.Unlock() ctx := r.Context() for { - _, _, err := conn.ReadMessage() + messageType, data, err := conn.ReadMessage() if err != nil { return } + if middleware.HandleMessage(conn, messageType, data) { + return + } select { case <-ctx.Done(): return @@ -93,3 +137,31 @@ func (middleware *serverEventsMiddleware) selectMethod(path string, todo http.Ha muxer.HandleFunc(path, todo) } } + +func (middleware *serverEventsMiddleware) HandleMessage(conn *websocket.Conn, messageType int, data []byte) bool { + switch messageType { + case websocket.PingMessage: + pongErr := conn.WriteMessage(websocket.PongMessage, []byte{}) + if pongErr != nil { + println(fmt.Sprintf("error on send PongMessage: %s", pongErr.Error())) + } + break + case websocket.CloseMessage: + // return true to close the Websocket + return true + case websocket.TextMessage: + ev, parseErr := middleware.parser.FromString(string(data)) + if parseErr != nil { + println(fmt.Sprintf("error on parse Event: %s data: %s", parseErr.Error(), string(data))) + return false + } + // Event was dispatched over the Websocket so not send it back to Client! + ev.IsBackendOnly = true + middleware.emitter.Emit(ev) + break + case websocket.BinaryMessage: + println(fmt.Sprintf("BinaryMessages are not supported")) + break + } + return false +} diff --git a/v2/parser.go b/v2/parser.go new file mode 100644 index 0000000..dd8313f --- /dev/null +++ b/v2/parser.go @@ -0,0 +1,35 @@ +package serverevents + +import ( + "encoding/json" + di "git.apihub24.de/admin/generic-di" +) + +func init() { + di.Injectable(newMessageParser) +} + +type IMessageParser interface { + ToString(event Event) (string, error) + FromString(eventStr string) (Event, error) +} + +type messageParser struct{} + +func newMessageParser() IMessageParser { + return &messageParser{} +} + +func (parser *messageParser) ToString(event Event) (string, error) { + stream, marshalErr := json.Marshal(event) + if marshalErr != nil { + return "", marshalErr + } + return string(stream), nil +} + +func (parser *messageParser) FromString(eventStr string) (Event, error) { + var ev Event + unmarshalErr := json.Unmarshal([]byte(eventStr), &ev) + return ev, unmarshalErr +} diff --git a/v2/registration.go b/v2/registration.go new file mode 100644 index 0000000..0f9149e --- /dev/null +++ b/v2/registration.go @@ -0,0 +1,57 @@ +package serverevents + +import ( + "fmt" + di "git.apihub24.de/admin/generic-di" + "sync" +) + +func init() { + di.Injectable(newEventHandlerRegistration) +} + +type IEventHandlerRegistration interface { + Add(eventType string, creator func() IEventHandler) + GetHandler(eventType string) (IEventHandler, error) +} + +type eventHandlerRegistration struct { + creators map[string]func() IEventHandler + instances map[string]IEventHandler + mutex sync.RWMutex +} + +func newEventHandlerRegistration() IEventHandlerRegistration { + return &eventHandlerRegistration{ + creators: make(map[string]func() IEventHandler), + instances: make(map[string]IEventHandler), + mutex: sync.RWMutex{}, + } +} + +func (registration *eventHandlerRegistration) Add(eventType string, creator func() IEventHandler) { + registration.mutex.Lock() + defer registration.mutex.Unlock() + + if _, ok := registration.instances[eventType]; ok { + delete(registration.instances, eventType) + } + if _, ok := registration.creators[eventType]; ok { + delete(registration.creators, eventType) + } + registration.creators[eventType] = creator +} + +func (registration *eventHandlerRegistration) GetHandler(eventType string) (IEventHandler, error) { + registration.mutex.RLock() + defer registration.mutex.RUnlock() + + if creator, ok := registration.creators[eventType]; ok { + if instance, ok := registration.instances[eventType]; ok { + return instance, nil + } + registration.instances[eventType] = creator() + return registration.instances[eventType], nil + } + return nil, fmt.Errorf("no instance of IEventHandler registered for Event %s! please call Registration Add in a init function to register your EventHandler", eventType) +}