refactor(events): pass context to DispatchPending directly
Every DispatchPending caller either has the request context in scope or is genuinely request-less, so passing it as a parameter replaces the stored-context mechanism on the pending queue and satisfies contextcheck. Also fixes lint findings in the audit package.
This commit is contained in:
parent
fc831719cd
commit
9da51f5096
|
|
@ -136,10 +136,10 @@ func startEventRouter(t *testing.T) {
|
|||
<-ready
|
||||
}
|
||||
|
||||
func waitForLines(t *testing.T, logfile string, count int) []string {
|
||||
func waitForLines(t *testing.T, logfile string) []string {
|
||||
t.Helper()
|
||||
var lines []string
|
||||
require.Eventuallyf(t, func() bool {
|
||||
require.Eventually(t, func() bool {
|
||||
content, err := os.ReadFile(logfile)
|
||||
if err != nil {
|
||||
return false
|
||||
|
|
@ -148,8 +148,8 @@ func waitForLines(t *testing.T, logfile string, count int) []string {
|
|||
if len(lines) == 1 && lines[0] == "" {
|
||||
lines = nil
|
||||
}
|
||||
return len(lines) >= count
|
||||
}, 5*time.Second, 10*time.Millisecond, "expected %d audit log lines", count)
|
||||
return len(lines) >= 1
|
||||
}, 5*time.Second, 10*time.Millisecond, "expected at least one audit log line")
|
||||
return lines
|
||||
}
|
||||
|
||||
|
|
@ -168,7 +168,7 @@ func TestAuditPipeline(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, events.DispatchWithContext(ctx, &pipelineEvent{TaskID: 99, DoerID: 7}))
|
||||
|
||||
lines := waitForLines(t, logfile, 1)
|
||||
waitForLines(t, logfile)
|
||||
select {
|
||||
case <-other.called:
|
||||
case <-time.After(5 * time.Second):
|
||||
|
|
@ -176,7 +176,7 @@ func TestAuditPipeline(t *testing.T) {
|
|||
}
|
||||
// A topic with multiple listeners must produce exactly one audit entry.
|
||||
events.WaitForPendingHandlers()
|
||||
lines = waitForLines(t, logfile, 1)
|
||||
lines := waitForLines(t, logfile)
|
||||
require.Len(t, lines, 1)
|
||||
|
||||
var entry audit.Entry
|
||||
|
|
@ -214,7 +214,7 @@ func TestAuditLicenseGating(t *testing.T) {
|
|||
t.Cleanup(license.ResetForTests)
|
||||
require.NoError(t, events.Dispatch(&licenseGateEvent{Marker: "licensed"}))
|
||||
|
||||
lines := waitForLines(t, logfile, 1)
|
||||
lines := waitForLines(t, logfile)
|
||||
require.Len(t, lines, 1)
|
||||
assert.Contains(t, lines[0], `"marker":"licensed"`)
|
||||
assert.NotContains(t, lines[0], "unlicensed")
|
||||
|
|
@ -237,9 +237,9 @@ func TestAuditRotation(t *testing.T) {
|
|||
|
||||
filler := strings.Repeat("x", 600*1024)
|
||||
require.NoError(t, events.Dispatch(&rotationEvent{Filler: filler}))
|
||||
waitForLines(t, logfile, 1)
|
||||
waitForLines(t, logfile)
|
||||
require.NoError(t, events.Dispatch(&rotationEvent{Filler: filler}))
|
||||
waitForLines(t, logfile, 1)
|
||||
waitForLines(t, logfile)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
rotated, err := filepath.Glob(strings.TrimSuffix(logfile, ".log") + "-*.log")
|
||||
|
|
|
|||
|
|
@ -73,9 +73,9 @@ const (
|
|||
ActionLoginSucceeded = "auth.login.succeeded"
|
||||
ActionLoginFailed = "auth.login.failed"
|
||||
ActionLogout = "auth.logout"
|
||||
ActionAPITokenIssued = "auth.api_token.issued"
|
||||
ActionAPITokenRevoked = "auth.api_token.revoked"
|
||||
ActionAPITokenUsed = "auth.api_token.used"
|
||||
ActionAPITokenIssued = "auth.api_token.issued" // #nosec G101 -- action identifier, not a credential
|
||||
ActionAPITokenRevoked = "auth.api_token.revoked" // #nosec G101
|
||||
ActionAPITokenUsed = "auth.api_token.used" // #nosec G101
|
||||
|
||||
ActionUserCreated = "user.created"
|
||||
|
||||
|
|
|
|||
|
|
@ -240,24 +240,11 @@ func DispatchWithContext(ctx context.Context, event Event) error {
|
|||
// pendingEventQueue holds the pending events and a mutex for thread-safe access
|
||||
type pendingEventQueue struct {
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
events []Event
|
||||
}
|
||||
|
||||
var pendingEvents sync.Map // map[any]*pendingEventQueue
|
||||
|
||||
// SetContextForKey associates a request context with a transaction key so that
|
||||
// events queued via DispatchOnCommit for the same key are dispatched with the
|
||||
// request metadata from that context. The entry is removed by DispatchPending
|
||||
// or CleanupPending — callers must guarantee one of them runs for the key.
|
||||
func SetContextForKey(key any, ctx context.Context) {
|
||||
val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{})
|
||||
queue := val.(*pendingEventQueue)
|
||||
queue.mu.Lock()
|
||||
queue.ctx = ctx
|
||||
queue.mu.Unlock()
|
||||
}
|
||||
|
||||
// DispatchOnCommit stores an event to be dispatched later, after a transaction commits.
|
||||
// The key should be the *xorm.Session pointer associated with the transaction.
|
||||
// Call DispatchPending(key) after s.Commit() to actually dispatch the events.
|
||||
|
|
@ -272,8 +259,9 @@ func DispatchOnCommit(key any, event Event) {
|
|||
|
||||
// DispatchPending dispatches all events accumulated for the given key and removes them.
|
||||
// Call this after s.Commit(). Safe to call even if no events were registered.
|
||||
// Request metadata on the context (see WithRequestMeta) is copied onto each message.
|
||||
// If any event fails to dispatch, the error is logged but remaining events are still dispatched.
|
||||
func DispatchPending(key any) {
|
||||
func DispatchPending(ctx context.Context, key any) {
|
||||
val, ok := pendingEvents.LoadAndDelete(key)
|
||||
if !ok {
|
||||
return
|
||||
|
|
@ -281,10 +269,6 @@ func DispatchPending(key any) {
|
|||
queue := val.(*pendingEventQueue)
|
||||
// No need to lock here since we've already removed it from the map
|
||||
// and this key won't receive new events
|
||||
ctx := queue.ctx
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
for _, event := range queue.events {
|
||||
if err := DispatchWithContext(ctx, event); err != nil {
|
||||
log.Errorf("Failed to dispatch event %s: %v", event.Name(), err)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
|
@ -40,7 +41,7 @@ func TestDispatchOnCommit(t *testing.T) {
|
|||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
|
||||
// Simulate post-commit dispatch
|
||||
DispatchPending(key)
|
||||
DispatchPending(context.Background(), key)
|
||||
|
||||
// Now it should be dispatched
|
||||
assert.Equal(t, 1, CountDispatchedEvents("test.event"))
|
||||
|
|
@ -57,7 +58,7 @@ func TestDispatchOnCommitMultipleEvents(t *testing.T) {
|
|||
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
|
||||
DispatchPending(key)
|
||||
DispatchPending(context.Background(), key)
|
||||
|
||||
assert.Equal(t, 3, CountDispatchedEvents("test.event"))
|
||||
}
|
||||
|
|
@ -74,7 +75,7 @@ func TestCleanupPending(t *testing.T) {
|
|||
CleanupPending(key)
|
||||
|
||||
// Dispatching after cleanup should be a no-op
|
||||
DispatchPending(key)
|
||||
DispatchPending(context.Background(), key)
|
||||
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
}
|
||||
|
|
@ -85,7 +86,7 @@ func TestDispatchPendingNoEvents(t *testing.T) {
|
|||
key := new(int)
|
||||
|
||||
// Should be a no-op
|
||||
DispatchPending(key)
|
||||
DispatchPending(context.Background(), key)
|
||||
|
||||
// Verify no events were dispatched
|
||||
assert.Equal(t, 0, CountDispatchedEvents("test.event"))
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
|
|
@ -45,7 +46,7 @@ func TestTaskComment_Create(t *testing.T) {
|
|||
assert.Equal(t, int64(1), tc.Author.ID)
|
||||
err = s.Commit()
|
||||
require.NoError(t, err)
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TaskCommentCreatedEvent{})
|
||||
|
||||
db.AssertExists(t, "task_comments", map[string]interface{}{
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -70,7 +71,7 @@ func TestTask_Create(t *testing.T) {
|
|||
"bucket_id": 1,
|
||||
}, false)
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TaskCreatedEvent{})
|
||||
})
|
||||
t.Run("with reminders", func(t *testing.T) {
|
||||
|
|
@ -280,7 +281,7 @@ func TestTask_Update(t *testing.T) {
|
|||
err = s.Commit()
|
||||
require.NoError(t, err)
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
// Verify exactly ONE task.updated event was dispatched
|
||||
count := events.CountDispatchedEvents("task.updated")
|
||||
assert.Equal(t, 1, count, "Expected exactly 1 task.updated event, got %d", count)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -596,7 +597,7 @@ func TestTimeEntry_Events(t *testing.T) {
|
|||
te := &TimeEntry{TaskID: 1, StartTime: someStart, EndTime: someEnd}
|
||||
require.NoError(t, te.Create(s, u))
|
||||
require.NoError(t, s.Commit())
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TimeEntryCreatedEvent{})
|
||||
})
|
||||
|
||||
|
|
@ -612,7 +613,7 @@ func TestTimeEntry_Events(t *testing.T) {
|
|||
require.True(t, can)
|
||||
require.NoError(t, te.Update(s, u))
|
||||
require.NoError(t, s.Commit())
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TimeEntryUpdatedEvent{})
|
||||
})
|
||||
|
||||
|
|
@ -624,7 +625,7 @@ func TestTimeEntry_Events(t *testing.T) {
|
|||
|
||||
require.NoError(t, (&TimeEntry{ID: 1}).Delete(s, u))
|
||||
require.NoError(t, s.Commit())
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TimeEntryDeletedEvent{})
|
||||
})
|
||||
|
||||
|
|
@ -637,7 +638,7 @@ func TestTimeEntry_Events(t *testing.T) {
|
|||
// entry 4 is user1's running timer; a new running timer auto-stops it
|
||||
require.NoError(t, (&TimeEntry{TaskID: 1}).Create(s, u))
|
||||
require.NoError(t, s.Commit())
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TimeEntryCreatedEvent{})
|
||||
events.AssertDispatched(t, &TimeEntryUpdatedEvent{})
|
||||
})
|
||||
|
|
@ -651,7 +652,7 @@ func TestTimeEntry_Events(t *testing.T) {
|
|||
te := &TimeEntry{TaskID: 1, StartTime: someStart, EndTime: someEnd}
|
||||
require.NoError(t, te.Create(s, u))
|
||||
require.NoError(t, s.Commit())
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
assert.Equal(t, 1, events.CountDispatchedEvents((&TimeEntryCreatedEvent{}).Name()))
|
||||
assert.Equal(t, 0, events.CountDispatchedEvents((&TimeEntryUpdatedEvent{}).Name()), "a completed manual entry must not auto-stop")
|
||||
})
|
||||
|
|
@ -665,7 +666,7 @@ func TestTimeEntry_Events(t *testing.T) {
|
|||
_, err := StopRunningTimer(s, u)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, s.Commit())
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
events.AssertDispatched(t, &TimeEntryUpdatedEvent{})
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package migration
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
|
|
@ -50,7 +51,7 @@ func InsertFromStructure(str []*models.ProjectWithTasksAndBuckets, user *user.Us
|
|||
return err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ func RequestUserDataExport(c *echo.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(c.Request().Context(), s)
|
||||
|
||||
return c.JSON(http.StatusOK, models.Message{Message: "Successfully requested data export. We will send you an email when it's ready."})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ func timeEntriesTimerStop(ctx context.Context, _ *struct{}) (*singleBody[models.
|
|||
events.CleanupPending(s)
|
||||
return nil, translateDomainError(err)
|
||||
}
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(ctx, s)
|
||||
return &singleBody[models.TimeEntry]{Body: entry}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package caldav
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -396,7 +397,7 @@ func (vcls *VikunjaCaldavProjectStorage) CreateResource(rpath, content string) (
|
|||
return nil, err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
|
||||
// Build up the proper response
|
||||
rr := VikunjaProjectResourceAdapter{
|
||||
|
|
@ -473,7 +474,7 @@ func (vcls *VikunjaCaldavProjectStorage) UpdateResource(rpath, content string) (
|
|||
return nil, err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
|
||||
// Build up the proper response
|
||||
rr := VikunjaProjectResourceAdapter{
|
||||
|
|
@ -516,7 +517,7 @@ func (vcls *VikunjaCaldavProjectStorage) DeleteResource(_ string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(context.Background(), s)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -30,7 +30,6 @@ import (
|
|||
// Caller is responsible for body/path binding and validation before calling.
|
||||
func DoCreate(ctx context.Context, obj CObject, a web.Auth) error {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if err := s.Close(); err != nil {
|
||||
log.Errorf("Could not close session: %s", err)
|
||||
|
|
@ -61,7 +60,7 @@ func DoCreate(ctx context.Context, obj CObject, a web.Auth) error {
|
|||
return err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(ctx, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -71,7 +70,6 @@ func DoCreate(ctx context.Context, obj CObject, a web.Auth) error {
|
|||
// header in the Echo wrapper; Huma wrapper may ignore it.
|
||||
func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, err error) {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if cerr := s.Close(); cerr != nil {
|
||||
log.Errorf("Could not close session: %s", cerr)
|
||||
|
|
@ -102,7 +100,7 @@ func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int,
|
|||
return 0, err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(ctx, s)
|
||||
return maxPermission, nil
|
||||
}
|
||||
|
||||
|
|
@ -112,7 +110,6 @@ func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int,
|
|||
// nil-slice normalization remain the caller's responsibility.
|
||||
func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if cerr := s.Close(); cerr != nil {
|
||||
log.Errorf("Could not close session: %s", cerr)
|
||||
|
|
@ -131,7 +128,7 @@ func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page
|
|||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(ctx, s)
|
||||
return result, resultCount, total, nil
|
||||
}
|
||||
|
||||
|
|
@ -140,7 +137,6 @@ func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page
|
|||
// and validation before calling.
|
||||
func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if err := s.Close(); err != nil {
|
||||
log.Errorf("Could not close session: %s", err)
|
||||
|
|
@ -171,7 +167,7 @@ func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error {
|
|||
return err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(ctx, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -180,7 +176,6 @@ func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error {
|
|||
// calling.
|
||||
func DoDelete(ctx context.Context, obj CObject, a web.Auth) error {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if err := s.Close(); err != nil {
|
||||
log.Errorf("Could not close session: %s", err)
|
||||
|
|
@ -211,6 +206,6 @@ func DoDelete(ctx context.Context, obj CObject, a web.Auth) error {
|
|||
return err
|
||||
}
|
||||
|
||||
events.DispatchPending(s)
|
||||
events.DispatchPending(ctx, s)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue