feat(events): add DispatchOnCommit/DispatchPending for deferred event dispatch
Events dispatched inside model methods run before the transaction commits, causing listeners (especially webhooks) that open new sessions to read stale data. These new functions allow accumulating events during a transaction and dispatching them only after commit. Refs #2315
This commit is contained in:
parent
9a4a2eb184
commit
564573bdd5
|
|
@ -154,3 +154,47 @@ func Dispatch(event Event) error {
|
|||
msg := message.NewMessage(watermill.NewUUID(), content)
|
||||
return pubsub.Publish(event.Name(), msg)
|
||||
}
|
||||
|
||||
// pendingEventQueue holds the pending events and a mutex for thread-safe access
|
||||
type pendingEventQueue struct {
|
||||
mu sync.Mutex
|
||||
events []Event
|
||||
}
|
||||
|
||||
var pendingEvents sync.Map // map[any]*pendingEventQueue
|
||||
|
||||
// DispatchOnCommit stores an event to be dispatched later, after a transaction commits.
|
||||
// The key should be the *xorm.Session pointer associated with the transaction.
|
||||
// Call DispatchPending(key) after s.Commit() to actually dispatch the events.
|
||||
// Call CleanupPending(key) on rollback to discard them.
|
||||
func DispatchOnCommit(key any, event Event) {
|
||||
val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{})
|
||||
queue := val.(*pendingEventQueue)
|
||||
queue.mu.Lock()
|
||||
queue.events = append(queue.events, event)
|
||||
queue.mu.Unlock()
|
||||
}
|
||||
|
||||
// DispatchPending dispatches all events accumulated for the given key and removes them.
|
||||
// Call this after s.Commit(). Safe to call even if no events were registered.
|
||||
// If any event fails to dispatch, the error is logged but remaining events are still dispatched.
|
||||
func DispatchPending(key any) {
|
||||
val, ok := pendingEvents.LoadAndDelete(key)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
queue := val.(*pendingEventQueue)
|
||||
// No need to lock here since we've already removed it from the map
|
||||
// and this key won't receive new events
|
||||
for _, event := range queue.events {
|
||||
if err := Dispatch(event); err != nil {
|
||||
log.Errorf("Failed to dispatch event %s: %v", event.Name(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CleanupPending discards all pending events for the given key without dispatching.
|
||||
// Call this when a transaction is rolled back.
|
||||
func CleanupPending(key any) {
|
||||
pendingEvents.Delete(key)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
// Vikunja is a to-do list application to facilitate your life.
|
||||
// Copyright 2018-present Vikunja and contributors. All rights reserved.
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type testEvent struct{}
|
||||
|
||||
func (t *testEvent) Name() string {
|
||||
return "test.event"
|
||||
}
|
||||
|
||||
func TestDispatchOnCommit(t *testing.T) {
|
||||
Fake()
|
||||
|
||||
// Use a simple key (in real use this is a *xorm.Session)
|
||||
key := new(int)
|
||||
|
||||
DispatchOnCommit(key, &testEvent{})
|
||||
|
||||
// Event should NOT be dispatched yet
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
|
||||
// Simulate post-commit dispatch
|
||||
DispatchPending(key)
|
||||
|
||||
// Now it should be dispatched
|
||||
assert.Equal(t, 1, CountDispatchedEvents("test.event"))
|
||||
}
|
||||
|
||||
func TestDispatchOnCommitMultipleEvents(t *testing.T) {
|
||||
Fake()
|
||||
|
||||
key := new(int)
|
||||
|
||||
DispatchOnCommit(key, &testEvent{})
|
||||
DispatchOnCommit(key, &testEvent{})
|
||||
DispatchOnCommit(key, &testEvent{})
|
||||
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
|
||||
DispatchPending(key)
|
||||
|
||||
assert.Equal(t, 3, CountDispatchedEvents("test.event"))
|
||||
}
|
||||
|
||||
func TestCleanupPending(t *testing.T) {
|
||||
Fake()
|
||||
|
||||
key := new(int)
|
||||
|
||||
DispatchOnCommit(key, &testEvent{})
|
||||
DispatchOnCommit(key, &testEvent{})
|
||||
|
||||
// Simulate rollback — discard events
|
||||
CleanupPending(key)
|
||||
|
||||
// Dispatching after cleanup should be a no-op
|
||||
DispatchPending(key)
|
||||
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
}
|
||||
|
||||
func TestDispatchPendingNoEvents(t *testing.T) {
|
||||
Fake()
|
||||
|
||||
key := new(int)
|
||||
|
||||
// Should be a no-op
|
||||
DispatchPending(key)
|
||||
|
||||
// Verify no events were dispatched
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
}
|
||||
Loading…
Reference in New Issue