Compare commits

..

6 Commits
v2.0.0 ... main

Author SHA1 Message Date
7dc2fb0e77 allow custom cleanups 2025-07-25 14:40:04 +02:00
c6bf54943a better handler registration and dispatch 2025-07-10 10:23:03 +02:00
f27890e55e better ValueAs 2025-07-09 23:44:57 +02:00
a5f7a8c2ae fix warning in test 2025-07-09 23:29:16 +02:00
bc4e727b1f try coder websocket 2025-07-09 23:28:48 +02:00
09136b0ca6 add event data 2025-07-06 21:44:53 +02:00
23 changed files with 559 additions and 327 deletions

View File

@ -1,9 +1,12 @@
package serverevents package serverevents
import ( import (
di "git.apihub24.de/admin/generic-di" gocontext "context"
"sync" "sync"
"time" "time"
di "git.apihub24.de/admin/generic-di"
"github.com/coder/websocket"
) )
func init() { func init() {
@ -13,11 +16,13 @@ func init() {
type IContext interface { type IContext interface {
SetId(id string) SetId(id string)
GetId() string GetId() string
SetConnection(conn *websocket.Conn)
GetConnection() *websocket.Conn
Set(key string, data any) Set(key string, data any)
Get(key string) (any, bool) Get(key string) (any, bool)
RemoveMetaData(key string) RemoveMetaData(key string)
CleanupIn(lifetime time.Duration) CleanupIn(lifetime time.Duration, onCleanup ...func())
Dispatch(eventName string, data any, filter func(c IContext) bool) Dispatch(event IEvent, filter func(c IContext) bool)
IsCaller(c IContext) bool IsCaller(c IContext) bool
} }
@ -25,8 +30,10 @@ type context struct {
id string id string
metadata map[string]any metadata map[string]any
timer *time.Timer timer *time.Timer
emitter IEventEmitter hub IEventHub
mutex sync.RWMutex mutex sync.RWMutex
c *websocket.Conn
ctx gocontext.Context
} }
func newContext() IContext { func newContext() IContext {
@ -34,8 +41,9 @@ func newContext() IContext {
id: "", id: "",
metadata: make(map[string]any), metadata: make(map[string]any),
timer: nil, timer: nil,
emitter: di.Inject[IEventEmitter](), hub: di.Inject[IEventHub](),
mutex: sync.RWMutex{}, mutex: sync.RWMutex{},
c: nil,
} }
} }
@ -77,12 +85,10 @@ func (context *context) RemoveMetaData(key string) {
context.mutex.Lock() context.mutex.Lock()
defer context.mutex.Unlock() defer context.mutex.Unlock()
if _, ok := context.metadata[key]; ok {
delete(context.metadata, key) delete(context.metadata, key)
}
} }
func (context *context) CleanupIn(lifetime time.Duration) { func (context *context) CleanupIn(lifetime time.Duration, onCleanup ...func()) {
context.mutex.Lock() context.mutex.Lock()
defer context.mutex.Unlock() defer context.mutex.Unlock()
@ -98,17 +104,28 @@ func (context *context) CleanupIn(lifetime time.Duration) {
if context.timer != nil { if context.timer != nil {
di.Destroy[IContext](context.id) di.Destroy[IContext](context.id)
for _, cleaner := range onCleanup {
cleaner()
}
context.timer = nil context.timer = nil
} }
}(context.id) }(context.id)
} }
func (context *context) Dispatch(eventName string, data any, filter func(c IContext) bool) { func (context *context) Dispatch(event IEvent, filter func(c IContext) bool) {
ev := Event{ ev := Event{
Type: eventName, Type: event.GetEventName(),
Data: data, Data: event.GetEventData(),
IsBackendOnly: filter == nil, IsBackendOnly: filter == nil,
Filter: filter, Filter: filter,
} }
context.emitter.Emit(ev) context.hub.Dispatch(ev, context)
}
func (context *context) GetConnection() *websocket.Conn {
return context.c
}
func (context *context) SetConnection(conn *websocket.Conn) {
context.c = conn
} }

View File

@ -1,5 +1,7 @@
package serverevents package serverevents
import "encoding/json"
func CreateMetaDataFilter[T any](c IContext, metaDataSelector func(IContext) T, vgl func(a T, b T) bool) func(context IContext) bool { func CreateMetaDataFilter[T any](c IContext, metaDataSelector func(IContext) T, vgl func(a T, b T) bool) func(context IContext) bool {
return func(context IContext) bool { return func(context IContext) bool {
metaA := metaDataSelector(c) metaA := metaDataSelector(c)
@ -9,11 +11,19 @@ func CreateMetaDataFilter[T any](c IContext, metaDataSelector func(IContext) T,
} }
func MetadataAs[T any](context IContext, key string) (T, bool) { func MetadataAs[T any](context IContext, key string) (T, bool) {
tmp, _ := context.Get(key)
return ValueAs[T](tmp)
}
func ValueAs[T any](data any) (T, bool) {
var v T var v T
tmp, ok := context.Get(key) str, err := json.Marshal(data)
if tmp == nil { if err != nil {
return v, false return v, false
} }
res, ok := tmp.(T) err = json.Unmarshal(str, &v)
return res, ok if err != nil {
return v, false
}
return v, true
} }

View File

@ -1,89 +0,0 @@
package serverevents
import (
di "git.apihub24.de/admin/generic-di"
"github.com/google/uuid"
"slices"
"sync"
)
func init() {
di.Injectable(newEventEmitter)
}
type IEventEmitter interface {
On(eventType string, do func(Event)) Subscription
OnAll(do func(Event)) Subscription
Emit(event Event)
Unsubscribe(Subscription)
}
type eventEmitter struct {
subscribers map[string][]Subscription
onAllSubscribers []Subscription
mu sync.RWMutex
}
func newEventEmitter() IEventEmitter {
return &eventEmitter{}
}
func (emitter *eventEmitter) On(eventType string, do func(Event)) Subscription {
emitter.mu.Lock()
defer emitter.mu.Unlock()
if emitter.subscribers[eventType] == nil {
emitter.subscribers[eventType] = make([]Subscription, 0)
}
sub := Subscription{
id: uuid.NewString(),
eventType: eventType,
todo: do,
emitter: emitter,
}
emitter.subscribers[eventType] = append(emitter.subscribers[eventType], sub)
return sub
}
func (emitter *eventEmitter) OnAll(do func(Event)) Subscription {
emitter.mu.Lock()
defer emitter.mu.Unlock()
sub := Subscription{
id: uuid.NewString(),
todo: do,
emitter: emitter,
}
emitter.onAllSubscribers = append(emitter.onAllSubscribers, sub)
return sub
}
func (emitter *eventEmitter) Emit(event Event) {
emitter.executeSubscribers(emitter.onAllSubscribers, event)
eventSubscribers := emitter.subscribers[event.Type]
if eventSubscribers != nil {
emitter.executeSubscribers(eventSubscribers, event)
}
}
func (emitter *eventEmitter) Unsubscribe(subscription Subscription) {
if len(subscription.eventType) < 1 {
emitter.onAllSubscribers = slices.DeleteFunc(emitter.onAllSubscribers, func(existSub Subscription) bool {
return existSub.id == subscription.id
})
return
}
eventSubscribers := emitter.subscribers[subscription.eventType]
if eventSubscribers == nil || len(eventSubscribers) < 1 {
return
}
emitter.subscribers[subscription.eventType] = slices.DeleteFunc(emitter.subscribers[subscription.eventType], func(existSub Subscription) bool {
return existSub.id == subscription.id
})
}
func (emitter *eventEmitter) executeSubscribers(toDos []Subscription, event Event) {
for _, sub := range toDos {
sub.todo(event)
}
}

View File

@ -6,3 +6,8 @@ type Event struct {
IsBackendOnly bool `json:"-"` IsBackendOnly bool `json:"-"`
Filter func(c IContext) bool `json:"-"` Filter func(c IContext) bool `json:"-"`
} }
type IEvent interface {
GetEventName() string
GetEventData() any
}

View File

@ -1,9 +1,13 @@
module git.apihub24.de/admin/server_events/v2 module git.apihub24.de/admin/server_events/v2
go 1.22.0 go 1.23.0
toolchain go1.24.4
require ( require (
git.apihub24.de/admin/generic-di v1.4.0 git.apihub24.de/admin/generic-di v1.4.0
github.com/google/uuid v1.6.0 )
github.com/gorilla/websocket v1.5.3
require (
github.com/coder/websocket v1.8.13 // indirect
) )

102
v2/go.sum
View File

@ -1,6 +1,108 @@
git.apihub24.de/admin/generic-di v1.4.0 h1:0mQnpAcavMLBcnF5UO+tUI7abZ6zQPleqPsjEk3WIaU= git.apihub24.de/admin/generic-di v1.4.0 h1:0mQnpAcavMLBcnF5UO+tUI7abZ6zQPleqPsjEk3WIaU=
git.apihub24.de/admin/generic-di v1.4.0/go.mod h1:VcHV8MOb1qhwabHdO09CpjEg2VaDesehul86g1iyOxY= git.apihub24.de/admin/generic-di v1.4.0/go.mod h1:VcHV8MOb1qhwabHdO09CpjEg2VaDesehul86g1iyOxY=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/dave/jennifer v1.7.0 h1:uRbSBH9UTS64yXbh4FrMHfgfY762RD+C7bUPKODpSJE=
github.com/dave/jennifer v1.7.0/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240402174815-29b9bb013b0f h1:f00RU+zOX+B3rLAmMMkzHUF2h1z4DeYR9tTCvEq2REY=
github.com/google/pprof v0.0.0-20240402174815-29b9bb013b0f/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/philippseith/signalr v0.7.0 h1:bt8uusIKr3+hpQ5bRxEsHAM3VbEU15lve7e74Hf5a4E=
github.com/philippseith/signalr v0.7.0/go.mod h1:deqWw2+rPPcdUQVvzXoB6A6FQB3v1OCnroy8CiUJNkg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE=
github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg=
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/teivah/onecontext v1.3.0 h1:tbikMhAlo6VhAuEGCvhc8HlTnpX4xTNPTOseWuhO1J0=
github.com/teivah/onecontext v1.3.0/go.mod h1:hoW1nmdPVK/0jrvGtcx8sCKYs2PiS4z0zzfdeuEVyb0=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,6 +1,7 @@
package serverevents package serverevents
type IEventHandler interface { type IEventHandler interface {
GetConnectedEventName() string
CanExecute(IContext) bool CanExecute(IContext) bool
Handle(IContext) Handle(IContext, any)
} }

99
v2/hub.go Normal file
View File

@ -0,0 +1,99 @@
package serverevents
import (
gocontext "context"
"fmt"
di "git.apihub24.de/admin/generic-di"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"sync"
"time"
)
func init() {
di.Injectable(newHub)
}
type IEventHub interface {
SetContextLifetime(duration time.Duration)
OnConnected(connectionId string, conn *websocket.Conn)
OnDisconnected(connectionId string)
Dispatch(event Event, callerCtx IContext)
Event(event Event, id string)
}
type eventHub struct {
mu sync.Mutex
contextLifetime time.Duration
contexts map[string]IContext
registration IEventHandlerRegistration
parser IMessageParser
}
func newHub() IEventHub {
return &eventHub{
mu: sync.Mutex{},
contextLifetime: 500 * time.Second,
contexts: make(map[string]IContext),
registration: di.Inject[IEventHandlerRegistration](),
parser: di.Inject[IMessageParser](),
}
}
func (hub *eventHub) Dispatch(event Event, callerCtx IContext) {
// dispatch Event to Handler
handler, getHandlerErr := hub.registration.GetHandler(event.Type)
if getHandlerErr == nil && handler.CanExecute(callerCtx) {
handler.Handle(callerCtx, event.Data)
}
if event.IsBackendOnly || event.Filter == nil {
// only dispatch Event on Backend or we have no Context Filter
// if you want to dispatch to all Clients use Filter that returns true
return
}
writeCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Second*10)
defer cancel()
for _, ctx := range hub.contexts {
if event.Filter(ctx) {
writeErr := wsjson.Write(writeCtx, ctx.GetConnection(), event)
if writeErr != nil {
println(fmt.Sprintf("[Error on write to Websocket]: %v", writeErr))
}
}
}
}
func (hub *eventHub) SetContextLifetime(duration time.Duration) {
hub.contextLifetime = duration
}
func (hub *eventHub) Event(event Event, id string) {
callerCtx, ok := hub.contexts[id]
if !ok {
println(fmt.Sprintf("[Error on select caller context]: missing context connectionId: %s", id))
return
}
// Event from Client not send back!
event.IsBackendOnly = true
hub.Dispatch(event, callerCtx)
}
func (hub *eventHub) OnConnected(connectionId string, conn *websocket.Conn) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.contexts[connectionId] = di.Inject[IContext](connectionId)
hub.contexts[connectionId].SetId(connectionId)
hub.contexts[connectionId].SetConnection(conn)
}
func (hub *eventHub) OnDisconnected(connectionId string) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.contexts[connectionId].CleanupIn(hub.contextLifetime)
delete(hub.contexts, connectionId)
}

View File

@ -0,0 +1,5 @@
package dto
type Greeting struct {
Message string `json:"message"`
}

View File

@ -0,0 +1,26 @@
package definitions
import (
serverevents "git.apihub24.de/admin/server_events/v2"
"git.apihub24.de/admin/server_events/v2/implementation_test/data/dto"
)
const GreeterEventName = "greet"
type greetEvent struct {
data dto.Greeting
}
func NewGreetEvent(data dto.Greeting) serverevents.IEvent {
return &greetEvent{
data: data,
}
}
func (ev *greetEvent) GetEventName() string {
return GreeterEventName
}
func (ev *greetEvent) GetEventData() any {
return ev.data
}

View File

@ -0,0 +1,23 @@
package definitions
import serverevents "git.apihub24.de/admin/server_events/v2"
const GreetMeEventName = "greet me"
type greetMeEvent struct {
data string
}
func NewGreetMeEvent(name string) serverevents.IEvent {
return &greetMeEvent{
data: name,
}
}
func (ev *greetMeEvent) GetEventName() string {
return GreetMeEventName
}
func (ev *greetMeEvent) GetEventData() any {
return ev.data
}

View File

@ -0,0 +1,19 @@
package definitions
import serverevents "git.apihub24.de/admin/server_events/v2"
const PingEventName = "ping"
type pingEvent struct{}
func NewPingEvent() serverevents.IEvent {
return &pingEvent{}
}
func (ev *pingEvent) GetEventName() string {
return PingEventName
}
func (ev *pingEvent) GetEventData() any {
return nil
}

View File

@ -0,0 +1,19 @@
package definitions
import serverevents "git.apihub24.de/admin/server_events/v2"
const PongEventName = "pong"
type pongEvent struct{}
func NewPongEvent() serverevents.IEvent {
return &pongEvent{}
}
func (ev *pongEvent) GetEventName() string {
return PongEventName
}
func (ev *pongEvent) GetEventData() any {
return nil
}

View File

@ -1,37 +0,0 @@
package events
import (
"fmt"
serverevents "git.apihub24.de/admin/server_events/v2"
)
func withSameUserName(context serverevents.IContext) func(context serverevents.IContext) bool {
return serverevents.CreateMetaDataFilter(context, func(c serverevents.IContext) string {
userName, _ := serverevents.MetadataAs[string](context, "UserName")
return userName
}, func(a string, b string) bool {
return a == b
})
}
type greeting struct {
Message string
}
type greeterEventHandler struct{}
func (handler greeterEventHandler) CanExecute(context serverevents.IContext) bool {
_, ok := serverevents.MetadataAs[string](context, "UserName")
return ok
}
func (handler greeterEventHandler) Handle(context serverevents.IContext) {
userName, _ := serverevents.MetadataAs[string](context, "UserName")
context.Dispatch("greet", greeting{
Message: fmt.Sprintf("Hello, %s", userName),
}, withSameUserName(context))
}
func NewGreeterEventHandler() serverevents.IEventHandler {
return &greeterEventHandler{}
}

View File

@ -0,0 +1,34 @@
package handler
import (
"fmt"
serverevents "git.apihub24.de/admin/server_events/v2"
"git.apihub24.de/admin/server_events/v2/implementation_test/data/dto"
"git.apihub24.de/admin/server_events/v2/implementation_test/events/definitions"
"git.apihub24.de/admin/server_events/v2/implementation_test/filter"
)
type greetMeEventHandler struct{}
func NewGreetMeEventHandler() serverevents.IEventHandler {
return &greetMeEventHandler{}
}
func (handler *greetMeEventHandler) GetConnectedEventName() string {
return definitions.GreetMeEventName
}
func (handler *greetMeEventHandler) CanExecute(_ serverevents.IContext) bool {
return true
}
func (handler *greetMeEventHandler) Handle(context serverevents.IContext, data any) {
userName, ok := serverevents.ValueAs[string](data)
if !ok || len(userName) < 1 {
userName = "Anonymous"
}
context.Dispatch(definitions.NewGreetEvent(dto.Greeting{
Message: fmt.Sprintf("Hello, %s", userName),
}), filter.WithSameUserName(context))
}

View File

@ -0,0 +1,24 @@
package handler
import (
serverevents "git.apihub24.de/admin/server_events/v2"
"git.apihub24.de/admin/server_events/v2/implementation_test/events/definitions"
)
type pingEventHandler struct{}
func NewPingEventHandler() serverevents.IEventHandler {
return &pingEventHandler{}
}
func (handler *pingEventHandler) GetConnectedEventName() string {
return definitions.PingEventName
}
func (handler *pingEventHandler) CanExecute(_ serverevents.IContext) bool {
return true
}
func (handler *pingEventHandler) Handle(context serverevents.IContext, _ any) {
context.Dispatch(definitions.NewPongEvent(), context.IsCaller)
}

View File

@ -1,19 +0,0 @@
package events
import (
serverevents "git.apihub24.de/admin/server_events/v2"
)
type pingEventHandler struct{}
func NewPingEventHandler() serverevents.IEventHandler {
return &pingEventHandler{}
}
func (p pingEventHandler) CanExecute(context serverevents.IContext) bool {
return true
}
func (p pingEventHandler) Handle(context serverevents.IContext) {
context.Dispatch("pong", nil, context.IsCaller)
}

View File

@ -0,0 +1,12 @@
package filter
import serverevents "git.apihub24.de/admin/server_events/v2"
func WithSameUserName(context serverevents.IContext) func(context serverevents.IContext) bool {
return serverevents.CreateMetaDataFilter(context, func(c serverevents.IContext) string {
userName, _ := serverevents.MetadataAs[string](context, "UserName")
return userName
}, func(a string, b string) bool {
return a == b
})
}

View File

@ -0,0 +1,61 @@
package main
var Index = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Test</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/8.0.7/signalr.min.js" integrity="sha512-7SRCYIJtR6F8ocwW7UxW6wGKqbSyqREDbfCORCbGLatU0iugBLwyOXpzhkPyHIFdBO0K2VCu57fvP2Twgx1o2A==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
</head>
<body>
<div>
<div>
<input type="text" id="name" />
<button id="greet">Greet</button>
<button id="ping">Ping</button>
</div>
<ul id="events"></ul>
</div>
<script>
(async function () {
let ws;
function connect() {
if (!localStorage.getItem('client_id')) {
localStorage.setItem('client_id', crypto.randomUUID());
}
ws = new WebSocket('/events?id=' + localStorage.getItem('client_id'));
ws.addEventListener('open', function () {
wsOpen = true;
});
ws.addEventListener('message', function (message) {
const event = JSON.parse(message.data);
if (!event.type) {
return;
}
const eventList = document.getElementById('events');
const li = document.createElement('li');
li.innerText = event.type + ': ' + (event.data ? JSON.stringify(event.data) : 'null');
eventList.append(li);
});
ws.onclose = function () {
setTimeout(() => connect(), 5000);
};
}
document.getElementById('greet').onclick = function () {
const name = document.getElementById('name').value;
ws.send(JSON.stringify({
type: 'greet me',
data: name,
}));
};
document.getElementById('ping').onclick = function () {
ws.send(JSON.stringify({type: 'ping'}));
};
connect();
})();
</script>
</body>
</html>`

View File

@ -1,25 +1,38 @@
package main package main
import ( import (
di "git.apihub24.de/admin/generic-di"
serverevents "git.apihub24.de/admin/server_events/v2"
"git.apihub24.de/admin/server_events/v2/implementation_test/events"
"log" "log"
"net/http" "net/http"
"time" "time"
di "git.apihub24.de/admin/generic-di"
serverevents "git.apihub24.de/admin/server_events/v2"
"git.apihub24.de/admin/server_events/v2/implementation_test/events/handler"
) )
func main() { func main() {
serverEventsMiddleware := di.Inject[serverevents.IMiddleware]() serverEventsMiddleware := di.Inject[serverevents.IMiddleware]()
eventRegistration := di.Inject[serverevents.IEventHandlerRegistration]() eventRegistration := di.Inject[serverevents.IEventHandlerRegistration]()
eventRegistration.Add("ping", events.NewPingEventHandler) eventRegistration.Use(handler.NewPingEventHandler)
eventRegistration.Add("greet all", events.NewGreeterEventHandler) eventRegistration.Use(handler.NewGreetMeEventHandler)
serverEventsMiddleware.Use(serverevents.MiddlewareOptions{ router := http.NewServeMux()
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(Index))
return
}
})
router = serverEventsMiddleware.Use(serverevents.MiddlewareOptions{
Path: "/events", Path: "/events",
ContextLifetime: 500 * time.Second, ContextLifetime: 500 * time.Second,
}, nil) KeepAliveTime: 2 * time.Second,
}, router)
log.Fatal(http.ListenAndServe(":8080", nil)) log.Fatal(http.ListenAndServe(":8080", router))
} }

View File

@ -1,11 +1,12 @@
package serverevents package serverevents
import ( import (
gocontext "context"
"fmt" "fmt"
di "git.apihub24.de/admin/generic-di" di "git.apihub24.de/admin/generic-di"
"github.com/gorilla/websocket" "github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"net/http" "net/http"
"sync"
"time" "time"
) )
@ -16,152 +17,67 @@ func init() {
type MiddlewareOptions struct { type MiddlewareOptions struct {
Path string Path string
ContextLifetime time.Duration ContextLifetime time.Duration
KeepAliveTime time.Duration
} }
type IMiddleware interface { type IMiddleware interface {
Use(options MiddlewareOptions, mux *http.ServeMux) Use(options MiddlewareOptions, mux *http.ServeMux) *http.ServeMux
} }
type serverEventsMiddleware struct { type serverEventsMiddleware struct {
options MiddlewareOptions hub IEventHub
upgrader websocket.Upgrader
streamSubscribers map[string]bool
emitter IEventEmitter
parser IMessageParser
registration IEventHandlerRegistration
mutex sync.Mutex
} }
func newServerEventsMiddleware() IMiddleware { func newServerEventsMiddleware() IMiddleware {
return &serverEventsMiddleware{ return &serverEventsMiddleware{
upgrader: websocket.Upgrader{ hub: di.Inject[IEventHub](),
CheckOrigin: func(r *http.Request) bool {
return true
},
},
streamSubscribers: make(map[string]bool),
emitter: di.Inject[IEventEmitter](),
parser: di.Inject[IMessageParser](),
registration: di.Inject[IEventHandlerRegistration](),
mutex: sync.Mutex{},
} }
} }
func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) { func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) *http.ServeMux {
middleware.options = options
middleware.selectMethod(options.Path, func(w http.ResponseWriter, r *http.Request) {
middleware.handleEventStream(w, r)
}, muxer)
}
func (middleware *serverEventsMiddleware) handleEventStream(w http.ResponseWriter, r *http.Request) {
conn, err := middleware.upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("Error upgrading:", err)
return
}
defer func() {
// do not handle Error
_ = conn.Close()
}()
id := r.URL.Query().Get("id")
context := di.Inject[IContext](id)
context.SetId(id)
// Locks the mutex before accessing 'middleware.streamSubscribers',
// to prevent race conditions when reading/writing to the map.
// This ensures that only one goroutine checks/registers at a time.
// defer can't be used here as the stream listens for messages in an infinite loop
middleware.mutex.Lock()
if _, ok := middleware.streamSubscribers[id]; !ok {
subscription := middleware.emitter.OnAll(func(ev Event) {
// trigger the Backend Handlers
handler, getHandlerErr := middleware.registration.GetHandler(ev.Type)
if getHandlerErr != nil {
println(fmt.Sprintf("no Handler found for Event %s", ev.Type))
} else {
if handler.CanExecute(context) {
handler.Handle(context)
}
}
if ev.IsBackendOnly || ev.Filter == nil || !ev.Filter(context) {
// the Event is Backend only or there is no socket (context) Filter
// to send to all Sockets deliver context Filter with returns bool
return
}
jsonData, jsonErr := middleware.parser.ToString(ev)
if jsonErr != nil {
println(fmt.Sprintf("Error parse event %s %s", jsonErr.Error(), ev.Type))
return
}
_ = conn.WriteMessage(websocket.TextMessage, []byte(jsonData))
})
defer func() {
// Blocks the mutex again, as this block also accesses 'middleware.streamSubscribers'.
// This is crucial to avoid race conditions when removing entries,
// while new connections may be established.
// defer cannot be used here either
middleware.mutex.Lock()
subscription.Unsubscribe()
delete(middleware.streamSubscribers, id)
// starts the Cleanup Process
context.CleanupIn(middleware.options.ContextLifetime)
middleware.mutex.Unlock()
}()
middleware.streamSubscribers[id] = true
}
middleware.mutex.Unlock()
ctx := r.Context()
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
return
}
if middleware.HandleMessage(conn, messageType, data) {
return
}
select {
case <-ctx.Done():
return
}
}
}
func (middleware *serverEventsMiddleware) selectMethod(path string, todo http.HandlerFunc, muxer *http.ServeMux) {
if muxer == nil { if muxer == nil {
http.HandleFunc(path, todo) muxer = http.NewServeMux()
} else {
muxer.HandleFunc(path, todo)
} }
eventHub := di.Inject[IEventHub]()
muxer.HandleFunc(options.Path, func(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
if len(id) < 1 {
println(fmt.Sprintf("no id found in query websocket closed"))
return
}
c, connectionErr := websocket.Accept(w, r, nil)
if connectionErr != nil {
println(fmt.Sprintf("[Error on connect Websocket]: %v", connectionErr))
return
}
defer func() {
println("Websocket disconnect")
eventHub.OnDisconnected(id)
_ = c.CloseNow()
}()
eventHub.OnConnected(id, c)
for {
err := readMessage(c, eventHub, id)
if err != nil {
return
}
}
})
return muxer
} }
func (middleware *serverEventsMiddleware) HandleMessage(conn *websocket.Conn, messageType int, data []byte) bool { func readMessage(c *websocket.Conn, eventHub IEventHub, id string) error {
switch messageType { var message Event
case websocket.PingMessage: readErr := wsjson.Read(gocontext.Background(), c, &message)
pongErr := conn.WriteMessage(websocket.PongMessage, []byte{}) if readErr != nil {
if pongErr != nil { println(fmt.Sprintf("[Error on read from Websocket]: %v", readErr))
println(fmt.Sprintf("error on send PongMessage: %s", pongErr.Error())) return readErr
} }
break
case websocket.CloseMessage: eventHub.Event(message, id)
// return true to close the Websocket return nil
return true
case websocket.TextMessage:
ev, parseErr := middleware.parser.FromString(string(data))
if parseErr != nil {
println(fmt.Sprintf("error on parse Event: %s data: %s", parseErr.Error(), string(data)))
return false
}
// Event was dispatched over the Websocket so not send it back to Client!
ev.IsBackendOnly = true
middleware.emitter.Emit(ev)
break
case websocket.BinaryMessage:
println(fmt.Sprintf("BinaryMessages are not supported"))
break
}
return false
} }

View File

@ -2,8 +2,9 @@ package serverevents
import ( import (
"fmt" "fmt"
di "git.apihub24.de/admin/generic-di"
"sync" "sync"
di "git.apihub24.de/admin/generic-di"
) )
func init() { func init() {
@ -11,7 +12,7 @@ func init() {
} }
type IEventHandlerRegistration interface { type IEventHandlerRegistration interface {
Add(eventType string, creator func() IEventHandler) Use(creator func() IEventHandler)
GetHandler(eventType string) (IEventHandler, error) GetHandler(eventType string) (IEventHandler, error)
} }
@ -29,16 +30,14 @@ func newEventHandlerRegistration() IEventHandlerRegistration {
} }
} }
func (registration *eventHandlerRegistration) Add(eventType string, creator func() IEventHandler) { func (registration *eventHandlerRegistration) Use(creator func() IEventHandler) {
registration.mutex.Lock() registration.mutex.Lock()
defer registration.mutex.Unlock() defer registration.mutex.Unlock()
if _, ok := registration.instances[eventType]; ok { tmp := creator()
eventType := tmp.GetConnectedEventName()
delete(registration.instances, eventType) delete(registration.instances, eventType)
}
if _, ok := registration.creators[eventType]; ok {
delete(registration.creators, eventType) delete(registration.creators, eventType)
}
registration.creators[eventType] = creator registration.creators[eventType] = creator
} }

View File

@ -1,12 +0,0 @@
package serverevents
type Subscription struct {
id string
eventType string
todo func(Event)
emitter IEventEmitter
}
func (subscription Subscription) Unsubscribe() {
subscription.emitter.Unsubscribe(subscription)
}