server_events/v2/middleware.go

96 lines
2.1 KiB
Go

package serverevents
import (
"encoding/json"
"fmt"
di "git.apihub24.de/admin/generic-di"
"github.com/gorilla/websocket"
"net/http"
)
func init() {
di.Injectable(newServerEventsMiddleware)
}
type MiddlewareOptions struct {
Path string
}
type IServerEventsMiddleware interface {
Use(MiddlewareOptions, *http.ServeMux)
}
type serverEventsMiddleware struct {
upgrader websocket.Upgrader
streamSubscribers map[string]bool
}
func newServerEventsMiddleware() IServerEventsMiddleware {
return &serverEventsMiddleware{
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
streamSubscribers: make(map[string]bool),
}
}
func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) {
middleware.selectMethod(options.Path, func(w http.ResponseWriter, r *http.Request) {
middleware.handleEventStream(w, r)
}, muxer)
}
func (middleware *serverEventsMiddleware) handleEventStream(w http.ResponseWriter, r *http.Request) {
emitter := di.Inject[IEventEmitter]()
conn, err := middleware.upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("Error upgrading:", err)
return
}
defer func() {
// do not handle Error
_ = conn.Close()
}()
id := r.URL.Query().Get("id")
if _, ok := middleware.streamSubscribers[id]; !ok {
subscription := emitter.OnAll(func(event Event) {
if !event.IsBackendOnly {
jsonData, jsonErr := json.Marshal(event)
if jsonErr != nil {
fmt.Println("Error make json string", jsonErr)
return
}
_ = conn.WriteMessage(websocket.TextMessage, jsonData)
}
})
defer func() {
subscription.Unsubscribe()
delete(middleware.streamSubscribers, id)
}()
middleware.streamSubscribers[id] = true
}
ctx := r.Context()
for {
_, _, err := conn.ReadMessage()
if err != nil {
return
}
select {
case <-ctx.Done():
return
}
}
}
func (middleware *serverEventsMiddleware) selectMethod(path string, todo http.HandlerFunc, muxer *http.ServeMux) {
if muxer == nil {
http.HandleFunc(path, todo)
} else {
muxer.HandleFunc(path, todo)
}
}