From 126ea78dac0562d4bd5ae813e03da591b2d538da Mon Sep 17 00:00:00 2001 From: kolaente Date: Wed, 10 Jun 2026 21:23:29 +0200 Subject: [PATCH] refactor(events): pass context to DispatchPending directly Every DispatchPending caller either has the request context in scope or is genuinely request-less, so passing it as a parameter replaces the stored-context mechanism on the pending queue and satisfies contextcheck. Also fixes lint findings in the audit package. --- pkg/audit/audit_test.go | 18 ++++++++--------- pkg/audit/entry.go | 6 +++--- pkg/events/events.go | 20 ++----------------- pkg/events/events_test.go | 9 +++++---- pkg/models/task_comments_test.go | 3 ++- pkg/models/tasks_test.go | 5 +++-- pkg/models/time_tracking_test.go | 13 ++++++------ .../migration/create_from_structure.go | 3 ++- pkg/routes/api/v1/user_export.go | 2 +- pkg/routes/api/v2/time_entries.go | 2 +- pkg/routes/caldav/listStorageProvider.go | 7 ++++--- pkg/web/handler/core.go | 15 +++++--------- 12 files changed, 44 insertions(+), 59 deletions(-) diff --git a/pkg/audit/audit_test.go b/pkg/audit/audit_test.go index ef6ddc219..897ebe93c 100644 --- a/pkg/audit/audit_test.go +++ b/pkg/audit/audit_test.go @@ -136,10 +136,10 @@ func startEventRouter(t *testing.T) { <-ready } -func waitForLines(t *testing.T, logfile string, count int) []string { +func waitForLines(t *testing.T, logfile string) []string { t.Helper() var lines []string - require.Eventuallyf(t, func() bool { + require.Eventually(t, func() bool { content, err := os.ReadFile(logfile) if err != nil { return false @@ -148,8 +148,8 @@ func waitForLines(t *testing.T, logfile string, count int) []string { if len(lines) == 1 && lines[0] == "" { lines = nil } - return len(lines) >= count - }, 5*time.Second, 10*time.Millisecond, "expected %d audit log lines", count) + return len(lines) >= 1 + }, 5*time.Second, 10*time.Millisecond, "expected at least one audit log line") return lines } @@ -168,7 +168,7 @@ func TestAuditPipeline(t *testing.T) { }) require.NoError(t, events.DispatchWithContext(ctx, &pipelineEvent{TaskID: 99, DoerID: 7})) - lines := waitForLines(t, logfile, 1) + waitForLines(t, logfile) select { case <-other.called: case <-time.After(5 * time.Second): @@ -176,7 +176,7 @@ func TestAuditPipeline(t *testing.T) { } // A topic with multiple listeners must produce exactly one audit entry. events.WaitForPendingHandlers() - lines = waitForLines(t, logfile, 1) + lines := waitForLines(t, logfile) require.Len(t, lines, 1) var entry audit.Entry @@ -214,7 +214,7 @@ func TestAuditLicenseGating(t *testing.T) { t.Cleanup(license.ResetForTests) require.NoError(t, events.Dispatch(&licenseGateEvent{Marker: "licensed"})) - lines := waitForLines(t, logfile, 1) + lines := waitForLines(t, logfile) require.Len(t, lines, 1) assert.Contains(t, lines[0], `"marker":"licensed"`) assert.NotContains(t, lines[0], "unlicensed") @@ -237,9 +237,9 @@ func TestAuditRotation(t *testing.T) { filler := strings.Repeat("x", 600*1024) require.NoError(t, events.Dispatch(&rotationEvent{Filler: filler})) - waitForLines(t, logfile, 1) + waitForLines(t, logfile) require.NoError(t, events.Dispatch(&rotationEvent{Filler: filler})) - waitForLines(t, logfile, 1) + waitForLines(t, logfile) require.Eventually(t, func() bool { rotated, err := filepath.Glob(strings.TrimSuffix(logfile, ".log") + "-*.log") diff --git a/pkg/audit/entry.go b/pkg/audit/entry.go index e2ed91876..079629727 100644 --- a/pkg/audit/entry.go +++ b/pkg/audit/entry.go @@ -73,9 +73,9 @@ const ( ActionLoginSucceeded = "auth.login.succeeded" ActionLoginFailed = "auth.login.failed" ActionLogout = "auth.logout" - ActionAPITokenIssued = "auth.api_token.issued" - ActionAPITokenRevoked = "auth.api_token.revoked" - ActionAPITokenUsed = "auth.api_token.used" + ActionAPITokenIssued = "auth.api_token.issued" // #nosec G101 -- action identifier, not a credential + ActionAPITokenRevoked = "auth.api_token.revoked" // #nosec G101 + ActionAPITokenUsed = "auth.api_token.used" // #nosec G101 ActionUserCreated = "user.created" diff --git a/pkg/events/events.go b/pkg/events/events.go index 882de2bbb..5973b132d 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -240,24 +240,11 @@ func DispatchWithContext(ctx context.Context, event Event) error { // pendingEventQueue holds the pending events and a mutex for thread-safe access type pendingEventQueue struct { mu sync.Mutex - ctx context.Context events []Event } var pendingEvents sync.Map // map[any]*pendingEventQueue -// SetContextForKey associates a request context with a transaction key so that -// events queued via DispatchOnCommit for the same key are dispatched with the -// request metadata from that context. The entry is removed by DispatchPending -// or CleanupPending — callers must guarantee one of them runs for the key. -func SetContextForKey(key any, ctx context.Context) { - val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{}) - queue := val.(*pendingEventQueue) - queue.mu.Lock() - queue.ctx = ctx - queue.mu.Unlock() -} - // DispatchOnCommit stores an event to be dispatched later, after a transaction commits. // The key should be the *xorm.Session pointer associated with the transaction. // Call DispatchPending(key) after s.Commit() to actually dispatch the events. @@ -272,8 +259,9 @@ func DispatchOnCommit(key any, event Event) { // DispatchPending dispatches all events accumulated for the given key and removes them. // Call this after s.Commit(). Safe to call even if no events were registered. +// Request metadata on the context (see WithRequestMeta) is copied onto each message. // If any event fails to dispatch, the error is logged but remaining events are still dispatched. -func DispatchPending(key any) { +func DispatchPending(ctx context.Context, key any) { val, ok := pendingEvents.LoadAndDelete(key) if !ok { return @@ -281,10 +269,6 @@ func DispatchPending(key any) { queue := val.(*pendingEventQueue) // No need to lock here since we've already removed it from the map // and this key won't receive new events - ctx := queue.ctx - if ctx == nil { - ctx = context.Background() - } for _, event := range queue.events { if err := DispatchWithContext(ctx, event); err != nil { log.Errorf("Failed to dispatch event %s: %v", event.Name(), err) diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go index f78396a50..186d12f4a 100644 --- a/pkg/events/events_test.go +++ b/pkg/events/events_test.go @@ -17,6 +17,7 @@ package events import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -40,7 +41,7 @@ func TestDispatchOnCommit(t *testing.T) { assert.Equal(t, 0, CountDispatchedEvents("test.event")) // Simulate post-commit dispatch - DispatchPending(key) + DispatchPending(context.Background(), key) // Now it should be dispatched assert.Equal(t, 1, CountDispatchedEvents("test.event")) @@ -57,7 +58,7 @@ func TestDispatchOnCommitMultipleEvents(t *testing.T) { assert.Equal(t, 0, CountDispatchedEvents("test.event")) - DispatchPending(key) + DispatchPending(context.Background(), key) assert.Equal(t, 3, CountDispatchedEvents("test.event")) } @@ -74,7 +75,7 @@ func TestCleanupPending(t *testing.T) { CleanupPending(key) // Dispatching after cleanup should be a no-op - DispatchPending(key) + DispatchPending(context.Background(), key) assert.Equal(t, 0, CountDispatchedEvents("test.event")) } @@ -85,7 +86,7 @@ func TestDispatchPendingNoEvents(t *testing.T) { key := new(int) // Should be a no-op - DispatchPending(key) + DispatchPending(context.Background(), key) // Verify no events were dispatched assert.Equal(t, 0, CountDispatchedEvents("test.event")) diff --git a/pkg/models/task_comments_test.go b/pkg/models/task_comments_test.go index 61f8f6dc4..988dc4f27 100644 --- a/pkg/models/task_comments_test.go +++ b/pkg/models/task_comments_test.go @@ -17,6 +17,7 @@ package models import ( + "context" "fmt" "testing" @@ -45,7 +46,7 @@ func TestTaskComment_Create(t *testing.T) { assert.Equal(t, int64(1), tc.Author.ID) err = s.Commit() require.NoError(t, err) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TaskCommentCreatedEvent{}) db.AssertExists(t, "task_comments", map[string]interface{}{ diff --git a/pkg/models/tasks_test.go b/pkg/models/tasks_test.go index caa897740..7219b5ab8 100644 --- a/pkg/models/tasks_test.go +++ b/pkg/models/tasks_test.go @@ -17,6 +17,7 @@ package models import ( + "context" "testing" "time" @@ -70,7 +71,7 @@ func TestTask_Create(t *testing.T) { "bucket_id": 1, }, false) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TaskCreatedEvent{}) }) t.Run("with reminders", func(t *testing.T) { @@ -280,7 +281,7 @@ func TestTask_Update(t *testing.T) { err = s.Commit() require.NoError(t, err) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) // Verify exactly ONE task.updated event was dispatched count := events.CountDispatchedEvents("task.updated") assert.Equal(t, 1, count, "Expected exactly 1 task.updated event, got %d", count) diff --git a/pkg/models/time_tracking_test.go b/pkg/models/time_tracking_test.go index 91dc90ee6..6e5391d51 100644 --- a/pkg/models/time_tracking_test.go +++ b/pkg/models/time_tracking_test.go @@ -17,6 +17,7 @@ package models import ( + "context" "encoding/json" "testing" "time" @@ -596,7 +597,7 @@ func TestTimeEntry_Events(t *testing.T) { te := &TimeEntry{TaskID: 1, StartTime: someStart, EndTime: someEnd} require.NoError(t, te.Create(s, u)) require.NoError(t, s.Commit()) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TimeEntryCreatedEvent{}) }) @@ -612,7 +613,7 @@ func TestTimeEntry_Events(t *testing.T) { require.True(t, can) require.NoError(t, te.Update(s, u)) require.NoError(t, s.Commit()) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TimeEntryUpdatedEvent{}) }) @@ -624,7 +625,7 @@ func TestTimeEntry_Events(t *testing.T) { require.NoError(t, (&TimeEntry{ID: 1}).Delete(s, u)) require.NoError(t, s.Commit()) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TimeEntryDeletedEvent{}) }) @@ -637,7 +638,7 @@ func TestTimeEntry_Events(t *testing.T) { // entry 4 is user1's running timer; a new running timer auto-stops it require.NoError(t, (&TimeEntry{TaskID: 1}).Create(s, u)) require.NoError(t, s.Commit()) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TimeEntryCreatedEvent{}) events.AssertDispatched(t, &TimeEntryUpdatedEvent{}) }) @@ -651,7 +652,7 @@ func TestTimeEntry_Events(t *testing.T) { te := &TimeEntry{TaskID: 1, StartTime: someStart, EndTime: someEnd} require.NoError(t, te.Create(s, u)) require.NoError(t, s.Commit()) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) assert.Equal(t, 1, events.CountDispatchedEvents((&TimeEntryCreatedEvent{}).Name())) assert.Equal(t, 0, events.CountDispatchedEvents((&TimeEntryUpdatedEvent{}).Name()), "a completed manual entry must not auto-stop") }) @@ -665,7 +666,7 @@ func TestTimeEntry_Events(t *testing.T) { _, err := StopRunningTimer(s, u) require.NoError(t, err) require.NoError(t, s.Commit()) - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) events.AssertDispatched(t, &TimeEntryUpdatedEvent{}) }) } diff --git a/pkg/modules/migration/create_from_structure.go b/pkg/modules/migration/create_from_structure.go index 0e9c9b942..d59dd6946 100644 --- a/pkg/modules/migration/create_from_structure.go +++ b/pkg/modules/migration/create_from_structure.go @@ -18,6 +18,7 @@ package migration import ( "bytes" + "context" "xorm.io/xorm" @@ -50,7 +51,7 @@ func InsertFromStructure(str []*models.ProjectWithTasksAndBuckets, user *user.Us return err } - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) return nil } diff --git a/pkg/routes/api/v1/user_export.go b/pkg/routes/api/v1/user_export.go index 3c07c9ebc..6efc311c0 100644 --- a/pkg/routes/api/v1/user_export.go +++ b/pkg/routes/api/v1/user_export.go @@ -97,7 +97,7 @@ func RequestUserDataExport(c *echo.Context) error { return err } - events.DispatchPending(s) + events.DispatchPending(c.Request().Context(), s) return c.JSON(http.StatusOK, models.Message{Message: "Successfully requested data export. We will send you an email when it's ready."}) } diff --git a/pkg/routes/api/v2/time_entries.go b/pkg/routes/api/v2/time_entries.go index 3500677f7..a58ee8b92 100644 --- a/pkg/routes/api/v2/time_entries.go +++ b/pkg/routes/api/v2/time_entries.go @@ -155,7 +155,7 @@ func timeEntriesTimerStop(ctx context.Context, _ *struct{}) (*singleBody[models. events.CleanupPending(s) return nil, translateDomainError(err) } - events.DispatchPending(s) + events.DispatchPending(ctx, s) return &singleBody[models.TimeEntry]{Body: entry}, nil } diff --git a/pkg/routes/caldav/listStorageProvider.go b/pkg/routes/caldav/listStorageProvider.go index 5544d3ec7..60a151e2d 100644 --- a/pkg/routes/caldav/listStorageProvider.go +++ b/pkg/routes/caldav/listStorageProvider.go @@ -17,6 +17,7 @@ package caldav import ( + "context" "slices" "strconv" "strings" @@ -396,7 +397,7 @@ func (vcls *VikunjaCaldavProjectStorage) CreateResource(rpath, content string) ( return nil, err } - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) // Build up the proper response rr := VikunjaProjectResourceAdapter{ @@ -473,7 +474,7 @@ func (vcls *VikunjaCaldavProjectStorage) UpdateResource(rpath, content string) ( return nil, err } - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) // Build up the proper response rr := VikunjaProjectResourceAdapter{ @@ -516,7 +517,7 @@ func (vcls *VikunjaCaldavProjectStorage) DeleteResource(_ string) error { return err } - events.DispatchPending(s) + events.DispatchPending(context.Background(), s) } return nil diff --git a/pkg/web/handler/core.go b/pkg/web/handler/core.go index fa037794b..25c91c069 100644 --- a/pkg/web/handler/core.go +++ b/pkg/web/handler/core.go @@ -30,7 +30,6 @@ import ( // Caller is responsible for body/path binding and validation before calling. func DoCreate(ctx context.Context, obj CObject, a web.Auth) error { s := db.NewSession() - events.SetContextForKey(s, ctx) defer func() { if err := s.Close(); err != nil { log.Errorf("Could not close session: %s", err) @@ -61,7 +60,7 @@ func DoCreate(ctx context.Context, obj CObject, a web.Auth) error { return err } - events.DispatchPending(s) + events.DispatchPending(ctx, s) return nil } @@ -71,7 +70,6 @@ func DoCreate(ctx context.Context, obj CObject, a web.Auth) error { // header in the Echo wrapper; Huma wrapper may ignore it. func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, err error) { s := db.NewSession() - events.SetContextForKey(s, ctx) defer func() { if cerr := s.Close(); cerr != nil { log.Errorf("Could not close session: %s", cerr) @@ -102,7 +100,7 @@ func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, return 0, err } - events.DispatchPending(s) + events.DispatchPending(ctx, s) return maxPermission, nil } @@ -112,7 +110,6 @@ func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, // nil-slice normalization remain the caller's responsibility. func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) { s := db.NewSession() - events.SetContextForKey(s, ctx) defer func() { if cerr := s.Close(); cerr != nil { log.Errorf("Could not close session: %s", cerr) @@ -131,7 +128,7 @@ func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page return nil, 0, 0, err } - events.DispatchPending(s) + events.DispatchPending(ctx, s) return result, resultCount, total, nil } @@ -140,7 +137,6 @@ func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page // and validation before calling. func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error { s := db.NewSession() - events.SetContextForKey(s, ctx) defer func() { if err := s.Close(); err != nil { log.Errorf("Could not close session: %s", err) @@ -171,7 +167,7 @@ func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error { return err } - events.DispatchPending(s) + events.DispatchPending(ctx, s) return nil } @@ -180,7 +176,6 @@ func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error { // calling. func DoDelete(ctx context.Context, obj CObject, a web.Auth) error { s := db.NewSession() - events.SetContextForKey(s, ctx) defer func() { if err := s.Close(); err != nil { log.Errorf("Could not close session: %s", err) @@ -211,6 +206,6 @@ func DoDelete(ctx context.Context, obj CObject, a web.Auth) error { return err } - events.DispatchPending(s) + events.DispatchPending(ctx, s) return nil }