init
This commit is contained in:
commit
1e2b3a2e5c
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
.vscode
|
||||
106
README.md
Normal file
106
README.md
Normal file
@ -0,0 +1,106 @@
|
||||
# Server Events
|
||||
|
||||
## How To
|
||||
|
||||
In this Example i use the generic-di package to Inject a Service into a Event.
|
||||
first you have to create some Event Handler:
|
||||
|
||||
services/external_service.go
|
||||
|
||||
```go
|
||||
package services
|
||||
|
||||
import (
|
||||
di "git.apihub24.de/admin/generic-di"
|
||||
)
|
||||
|
||||
func init() {
|
||||
di.Injectable(newExternalService)
|
||||
}
|
||||
|
||||
// this Service is only a Example to show how to use a real world authentication service!
|
||||
type IExternalService interface {
|
||||
CheckLogin(string, string) bool
|
||||
}
|
||||
|
||||
type externalService struct {}
|
||||
|
||||
func newExternalService() IExternalService {
|
||||
return &externalService{}
|
||||
}
|
||||
|
||||
func (service *externalService) CheckLogin(name, password string) bool {
|
||||
return len(name) > 0 && len(password) > 0
|
||||
}
|
||||
```
|
||||
|
||||
events/login_event.go
|
||||
|
||||
```go
|
||||
package events
|
||||
|
||||
import (
|
||||
|
||||
di "git.apihub24.de/admin/generic-di"
|
||||
)
|
||||
|
||||
// a Struct of the Login success EventData
|
||||
type UserLogin struct {
|
||||
UserName string `json:"userName"`
|
||||
}
|
||||
|
||||
// a Struct of the Login EventData
|
||||
type LoginData struct {
|
||||
Login string `json:"login"`
|
||||
Password string `json:"password"`
|
||||
}
|
||||
|
||||
type LoginEvent struct {
|
||||
someExternalService services.IExternalService
|
||||
}
|
||||
|
||||
func NewLoginEvent() *LoginEvent {
|
||||
return &LoginEvent{
|
||||
someExternalService: di.Inject[services.IExternalService]()
|
||||
}
|
||||
}
|
||||
|
||||
// implement a Method to get the EventName String
|
||||
func (event *LoginEvent) GetEventName() string {
|
||||
return "login"
|
||||
}
|
||||
|
||||
// implement a Method to check if the Event can Execute
|
||||
func (event *LoginEvent) CanExecute(ctx *serverevents.Context) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// implement a Method to send non Authorized Response
|
||||
func (event *UserLoginEvent) SendNotAuthorizedResponse(ctx *serverevents.Context) {
|
||||
}
|
||||
|
||||
// implement a Method to Handle a Event
|
||||
func (event *UserLoginEvent) Handle(ctx *serverevents.Context) {
|
||||
data, dataParseErr := serverevents.GetEventData[LoginData](ctx)
|
||||
if dataParseErr != nil {
|
||||
ctx.Send(http.StatusInternalServerError, "text/plain", []byte(dataParseErr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
if !event.someExternalService.CheckLogin(data.Login, data.Password) {
|
||||
// send a new Event to the Client
|
||||
ctx.Emit(serverevents.Event{
|
||||
Type: "login fail",
|
||||
Data: "login fails!",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Emit(serverevents.Event{
|
||||
Type: "login success",
|
||||
Data: UserLogin{
|
||||
UserName: "some Username",
|
||||
},
|
||||
})
|
||||
}
|
||||
```
|
||||
106
context.go
Normal file
106
context.go
Normal file
@ -0,0 +1,106 @@
|
||||
package serverevents
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
method string
|
||||
eventName string
|
||||
eventData any
|
||||
meta map[string]any
|
||||
eventBefore *Event
|
||||
emitter *EventEmitter
|
||||
w http.ResponseWriter
|
||||
r *http.Request
|
||||
}
|
||||
|
||||
func NewServerEventContext(w http.ResponseWriter, r *http.Request, eventBefore *Event) *Context {
|
||||
context := Context{
|
||||
eventBefore: eventBefore,
|
||||
meta: make(map[string]any),
|
||||
w: w,
|
||||
r: r,
|
||||
emitter: GetEventEmitter(),
|
||||
}
|
||||
context.method = r.Method
|
||||
context.eventName = r.PathValue("event")
|
||||
return &context
|
||||
}
|
||||
|
||||
func (ctx *Context) Emit(event Event) {
|
||||
if ctx.emitter == nil {
|
||||
return
|
||||
}
|
||||
ctx.emitter.Emit(event)
|
||||
}
|
||||
|
||||
func (ctx *Context) GetRequest() *http.Request {
|
||||
return ctx.r
|
||||
}
|
||||
|
||||
func (ctx *Context) GetResponseWriter() http.ResponseWriter {
|
||||
return ctx.w
|
||||
}
|
||||
|
||||
func (ctx *Context) GetEventName() string {
|
||||
return ctx.eventName
|
||||
}
|
||||
|
||||
func (ctx *Context) GetMethod() string {
|
||||
return ctx.method
|
||||
}
|
||||
|
||||
func (ctx *Context) GetHeaderValue(key string) string {
|
||||
return ctx.r.Header.Get(key)
|
||||
}
|
||||
|
||||
func (ctx *Context) Send(status int, contentType string, data []byte) {
|
||||
if len(contentType) > 0 {
|
||||
ctx.w.Header().Add("Content-Type", contentType)
|
||||
}
|
||||
ctx.w.WriteHeader(status)
|
||||
ctx.w.Write(data)
|
||||
}
|
||||
|
||||
func GetEventData[T any](ctx *Context) (T, error) {
|
||||
var result T
|
||||
var ok bool
|
||||
if ctx.eventData != nil {
|
||||
result, ok = ctx.eventData.(T)
|
||||
if ok {
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
stream, readBodyErr := io.ReadAll(ctx.r.Body)
|
||||
if readBodyErr != nil {
|
||||
return result, readBodyErr
|
||||
}
|
||||
parseErr := json.Unmarshal(stream, &result)
|
||||
if parseErr != nil {
|
||||
return result, parseErr
|
||||
}
|
||||
ctx.eventData = result
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func SetContextValue[T any](ctx *Context, key string, value T) {
|
||||
ctx.meta[key] = value
|
||||
}
|
||||
|
||||
func GetContextValue[T any](ctx *Context, key string) (T, error) {
|
||||
var result T
|
||||
var ok bool
|
||||
tmp := ctx.meta[key]
|
||||
if tmp == nil {
|
||||
return result, fmt.Errorf("no value with key %s found", key)
|
||||
}
|
||||
result, ok = tmp.(T)
|
||||
if !ok {
|
||||
return result, fmt.Errorf("value has expect to be a other DataType %s", key)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
6
effect.go
Normal file
6
effect.go
Normal file
@ -0,0 +1,6 @@
|
||||
package serverevents
|
||||
|
||||
type Effect interface {
|
||||
OnEvent() string
|
||||
Execute(*Context)
|
||||
}
|
||||
99
emitter.go
Normal file
99
emitter.go
Normal file
@ -0,0 +1,99 @@
|
||||
package serverevents
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
var emitter *EventEmitter
|
||||
|
||||
type Subscription struct {
|
||||
id string
|
||||
eventType string
|
||||
todo func(Event)
|
||||
emitter *EventEmitter
|
||||
}
|
||||
|
||||
func (sub Subscription) Unsubscribe() {
|
||||
if len(sub.eventType) < 1 {
|
||||
emitter.onAllSubscribers = slices.DeleteFunc(emitter.onAllSubscribers, func(existSub Subscription) bool {
|
||||
return existSub.id == sub.id
|
||||
})
|
||||
return
|
||||
}
|
||||
eventSubscribers := emitter.subscribers[sub.eventType]
|
||||
if eventSubscribers == nil || len(eventSubscribers) < 1 {
|
||||
return
|
||||
}
|
||||
emitter.subscribers[sub.eventType] = slices.DeleteFunc(emitter.subscribers[sub.eventType], func(existSub Subscription) bool {
|
||||
return existSub.id == sub.id
|
||||
})
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
Type string `json:"type"`
|
||||
Data any `json:"data"`
|
||||
IsBackendOnly bool `json:"-"`
|
||||
}
|
||||
|
||||
type EventEmitter struct {
|
||||
subscribers map[string][]Subscription
|
||||
onAllSubscribers []Subscription
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func GetEventEmitter() *EventEmitter {
|
||||
if emitter == nil {
|
||||
emitter = &EventEmitter{
|
||||
subscribers: make(map[string][]Subscription),
|
||||
onAllSubscribers: make([]Subscription, 0),
|
||||
}
|
||||
}
|
||||
return emitter
|
||||
}
|
||||
|
||||
func (em *EventEmitter) On(eventType string, do func(Event)) Subscription {
|
||||
em.mu.Lock()
|
||||
defer em.mu.Unlock()
|
||||
|
||||
if em.subscribers[eventType] == nil {
|
||||
em.subscribers[eventType] = make([]Subscription, 0)
|
||||
}
|
||||
sub := Subscription{
|
||||
id: uuid.NewString(),
|
||||
eventType: eventType,
|
||||
todo: do,
|
||||
emitter: em,
|
||||
}
|
||||
em.subscribers[eventType] = append(em.subscribers[eventType], sub)
|
||||
return sub
|
||||
}
|
||||
|
||||
func (em *EventEmitter) OnAll(do func(Event)) Subscription {
|
||||
em.mu.Lock()
|
||||
defer em.mu.Unlock()
|
||||
|
||||
sub := Subscription{
|
||||
id: uuid.NewString(),
|
||||
todo: do,
|
||||
emitter: em,
|
||||
}
|
||||
em.onAllSubscribers = append(em.onAllSubscribers, sub)
|
||||
return sub
|
||||
}
|
||||
|
||||
func (em *EventEmitter) Emit(event Event) {
|
||||
executeSubscribers(em.onAllSubscribers, event)
|
||||
eventSubscribers := em.subscribers[event.Type]
|
||||
if eventSubscribers != nil {
|
||||
executeSubscribers(eventSubscribers, event)
|
||||
}
|
||||
}
|
||||
|
||||
func executeSubscribers(toDos []Subscription, event Event) {
|
||||
for _, sub := range toDos {
|
||||
sub.todo(event)
|
||||
}
|
||||
}
|
||||
8
go.mod
Normal file
8
go.mod
Normal file
@ -0,0 +1,8 @@
|
||||
module git.apihub24.de/admin/server_events
|
||||
|
||||
go 1.22.0
|
||||
|
||||
require (
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
)
|
||||
4
go.sum
Normal file
4
go.sum
Normal file
@ -0,0 +1,4 @@
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
8
handler.go
Normal file
8
handler.go
Normal file
@ -0,0 +1,8 @@
|
||||
package serverevents
|
||||
|
||||
type EventHandler interface {
|
||||
GetEventName() string
|
||||
CanExecute(*Context) bool
|
||||
SendNotAuthorizedResponse(*Context)
|
||||
Handle(*Context)
|
||||
}
|
||||
124
middleware.go
Normal file
124
middleware.go
Normal file
@ -0,0 +1,124 @@
|
||||
package serverevents
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var streamSubscribers = make(map[string]bool)
|
||||
var eventHandlerStore = make(map[string]EventHandler)
|
||||
var effectStore = make(map[string]Effect)
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
func RegisterEvents(events []EventHandler, effects []Effect, muxer *http.ServeMux) {
|
||||
for _, event := range events {
|
||||
eventHandlerStore[event.GetEventName()] = event
|
||||
}
|
||||
for _, effect := range effects {
|
||||
effectStore[effect.OnEvent()] = effect
|
||||
}
|
||||
registerRoute(muxer)
|
||||
registerSender(muxer)
|
||||
}
|
||||
|
||||
func registerSender(muxer *http.ServeMux) {
|
||||
emitter := GetEventEmitter()
|
||||
selectMethod("/event/stream", func(w http.ResponseWriter, r *http.Request) {
|
||||
handleEventStream(w, r, emitter)
|
||||
}, muxer)
|
||||
}
|
||||
|
||||
func handleEventStream(w http.ResponseWriter, r *http.Request, emitter *EventEmitter) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
fmt.Println("Error upgrading:", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
id := r.URL.Query().Get("id")
|
||||
|
||||
if _, ok := streamSubscribers[id]; !ok {
|
||||
subscription := emitter.OnAll(func(event Event) {
|
||||
triggerEffect(event.Type, NewServerEventContext(w, r, &event))
|
||||
if !event.IsBackendOnly {
|
||||
jsonData, jsonErr := json.Marshal(event)
|
||||
if jsonErr != nil {
|
||||
fmt.Println("Error make json string", jsonErr)
|
||||
return
|
||||
}
|
||||
_ = conn.WriteMessage(websocket.TextMessage, jsonData)
|
||||
}
|
||||
})
|
||||
defer func() {
|
||||
subscription.Unsubscribe()
|
||||
delete(streamSubscribers, id)
|
||||
}()
|
||||
streamSubscribers[id] = true
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
for {
|
||||
_, _, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func registerRoute(muxer *http.ServeMux) {
|
||||
selectMethod("/event/{event}", func(w http.ResponseWriter, r *http.Request) {
|
||||
handleEvent(w, r)
|
||||
}, muxer)
|
||||
}
|
||||
|
||||
func handleEvent(w http.ResponseWriter, r *http.Request) {
|
||||
context := NewServerEventContext(w, r, nil)
|
||||
handler := eventHandlerStore[context.GetEventName()]
|
||||
if handler == nil {
|
||||
return
|
||||
}
|
||||
if !validateRequest(context) {
|
||||
return
|
||||
}
|
||||
if !handler.CanExecute(context) {
|
||||
handler.SendNotAuthorizedResponse(context)
|
||||
return
|
||||
}
|
||||
handler.Handle(context)
|
||||
triggerEffect(context.GetEventName(), context)
|
||||
}
|
||||
|
||||
func validateRequest(context *Context) bool {
|
||||
canExecute := true
|
||||
if context.GetMethod() != "POST" {
|
||||
canExecute = false
|
||||
context.Send(http.StatusMethodNotAllowed, "text/plain", []byte("only POST Method allowed!"))
|
||||
}
|
||||
return canExecute
|
||||
}
|
||||
|
||||
func selectMethod(path string, todo http.HandlerFunc, muxer *http.ServeMux) {
|
||||
if muxer == nil {
|
||||
http.HandleFunc(path, todo)
|
||||
} else {
|
||||
muxer.HandleFunc(path, todo)
|
||||
}
|
||||
}
|
||||
|
||||
func triggerEffect(eventName string, context *Context) {
|
||||
effect := effectStore[eventName]
|
||||
if effect != nil {
|
||||
effect.Execute(context)
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user