diff --git a/pkg/events/events.go b/pkg/events/events.go index 57b02dc37..30c26ea99 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -139,6 +139,66 @@ func InitEvents() (err error) { return router.Run(context.Background()) } +// InitEventsForTesting sets up the event system like InitEvents but accepts a +// context so the watermill router can be shut down cleanly in tests. +// It starts the router in a background goroutine and returns a channel that is +// closed once the router is ready to accept messages. +func InitEventsForTesting(ctx context.Context) (<-chan struct{}, error) { + logger := log.NewWatermillLogger(config.LogEnabled.GetBool(), config.LogEvents.GetString(), config.LogEventsLevel.GetString(), config.LogFormat.GetString()) + + router, err := message.NewRouter( + message.RouterConfig{}, + logger, + ) + if err != nil { + return nil, err + } + + pubsub = gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: 1024, + }, + logger, + ) + + // No prometheus metrics in tests — avoids duplicate registration panics + // No poison queue — keep test output clean, let errors surface directly + + handlerTracker := func(h message.HandlerFunc) message.HandlerFunc { + return func(msg *message.Message) ([]*message.Message, error) { + activeHandlers.Add(1) + defer activeHandlers.Done() + return h(msg) + } + } + + router.AddMiddleware( + handlerTracker, + middleware.Retry{ + MaxRetries: 3, + InitialInterval: time.Millisecond * 50, + MaxInterval: time.Second, + Multiplier: 2, + Logger: logger, + }.Middleware, + middleware.Recoverer, + ) + + for topic, funcs := range listeners { + for _, handler := range funcs { + router.AddConsumerHandler(topic+"."+handler.Name(), topic, pubsub, handler.Handle) + } + } + + ready := router.Running() + go func() { + if err := router.Run(ctx); err != nil { + log.Errorf("Event system error: %s", err) + } + }() + return ready, nil +} + // Dispatch dispatches an event func Dispatch(event Event) error { if isUnderTest { @@ -146,6 +206,10 @@ func Dispatch(event Event) error { return nil } + if pubsub == nil { + return fmt.Errorf("event system not initialized: call InitEvents or InitEventsForTesting before dispatching") + } + content, err := json.Marshal(event) if err != nil { return err diff --git a/pkg/events/testing.go b/pkg/events/testing.go index b422e7c20..2c969f057 100644 --- a/pkg/events/testing.go +++ b/pkg/events/testing.go @@ -39,6 +39,14 @@ func Fake() { dispatchedTestEvents = nil } +// Unfake disables "test mode" so that events are dispatched through the real +// watermill pub/sub instead of being recorded. Call this after Fake() when you +// need the full event pipeline (e.g. in end-to-end tests). +func Unfake() { + isUnderTest = false + dispatchedTestEvents = nil +} + // AssertDispatched asserts an event has been dispatched. func AssertDispatched(t *testing.T, event Event) { var found bool