commit 1e2b3a2e5c7756b53f24b7a062dd3723dc1bf318 Author: admin Date: Mon Jun 23 21:03:52 2025 +0200 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..600d2d3 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..11589cd --- /dev/null +++ b/README.md @@ -0,0 +1,106 @@ +# Server Events + +## How To + +In this Example i use the generic-di package to Inject a Service into a Event. +first you have to create some Event Handler: + +services/external_service.go + +```go +package services + +import ( + di "git.apihub24.de/admin/generic-di" +) + +func init() { + di.Injectable(newExternalService) +} + +// this Service is only a Example to show how to use a real world authentication service! +type IExternalService interface { + CheckLogin(string, string) bool +} + +type externalService struct {} + +func newExternalService() IExternalService { + return &externalService{} +} + +func (service *externalService) CheckLogin(name, password string) bool { + return len(name) > 0 && len(password) > 0 +} +``` + +events/login_event.go + +```go +package events + +import ( + + di "git.apihub24.de/admin/generic-di" +) + +// a Struct of the Login success EventData +type UserLogin struct { + UserName string `json:"userName"` +} + +// a Struct of the Login EventData +type LoginData struct { + Login string `json:"login"` + Password string `json:"password"` +} + +type LoginEvent struct { + someExternalService services.IExternalService +} + +func NewLoginEvent() *LoginEvent { + return &LoginEvent{ + someExternalService: di.Inject[services.IExternalService]() + } +} + +// implement a Method to get the EventName String +func (event *LoginEvent) GetEventName() string { + return "login" +} + +// implement a Method to check if the Event can Execute +func (event *LoginEvent) CanExecute(ctx *serverevents.Context) bool { + return true +} + +// implement a Method to send non Authorized Response +func (event *UserLoginEvent) SendNotAuthorizedResponse(ctx *serverevents.Context) { +} + +// implement a Method to Handle a Event +func (event *UserLoginEvent) Handle(ctx *serverevents.Context) { + data, dataParseErr := serverevents.GetEventData[LoginData](ctx) + if dataParseErr != nil { + ctx.Send(http.StatusInternalServerError, "text/plain", []byte(dataParseErr.Error())) + return + } + + if !event.someExternalService.CheckLogin(data.Login, data.Password) { + // send a new Event to the Client + ctx.Emit(serverevents.Event{ + Type: "login fail", + Data: "login fails!", + }) + return + } + + ctx.Emit(serverevents.Event{ + Type: "login success", + Data: UserLogin{ + UserName: "some Username", + }, + }) +} +``` diff --git a/context.go b/context.go new file mode 100644 index 0000000..537b17c --- /dev/null +++ b/context.go @@ -0,0 +1,106 @@ +package serverevents + +import ( + "encoding/json" + "fmt" + "io" + "net/http" +) + +type Context struct { + method string + eventName string + eventData any + meta map[string]any + eventBefore *Event + emitter *EventEmitter + w http.ResponseWriter + r *http.Request +} + +func NewServerEventContext(w http.ResponseWriter, r *http.Request, eventBefore *Event) *Context { + context := Context{ + eventBefore: eventBefore, + meta: make(map[string]any), + w: w, + r: r, + emitter: GetEventEmitter(), + } + context.method = r.Method + context.eventName = r.PathValue("event") + return &context +} + +func (ctx *Context) Emit(event Event) { + if ctx.emitter == nil { + return + } + ctx.emitter.Emit(event) +} + +func (ctx *Context) GetRequest() *http.Request { + return ctx.r +} + +func (ctx *Context) GetResponseWriter() http.ResponseWriter { + return ctx.w +} + +func (ctx *Context) GetEventName() string { + return ctx.eventName +} + +func (ctx *Context) GetMethod() string { + return ctx.method +} + +func (ctx *Context) GetHeaderValue(key string) string { + return ctx.r.Header.Get(key) +} + +func (ctx *Context) Send(status int, contentType string, data []byte) { + if len(contentType) > 0 { + ctx.w.Header().Add("Content-Type", contentType) + } + ctx.w.WriteHeader(status) + ctx.w.Write(data) +} + +func GetEventData[T any](ctx *Context) (T, error) { + var result T + var ok bool + if ctx.eventData != nil { + result, ok = ctx.eventData.(T) + if ok { + return result, nil + } + } + stream, readBodyErr := io.ReadAll(ctx.r.Body) + if readBodyErr != nil { + return result, readBodyErr + } + parseErr := json.Unmarshal(stream, &result) + if parseErr != nil { + return result, parseErr + } + ctx.eventData = result + return result, nil +} + +func SetContextValue[T any](ctx *Context, key string, value T) { + ctx.meta[key] = value +} + +func GetContextValue[T any](ctx *Context, key string) (T, error) { + var result T + var ok bool + tmp := ctx.meta[key] + if tmp == nil { + return result, fmt.Errorf("no value with key %s found", key) + } + result, ok = tmp.(T) + if !ok { + return result, fmt.Errorf("value has expect to be a other DataType %s", key) + } + return result, nil +} diff --git a/effect.go b/effect.go new file mode 100644 index 0000000..028061d --- /dev/null +++ b/effect.go @@ -0,0 +1,6 @@ +package serverevents + +type Effect interface { + OnEvent() string + Execute(*Context) +} diff --git a/emitter.go b/emitter.go new file mode 100644 index 0000000..603e976 --- /dev/null +++ b/emitter.go @@ -0,0 +1,99 @@ +package serverevents + +import ( + "slices" + "sync" + + "github.com/google/uuid" +) + +var emitter *EventEmitter + +type Subscription struct { + id string + eventType string + todo func(Event) + emitter *EventEmitter +} + +func (sub Subscription) Unsubscribe() { + if len(sub.eventType) < 1 { + emitter.onAllSubscribers = slices.DeleteFunc(emitter.onAllSubscribers, func(existSub Subscription) bool { + return existSub.id == sub.id + }) + return + } + eventSubscribers := emitter.subscribers[sub.eventType] + if eventSubscribers == nil || len(eventSubscribers) < 1 { + return + } + emitter.subscribers[sub.eventType] = slices.DeleteFunc(emitter.subscribers[sub.eventType], func(existSub Subscription) bool { + return existSub.id == sub.id + }) +} + +type Event struct { + Type string `json:"type"` + Data any `json:"data"` + IsBackendOnly bool `json:"-"` +} + +type EventEmitter struct { + subscribers map[string][]Subscription + onAllSubscribers []Subscription + mu sync.RWMutex +} + +func GetEventEmitter() *EventEmitter { + if emitter == nil { + emitter = &EventEmitter{ + subscribers: make(map[string][]Subscription), + onAllSubscribers: make([]Subscription, 0), + } + } + return emitter +} + +func (em *EventEmitter) On(eventType string, do func(Event)) Subscription { + em.mu.Lock() + defer em.mu.Unlock() + + if em.subscribers[eventType] == nil { + em.subscribers[eventType] = make([]Subscription, 0) + } + sub := Subscription{ + id: uuid.NewString(), + eventType: eventType, + todo: do, + emitter: em, + } + em.subscribers[eventType] = append(em.subscribers[eventType], sub) + return sub +} + +func (em *EventEmitter) OnAll(do func(Event)) Subscription { + em.mu.Lock() + defer em.mu.Unlock() + + sub := Subscription{ + id: uuid.NewString(), + todo: do, + emitter: em, + } + em.onAllSubscribers = append(em.onAllSubscribers, sub) + return sub +} + +func (em *EventEmitter) Emit(event Event) { + executeSubscribers(em.onAllSubscribers, event) + eventSubscribers := em.subscribers[event.Type] + if eventSubscribers != nil { + executeSubscribers(eventSubscribers, event) + } +} + +func executeSubscribers(toDos []Subscription, event Event) { + for _, sub := range toDos { + sub.todo(event) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..574b659 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module git.apihub24.de/admin/server_events + +go 1.22.0 + +require ( + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..73bbf57 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..d59f1ab --- /dev/null +++ b/handler.go @@ -0,0 +1,8 @@ +package serverevents + +type EventHandler interface { + GetEventName() string + CanExecute(*Context) bool + SendNotAuthorizedResponse(*Context) + Handle(*Context) +} diff --git a/middleware.go b/middleware.go new file mode 100644 index 0000000..2191226 --- /dev/null +++ b/middleware.go @@ -0,0 +1,124 @@ +package serverevents + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/gorilla/websocket" +) + +var streamSubscribers = make(map[string]bool) +var eventHandlerStore = make(map[string]EventHandler) +var effectStore = make(map[string]Effect) +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func RegisterEvents(events []EventHandler, effects []Effect, muxer *http.ServeMux) { + for _, event := range events { + eventHandlerStore[event.GetEventName()] = event + } + for _, effect := range effects { + effectStore[effect.OnEvent()] = effect + } + registerRoute(muxer) + registerSender(muxer) +} + +func registerSender(muxer *http.ServeMux) { + emitter := GetEventEmitter() + selectMethod("/event/stream", func(w http.ResponseWriter, r *http.Request) { + handleEventStream(w, r, emitter) + }, muxer) +} + +func handleEventStream(w http.ResponseWriter, r *http.Request, emitter *EventEmitter) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + fmt.Println("Error upgrading:", err) + return + } + defer conn.Close() + id := r.URL.Query().Get("id") + + if _, ok := streamSubscribers[id]; !ok { + subscription := emitter.OnAll(func(event Event) { + triggerEffect(event.Type, NewServerEventContext(w, r, &event)) + if !event.IsBackendOnly { + jsonData, jsonErr := json.Marshal(event) + if jsonErr != nil { + fmt.Println("Error make json string", jsonErr) + return + } + _ = conn.WriteMessage(websocket.TextMessage, jsonData) + } + }) + defer func() { + subscription.Unsubscribe() + delete(streamSubscribers, id) + }() + streamSubscribers[id] = true + } + + ctx := r.Context() + for { + _, _, err := conn.ReadMessage() + if err != nil { + return + } + select { + case <-ctx.Done(): + return + } + } +} + +func registerRoute(muxer *http.ServeMux) { + selectMethod("/event/{event}", func(w http.ResponseWriter, r *http.Request) { + handleEvent(w, r) + }, muxer) +} + +func handleEvent(w http.ResponseWriter, r *http.Request) { + context := NewServerEventContext(w, r, nil) + handler := eventHandlerStore[context.GetEventName()] + if handler == nil { + return + } + if !validateRequest(context) { + return + } + if !handler.CanExecute(context) { + handler.SendNotAuthorizedResponse(context) + return + } + handler.Handle(context) + triggerEffect(context.GetEventName(), context) +} + +func validateRequest(context *Context) bool { + canExecute := true + if context.GetMethod() != "POST" { + canExecute = false + context.Send(http.StatusMethodNotAllowed, "text/plain", []byte("only POST Method allowed!")) + } + return canExecute +} + +func selectMethod(path string, todo http.HandlerFunc, muxer *http.ServeMux) { + if muxer == nil { + http.HandleFunc(path, todo) + } else { + muxer.HandleFunc(path, todo) + } +} + +func triggerEffect(eventName string, context *Context) { + effect := effectStore[eventName] + if effect != nil { + effect.Execute(context) + } +}