diff --git a/pkg/events/events.go b/pkg/events/events.go index 4ca7be72d..57b02dc37 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -154,3 +154,47 @@ func Dispatch(event Event) error { msg := message.NewMessage(watermill.NewUUID(), content) return pubsub.Publish(event.Name(), msg) } + +// pendingEventQueue holds the pending events and a mutex for thread-safe access +type pendingEventQueue struct { + mu sync.Mutex + events []Event +} + +var pendingEvents sync.Map // map[any]*pendingEventQueue + +// DispatchOnCommit stores an event to be dispatched later, after a transaction commits. +// The key should be the *xorm.Session pointer associated with the transaction. +// Call DispatchPending(key) after s.Commit() to actually dispatch the events. +// Call CleanupPending(key) on rollback to discard them. +func DispatchOnCommit(key any, event Event) { + val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{}) + queue := val.(*pendingEventQueue) + queue.mu.Lock() + queue.events = append(queue.events, event) + queue.mu.Unlock() +} + +// DispatchPending dispatches all events accumulated for the given key and removes them. +// Call this after s.Commit(). Safe to call even if no events were registered. +// If any event fails to dispatch, the error is logged but remaining events are still dispatched. +func DispatchPending(key any) { + val, ok := pendingEvents.LoadAndDelete(key) + if !ok { + return + } + queue := val.(*pendingEventQueue) + // No need to lock here since we've already removed it from the map + // and this key won't receive new events + for _, event := range queue.events { + if err := Dispatch(event); err != nil { + log.Errorf("Failed to dispatch event %s: %v", event.Name(), err) + } + } +} + +// CleanupPending discards all pending events for the given key without dispatching. +// Call this when a transaction is rolled back. +func CleanupPending(key any) { + pendingEvents.Delete(key) +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 000000000..f78396a50 --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,92 @@ +// Vikunja is a to-do list application to facilitate your life. +// Copyright 2018-present Vikunja and contributors. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type testEvent struct{} + +func (t *testEvent) Name() string { + return "test.event" +} + +func TestDispatchOnCommit(t *testing.T) { + Fake() + + // Use a simple key (in real use this is a *xorm.Session) + key := new(int) + + DispatchOnCommit(key, &testEvent{}) + + // Event should NOT be dispatched yet + assert.Equal(t, 0, CountDispatchedEvents("test.event")) + + // Simulate post-commit dispatch + DispatchPending(key) + + // Now it should be dispatched + assert.Equal(t, 1, CountDispatchedEvents("test.event")) +} + +func TestDispatchOnCommitMultipleEvents(t *testing.T) { + Fake() + + key := new(int) + + DispatchOnCommit(key, &testEvent{}) + DispatchOnCommit(key, &testEvent{}) + DispatchOnCommit(key, &testEvent{}) + + assert.Equal(t, 0, CountDispatchedEvents("test.event")) + + DispatchPending(key) + + assert.Equal(t, 3, CountDispatchedEvents("test.event")) +} + +func TestCleanupPending(t *testing.T) { + Fake() + + key := new(int) + + DispatchOnCommit(key, &testEvent{}) + DispatchOnCommit(key, &testEvent{}) + + // Simulate rollback — discard events + CleanupPending(key) + + // Dispatching after cleanup should be a no-op + DispatchPending(key) + + assert.Equal(t, 0, CountDispatchedEvents("test.event")) +} + +func TestDispatchPendingNoEvents(t *testing.T) { + Fake() + + key := new(int) + + // Should be a no-op + DispatchPending(key) + + // Verify no events were dispatched + assert.Equal(t, 0, CountDispatchedEvents("test.event")) +}