feat: add InitEventsForTesting and Unfake for real event dispatch in tests

InitEventsForTesting sets up a real Watermill GoChannel router with a
cancellable context, returning a readiness channel. Skips Prometheus
metrics and poison queue to avoid duplicate registration panics.

Unfake re-enables real event dispatch after test init helpers call Fake().

Also guards Dispatch against nil pubsub with a clear error message.
This commit is contained in:
kolaente 2026-03-05 12:28:58 +01:00
parent f5595f0ed2
commit 1b1e8e5b19
2 changed files with 72 additions and 0 deletions

View File

@ -139,6 +139,66 @@ func InitEvents() (err error) {
return router.Run(context.Background())
}
// InitEventsForTesting sets up the event system like InitEvents but accepts a
// context so the watermill router can be shut down cleanly in tests.
// It starts the router in a background goroutine and returns a channel that is
// closed once the router is ready to accept messages.
func InitEventsForTesting(ctx context.Context) (<-chan struct{}, error) {
logger := log.NewWatermillLogger(config.LogEnabled.GetBool(), config.LogEvents.GetString(), config.LogEventsLevel.GetString(), config.LogFormat.GetString())
router, err := message.NewRouter(
message.RouterConfig{},
logger,
)
if err != nil {
return nil, err
}
pubsub = gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: 1024,
},
logger,
)
// No prometheus metrics in tests — avoids duplicate registration panics
// No poison queue — keep test output clean, let errors surface directly
handlerTracker := func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
activeHandlers.Add(1)
defer activeHandlers.Done()
return h(msg)
}
}
router.AddMiddleware(
handlerTracker,
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 50,
MaxInterval: time.Second,
Multiplier: 2,
Logger: logger,
}.Middleware,
middleware.Recoverer,
)
for topic, funcs := range listeners {
for _, handler := range funcs {
router.AddConsumerHandler(topic+"."+handler.Name(), topic, pubsub, handler.Handle)
}
}
ready := router.Running()
go func() {
if err := router.Run(ctx); err != nil {
log.Errorf("Event system error: %s", err)
}
}()
return ready, nil
}
// Dispatch dispatches an event
func Dispatch(event Event) error {
if isUnderTest {
@ -146,6 +206,10 @@ func Dispatch(event Event) error {
return nil
}
if pubsub == nil {
return fmt.Errorf("event system not initialized: call InitEvents or InitEventsForTesting before dispatching")
}
content, err := json.Marshal(event)
if err != nil {
return err

View File

@ -39,6 +39,14 @@ func Fake() {
dispatchedTestEvents = nil
}
// Unfake disables "test mode" so that events are dispatched through the real
// watermill pub/sub instead of being recorded. Call this after Fake() when you
// need the full event pipeline (e.g. in end-to-end tests).
func Unfake() {
isUnderTest = false
dispatchedTestEvents = nil
}
// AssertDispatched asserts an event has been dispatched.
func AssertDispatched(t *testing.T, event Event) {
var found bool