Compare commits

..

No commits in common. "a5f7a8c2ae26508101e5005600af54da7edf0d31" and "09136b0ca62ca538aa4f7e3c53d864e128c99303" have entirely different histories.

10 changed files with 245 additions and 352 deletions

View File

@ -1,9 +1,7 @@
package serverevents package serverevents
import ( import (
gocontext "context"
di "git.apihub24.de/admin/generic-di" di "git.apihub24.de/admin/generic-di"
"github.com/coder/websocket"
"sync" "sync"
"time" "time"
) )
@ -15,8 +13,6 @@ func init() {
type IContext interface { type IContext interface {
SetId(id string) SetId(id string)
GetId() string GetId() string
SetConnection(conn *websocket.Conn)
GetConnection() *websocket.Conn
Set(key string, data any) Set(key string, data any)
Get(key string) (any, bool) Get(key string) (any, bool)
RemoveMetaData(key string) RemoveMetaData(key string)
@ -29,10 +25,8 @@ type context struct {
id string id string
metadata map[string]any metadata map[string]any
timer *time.Timer timer *time.Timer
hub IEventHub emitter IEventEmitter
mutex sync.RWMutex mutex sync.RWMutex
c *websocket.Conn
ctx gocontext.Context
} }
func newContext() IContext { func newContext() IContext {
@ -40,9 +34,8 @@ func newContext() IContext {
id: "", id: "",
metadata: make(map[string]any), metadata: make(map[string]any),
timer: nil, timer: nil,
hub: di.Inject[IEventHub](), emitter: di.Inject[IEventEmitter](),
mutex: sync.RWMutex{}, mutex: sync.RWMutex{},
c: nil,
} }
} }
@ -117,13 +110,5 @@ func (context *context) Dispatch(eventName string, data any, filter func(c ICont
IsBackendOnly: filter == nil, IsBackendOnly: filter == nil,
Filter: filter, Filter: filter,
} }
context.hub.Dispatch(ev, context) context.emitter.Emit(ev)
}
func (context *context) GetConnection() *websocket.Conn {
return context.c
}
func (context *context) SetConnection(conn *websocket.Conn) {
context.c = conn
} }

89
v2/emitter.go Normal file
View File

@ -0,0 +1,89 @@
package serverevents
import (
di "git.apihub24.de/admin/generic-di"
"github.com/google/uuid"
"slices"
"sync"
)
func init() {
di.Injectable(newEventEmitter)
}
type IEventEmitter interface {
On(eventType string, do func(Event)) Subscription
OnAll(do func(Event)) Subscription
Emit(event Event)
Unsubscribe(Subscription)
}
type eventEmitter struct {
subscribers map[string][]Subscription
onAllSubscribers []Subscription
mu sync.RWMutex
}
func newEventEmitter() IEventEmitter {
return &eventEmitter{}
}
func (emitter *eventEmitter) On(eventType string, do func(Event)) Subscription {
emitter.mu.Lock()
defer emitter.mu.Unlock()
if emitter.subscribers[eventType] == nil {
emitter.subscribers[eventType] = make([]Subscription, 0)
}
sub := Subscription{
id: uuid.NewString(),
eventType: eventType,
todo: do,
emitter: emitter,
}
emitter.subscribers[eventType] = append(emitter.subscribers[eventType], sub)
return sub
}
func (emitter *eventEmitter) OnAll(do func(Event)) Subscription {
emitter.mu.Lock()
defer emitter.mu.Unlock()
sub := Subscription{
id: uuid.NewString(),
todo: do,
emitter: emitter,
}
emitter.onAllSubscribers = append(emitter.onAllSubscribers, sub)
return sub
}
func (emitter *eventEmitter) Emit(event Event) {
emitter.executeSubscribers(emitter.onAllSubscribers, event)
eventSubscribers := emitter.subscribers[event.Type]
if eventSubscribers != nil {
emitter.executeSubscribers(eventSubscribers, event)
}
}
func (emitter *eventEmitter) Unsubscribe(subscription Subscription) {
if len(subscription.eventType) < 1 {
emitter.onAllSubscribers = slices.DeleteFunc(emitter.onAllSubscribers, func(existSub Subscription) bool {
return existSub.id == subscription.id
})
return
}
eventSubscribers := emitter.subscribers[subscription.eventType]
if eventSubscribers == nil || len(eventSubscribers) < 1 {
return
}
emitter.subscribers[subscription.eventType] = slices.DeleteFunc(emitter.subscribers[subscription.eventType], func(existSub Subscription) bool {
return existSub.id == subscription.id
})
}
func (emitter *eventEmitter) executeSubscribers(toDos []Subscription, event Event) {
for _, sub := range toDos {
sub.todo(event)
}
}

View File

@ -1,13 +1,9 @@
module git.apihub24.de/admin/server_events/v2 module git.apihub24.de/admin/server_events/v2
go 1.23.0 go 1.22.0
toolchain go1.24.4
require ( require (
git.apihub24.de/admin/generic-di v1.4.0 git.apihub24.de/admin/generic-di v1.4.0
) github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
require (
github.com/coder/websocket v1.8.13 // indirect
) )

102
v2/go.sum
View File

@ -1,108 +1,6 @@
git.apihub24.de/admin/generic-di v1.4.0 h1:0mQnpAcavMLBcnF5UO+tUI7abZ6zQPleqPsjEk3WIaU= git.apihub24.de/admin/generic-di v1.4.0 h1:0mQnpAcavMLBcnF5UO+tUI7abZ6zQPleqPsjEk3WIaU=
git.apihub24.de/admin/generic-di v1.4.0/go.mod h1:VcHV8MOb1qhwabHdO09CpjEg2VaDesehul86g1iyOxY= git.apihub24.de/admin/generic-di v1.4.0/go.mod h1:VcHV8MOb1qhwabHdO09CpjEg2VaDesehul86g1iyOxY=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/dave/jennifer v1.7.0 h1:uRbSBH9UTS64yXbh4FrMHfgfY762RD+C7bUPKODpSJE=
github.com/dave/jennifer v1.7.0/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240402174815-29b9bb013b0f h1:f00RU+zOX+B3rLAmMMkzHUF2h1z4DeYR9tTCvEq2REY=
github.com/google/pprof v0.0.0-20240402174815-29b9bb013b0f/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/philippseith/signalr v0.7.0 h1:bt8uusIKr3+hpQ5bRxEsHAM3VbEU15lve7e74Hf5a4E=
github.com/philippseith/signalr v0.7.0/go.mod h1:deqWw2+rPPcdUQVvzXoB6A6FQB3v1OCnroy8CiUJNkg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE=
github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg=
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/teivah/onecontext v1.3.0 h1:tbikMhAlo6VhAuEGCvhc8HlTnpX4xTNPTOseWuhO1J0=
github.com/teivah/onecontext v1.3.0/go.mod h1:hoW1nmdPVK/0jrvGtcx8sCKYs2PiS4z0zzfdeuEVyb0=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,99 +0,0 @@
package serverevents
import (
gocontext "context"
"fmt"
di "git.apihub24.de/admin/generic-di"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"sync"
"time"
)
func init() {
di.Injectable(newHub)
}
type IEventHub interface {
SetContextLifetime(duration time.Duration)
OnConnected(connectionId string, conn *websocket.Conn)
OnDisconnected(connectionId string)
Dispatch(event Event, callerCtx IContext)
Event(event Event, id string)
}
type eventHub struct {
mu sync.Mutex
contextLifetime time.Duration
contexts map[string]IContext
registration IEventHandlerRegistration
parser IMessageParser
}
func newHub() IEventHub {
return &eventHub{
mu: sync.Mutex{},
contextLifetime: 500 * time.Second,
contexts: make(map[string]IContext),
registration: di.Inject[IEventHandlerRegistration](),
parser: di.Inject[IMessageParser](),
}
}
func (hub *eventHub) Dispatch(event Event, callerCtx IContext) {
// dispatch Event to Handler
handler, getHandlerErr := hub.registration.GetHandler(event.Type)
if getHandlerErr == nil && handler.CanExecute(callerCtx) {
handler.Handle(callerCtx, event.Data)
}
if event.IsBackendOnly || event.Filter == nil {
// only dispatch Event on Backend or we have no Context Filter
// if you want to dispatch to all Clients use Filter that returns true
return
}
writeCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Second*10)
defer cancel()
for _, ctx := range hub.contexts {
if event.Filter(ctx) {
writeErr := wsjson.Write(writeCtx, ctx.GetConnection(), event)
if writeErr != nil {
println(fmt.Sprintf("[Error on write to Websocket]: %v", writeErr))
}
}
}
}
func (hub *eventHub) SetContextLifetime(duration time.Duration) {
hub.contextLifetime = duration
}
func (hub *eventHub) Event(event Event, id string) {
callerCtx, ok := hub.contexts[id]
if !ok {
println(fmt.Sprintf("[Error on select caller context]: missing context connectionId: %s", id))
return
}
// Event from Client not send back!
event.IsBackendOnly = true
hub.Dispatch(event, callerCtx)
}
func (hub *eventHub) OnConnected(connectionId string, conn *websocket.Conn) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.contexts[connectionId] = di.Inject[IContext](connectionId)
hub.contexts[connectionId].SetId(connectionId)
hub.contexts[connectionId].SetConnection(conn)
}
func (hub *eventHub) OnDisconnected(connectionId string) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.contexts[connectionId].CleanupIn(hub.contextLifetime)
delete(hub.contexts, connectionId)
}

View File

@ -20,14 +20,15 @@ type greeting struct {
type greeterEventHandler struct{} type greeterEventHandler struct{}
func (handler greeterEventHandler) CanExecute(_ serverevents.IContext) bool { func (handler greeterEventHandler) CanExecute(context serverevents.IContext) bool {
return true _, ok := serverevents.MetadataAs[string](context, "UserName")
return ok
} }
func (handler greeterEventHandler) Handle(context serverevents.IContext, data any) { func (handler greeterEventHandler) Handle(context serverevents.IContext, data any) {
userName, ok := serverevents.ValueAs[string](data) userName, ok := serverevents.ValueAs[string](data)
if !ok || len(userName) < 1 { if !ok {
userName = "Anonymous" userName, _ = serverevents.MetadataAs[string](context, "UserName")
} }
context.Dispatch("greet", greeting{ context.Dispatch("greet", greeting{
Message: fmt.Sprintf("Hello, %s", userName), Message: fmt.Sprintf("Hello, %s", userName),

View File

@ -1,61 +0,0 @@
package main
var Index = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Test</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/8.0.7/signalr.min.js" integrity="sha512-7SRCYIJtR6F8ocwW7UxW6wGKqbSyqREDbfCORCbGLatU0iugBLwyOXpzhkPyHIFdBO0K2VCu57fvP2Twgx1o2A==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
</head>
<body>
<div>
<div>
<input type="text" id="name" />
<button id="greet">Greet</button>
<button id="ping">Ping</button>
</div>
<ul id="events"></ul>
</div>
<script>
(async function () {
let ws;
function connect() {
if (!localStorage.getItem('client_id')) {
localStorage.setItem('client_id', crypto.randomUUID());
}
ws = new WebSocket('/events?id=' + localStorage.getItem('client_id'));
ws.addEventListener('open', function () {
wsOpen = true;
});
ws.addEventListener('message', function (message) {
const event = JSON.parse(message.data);
if (!event.type) {
return;
}
const eventList = document.getElementById('events');
const li = document.createElement('li');
li.innerText = event.type + ': ' + (event.data ? JSON.stringify(event.data) : 'null');
eventList.append(li);
});
ws.onclose = function () {
setTimeout(() => connect(), 5000);
};
}
document.getElementById('greet').onclick = function () {
const name = document.getElementById('name').value;
ws.send(JSON.stringify({
type: 'greet me',
data: name,
}));
};
document.getElementById('ping').onclick = function () {
ws.send(JSON.stringify({type: 'ping'}));
};
connect();
})();
</script>
</body>
</html>`

View File

@ -14,24 +14,12 @@ func main() {
eventRegistration := di.Inject[serverevents.IEventHandlerRegistration]() eventRegistration := di.Inject[serverevents.IEventHandlerRegistration]()
eventRegistration.Add("ping", events.NewPingEventHandler) eventRegistration.Add("ping", events.NewPingEventHandler)
eventRegistration.Add("greet me", events.NewGreeterEventHandler) eventRegistration.Add("greet all", events.NewGreeterEventHandler)
router := http.NewServeMux() serverEventsMiddleware.Use(serverevents.MiddlewareOptions{
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(Index))
return
}
})
router = serverEventsMiddleware.Use(serverevents.MiddlewareOptions{
Path: "/events", Path: "/events",
ContextLifetime: 500 * time.Second, ContextLifetime: 500 * time.Second,
KeepAliveTime: 2 * time.Second, }, nil)
}, router)
log.Fatal(http.ListenAndServe(":8080", router)) log.Fatal(http.ListenAndServe(":8080", nil))
} }

View File

@ -1,12 +1,11 @@
package serverevents package serverevents
import ( import (
gocontext "context"
"fmt" "fmt"
di "git.apihub24.de/admin/generic-di" di "git.apihub24.de/admin/generic-di"
"github.com/coder/websocket" "github.com/gorilla/websocket"
"github.com/coder/websocket/wsjson"
"net/http" "net/http"
"sync"
"time" "time"
) )
@ -17,67 +16,152 @@ func init() {
type MiddlewareOptions struct { type MiddlewareOptions struct {
Path string Path string
ContextLifetime time.Duration ContextLifetime time.Duration
KeepAliveTime time.Duration
} }
type IMiddleware interface { type IMiddleware interface {
Use(options MiddlewareOptions, mux *http.ServeMux) *http.ServeMux Use(options MiddlewareOptions, mux *http.ServeMux)
} }
type serverEventsMiddleware struct { type serverEventsMiddleware struct {
hub IEventHub options MiddlewareOptions
upgrader websocket.Upgrader
streamSubscribers map[string]bool
emitter IEventEmitter
parser IMessageParser
registration IEventHandlerRegistration
mutex sync.Mutex
} }
func newServerEventsMiddleware() IMiddleware { func newServerEventsMiddleware() IMiddleware {
return &serverEventsMiddleware{ return &serverEventsMiddleware{
hub: di.Inject[IEventHub](), upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
streamSubscribers: make(map[string]bool),
emitter: di.Inject[IEventEmitter](),
parser: di.Inject[IMessageParser](),
registration: di.Inject[IEventHandlerRegistration](),
mutex: sync.Mutex{},
} }
} }
func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) *http.ServeMux { func (middleware *serverEventsMiddleware) Use(options MiddlewareOptions, muxer *http.ServeMux) {
if muxer == nil { middleware.options = options
muxer = http.NewServeMux() middleware.selectMethod(options.Path, func(w http.ResponseWriter, r *http.Request) {
middleware.handleEventStream(w, r)
}, muxer)
} }
eventHub := di.Inject[IEventHub]() func (middleware *serverEventsMiddleware) handleEventStream(w http.ResponseWriter, r *http.Request) {
muxer.HandleFunc(options.Path, func(w http.ResponseWriter, r *http.Request) { conn, err := middleware.upgrader.Upgrade(w, r, nil)
id := r.URL.Query().Get("id") if err != nil {
if len(id) < 1 { fmt.Println("Error upgrading:", err)
println(fmt.Sprintf("no id found in query websocket closed"))
return
}
c, connectionErr := websocket.Accept(w, r, nil)
if connectionErr != nil {
println(fmt.Sprintf("[Error on connect Websocket]: %v", connectionErr))
return return
} }
defer func() { defer func() {
println("Websocket disconnect") // do not handle Error
eventHub.OnDisconnected(id) _ = conn.Close()
_ = c.CloseNow()
}() }()
id := r.URL.Query().Get("id")
context := di.Inject[IContext](id)
context.SetId(id)
eventHub.OnConnected(id, c) // Locks the mutex before accessing 'middleware.streamSubscribers',
// to prevent race conditions when reading/writing to the map.
// This ensures that only one goroutine checks/registers at a time.
// defer can't be used here as the stream listens for messages in an infinite loop
middleware.mutex.Lock()
if _, ok := middleware.streamSubscribers[id]; !ok {
subscription := middleware.emitter.OnAll(func(ev Event) {
// trigger the Backend Handlers
handler, getHandlerErr := middleware.registration.GetHandler(ev.Type)
if getHandlerErr != nil {
println(fmt.Sprintf("no Handler found for Event %s", ev.Type))
} else {
if handler.CanExecute(context) {
handler.Handle(context, ev.Data)
}
}
if ev.IsBackendOnly || ev.Filter == nil || !ev.Filter(context) {
// the Event is Backend only or there is no socket (context) Filter
// to send to all Sockets deliver context Filter with returns bool
return
}
jsonData, jsonErr := middleware.parser.ToString(ev)
if jsonErr != nil {
println(fmt.Sprintf("Error parse event %s %s", jsonErr.Error(), ev.Type))
return
}
_ = conn.WriteMessage(websocket.TextMessage, []byte(jsonData))
})
defer func() {
// Blocks the mutex again, as this block also accesses 'middleware.streamSubscribers'.
// This is crucial to avoid race conditions when removing entries,
// while new connections may be established.
// defer cannot be used here either
middleware.mutex.Lock()
subscription.Unsubscribe()
delete(middleware.streamSubscribers, id)
// starts the Cleanup Process
context.CleanupIn(middleware.options.ContextLifetime)
middleware.mutex.Unlock()
}()
middleware.streamSubscribers[id] = true
}
middleware.mutex.Unlock()
ctx := r.Context()
for { for {
err := readMessage(c, eventHub, id) messageType, data, err := conn.ReadMessage()
if err != nil { if err != nil {
return return
} }
if middleware.HandleMessage(conn, messageType, data) {
return
}
select {
case <-ctx.Done():
return
}
} }
})
return muxer
} }
func readMessage(c *websocket.Conn, eventHub IEventHub, id string) error { func (middleware *serverEventsMiddleware) selectMethod(path string, todo http.HandlerFunc, muxer *http.ServeMux) {
var message Event if muxer == nil {
readErr := wsjson.Read(gocontext.Background(), c, &message) http.HandleFunc(path, todo)
if readErr != nil { } else {
println(fmt.Sprintf("[Error on read from Websocket]: %v", readErr)) muxer.HandleFunc(path, todo)
return readErr }
} }
eventHub.Event(message, id) func (middleware *serverEventsMiddleware) HandleMessage(conn *websocket.Conn, messageType int, data []byte) bool {
return nil switch messageType {
case websocket.PingMessage:
pongErr := conn.WriteMessage(websocket.PongMessage, []byte{})
if pongErr != nil {
println(fmt.Sprintf("error on send PongMessage: %s", pongErr.Error()))
}
break
case websocket.CloseMessage:
// return true to close the Websocket
return true
case websocket.TextMessage:
ev, parseErr := middleware.parser.FromString(string(data))
if parseErr != nil {
println(fmt.Sprintf("error on parse Event: %s data: %s", parseErr.Error(), string(data)))
return false
}
// Event was dispatched over the Websocket so not send it back to Client!
ev.IsBackendOnly = true
middleware.emitter.Emit(ev)
break
case websocket.BinaryMessage:
println(fmt.Sprintf("BinaryMessages are not supported"))
break
}
return false
} }

12
v2/subscription.go Normal file
View File

@ -0,0 +1,12 @@
package serverevents
type Subscription struct {
id string
eventType string
todo func(Event)
emitter IEventEmitter
}
func (subscription Subscription) Unsubscribe() {
subscription.emitter.Unsubscribe(subscription)
}