diff --git a/.gitignore b/.gitignore index 600d2d3..8cd0df3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -.vscode \ No newline at end of file +.vscode +.idea \ No newline at end of file diff --git a/v2/emitter.go b/v2/emitter.go new file mode 100644 index 0000000..d113f30 --- /dev/null +++ b/v2/emitter.go @@ -0,0 +1,89 @@ +package serverevents + +import ( + di "git.apihub24.de/admin/generic-di" + "github.com/google/uuid" + "slices" + "sync" +) + +func init() { + di.Injectable(newEventEmitter) +} + +type IEventEmitter interface { + On(eventType string, do func(Event)) Subscription + OnAll(do func(Event)) Subscription + Emit(event Event) + Unsubscribe(Subscription) +} + +type eventEmitter struct { + subscribers map[string][]Subscription + onAllSubscribers []Subscription + mu sync.RWMutex +} + +func newEventEmitter() IEventEmitter { + return &eventEmitter{} +} + +func (emitter *eventEmitter) On(eventType string, do func(Event)) Subscription { + emitter.mu.Lock() + defer emitter.mu.Unlock() + + if emitter.subscribers[eventType] == nil { + emitter.subscribers[eventType] = make([]Subscription, 0) + } + sub := Subscription{ + id: uuid.NewString(), + eventType: eventType, + todo: do, + emitter: emitter, + } + emitter.subscribers[eventType] = append(emitter.subscribers[eventType], sub) + return sub +} + +func (emitter *eventEmitter) OnAll(do func(Event)) Subscription { + emitter.mu.Lock() + defer emitter.mu.Unlock() + + sub := Subscription{ + id: uuid.NewString(), + todo: do, + emitter: emitter, + } + emitter.onAllSubscribers = append(emitter.onAllSubscribers, sub) + return sub +} + +func (emitter *eventEmitter) Emit(event Event) { + emitter.executeSubscribers(emitter.onAllSubscribers, event) + eventSubscribers := emitter.subscribers[event.Type] + if eventSubscribers != nil { + emitter.executeSubscribers(eventSubscribers, event) + } +} + +func (emitter *eventEmitter) Unsubscribe(subscription Subscription) { + if len(subscription.eventType) < 1 { + emitter.onAllSubscribers = slices.DeleteFunc(emitter.onAllSubscribers, func(existSub Subscription) bool { + return existSub.id == subscription.id + }) + return + } + eventSubscribers := emitter.subscribers[subscription.eventType] + if eventSubscribers == nil || len(eventSubscribers) < 1 { + return + } + emitter.subscribers[subscription.eventType] = slices.DeleteFunc(emitter.subscribers[subscription.eventType], func(existSub Subscription) bool { + return existSub.id == subscription.id + }) +} + +func (emitter *eventEmitter) executeSubscribers(toDos []Subscription, event Event) { + for _, sub := range toDos { + sub.todo(event) + } +} diff --git a/v2/event.go b/v2/event.go new file mode 100644 index 0000000..abdcb22 --- /dev/null +++ b/v2/event.go @@ -0,0 +1,7 @@ +package serverevents + +type Event struct { + Type string `json:"type"` + Data any `json:"data"` + IsBackendOnly bool `json:"-"` +} diff --git a/v2/go.mod b/v2/go.mod new file mode 100644 index 0000000..0d7cf41 --- /dev/null +++ b/v2/go.mod @@ -0,0 +1,9 @@ +module git.apihub24.de/admin/server_events/v2 + +go 1.22.0 + +require ( + git.apihub24.de/admin/generic-di v1.4.0 + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 +) diff --git a/v2/go.sum b/v2/go.sum new file mode 100644 index 0000000..6adcfb3 --- /dev/null +++ b/v2/go.sum @@ -0,0 +1,6 @@ +git.apihub24.de/admin/generic-di v1.4.0 h1:0mQnpAcavMLBcnF5UO+tUI7abZ6zQPleqPsjEk3WIaU= +git.apihub24.de/admin/generic-di v1.4.0/go.mod h1:VcHV8MOb1qhwabHdO09CpjEg2VaDesehul86g1iyOxY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/v2/middleware.go b/v2/middleware.go new file mode 100644 index 0000000..44eb915 --- /dev/null +++ b/v2/middleware.go @@ -0,0 +1,95 @@ +package serverevents + +import ( + "encoding/json" + "fmt" + di "git.apihub24.de/admin/generic-di" + "github.com/gorilla/websocket" + "net/http" +) + +func init() { + di.Injectable(newServerEventsMiddleware) +} + +type MiddlewareOptions struct { + Path string +} + +type IServerEventsMiddleware interface { + Use(MiddlewareOptions, *http.ServeMux) +} + +type serverEventsMiddleware struct { + upgrader websocket.Upgrader + streamSubscribers map[string]bool +} + +func newServerEventsMiddleware() IServerEventsMiddleware { + return &serverEventsMiddleware{ + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + }, + streamSubscribers: make(map[string]bool), + } +} + +func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) { + middleware.selectMethod(options.Path, func(w http.ResponseWriter, r *http.Request) { + middleware.handleEventStream(w, r) + }, muxer) +} + +func (middleware *serverEventsMiddleware) handleEventStream(w http.ResponseWriter, r *http.Request) { + emitter := di.Inject[IEventEmitter]() + conn, err := middleware.upgrader.Upgrade(w, r, nil) + if err != nil { + fmt.Println("Error upgrading:", err) + return + } + defer func() { + // do not handle Error + _ = conn.Close() + }() + id := r.URL.Query().Get("id") + + if _, ok := middleware.streamSubscribers[id]; !ok { + subscription := emitter.OnAll(func(event Event) { + if !event.IsBackendOnly { + jsonData, jsonErr := json.Marshal(event) + if jsonErr != nil { + fmt.Println("Error make json string", jsonErr) + return + } + _ = conn.WriteMessage(websocket.TextMessage, jsonData) + } + }) + defer func() { + subscription.Unsubscribe() + delete(middleware.streamSubscribers, id) + }() + middleware.streamSubscribers[id] = true + } + + ctx := r.Context() + for { + _, _, err := conn.ReadMessage() + if err != nil { + return + } + select { + case <-ctx.Done(): + return + } + } +} + +func (middleware *serverEventsMiddleware) selectMethod(path string, todo http.HandlerFunc, muxer *http.ServeMux) { + if muxer == nil { + http.HandleFunc(path, todo) + } else { + muxer.HandleFunc(path, todo) + } +} diff --git a/v2/subscription.go b/v2/subscription.go new file mode 100644 index 0000000..5035156 --- /dev/null +++ b/v2/subscription.go @@ -0,0 +1,12 @@ +package serverevents + +type Subscription struct { + id string + eventType string + todo func(Event) + emitter IEventEmitter +} + +func (subscription Subscription) Unsubscribe() { + subscription.emitter.Unsubscribe(subscription) +}