From 514dbc56bd1a4e0c76d0f6538ef44d5ebe11fce0 Mon Sep 17 00:00:00 2001 From: kolaente Date: Wed, 10 Jun 2026 21:00:41 +0200 Subject: [PATCH] feat(events): carry request metadata onto dispatched event messages Adds a RequestMeta context bridge so events dispatched during an HTTP request can be attributed to it: a middleware stashes IP/UA/request-id on the request context, the generic Do* handlers associate that context with the transaction key, and DispatchPending/DispatchWithContext copy the metadata onto the watermill message at publish time. Existing dispatch call sites are unchanged. --- pkg/events/events.go | 37 ++++++++++++++++++++++++- pkg/events/request_meta.go | 55 ++++++++++++++++++++++++++++++++++++++ pkg/web/handler/core.go | 15 +++++++---- 3 files changed, 101 insertions(+), 6 deletions(-) create mode 100644 pkg/events/request_meta.go diff --git a/pkg/events/events.go b/pkg/events/events.go index 30c26ea99..882de2bbb 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -201,6 +201,13 @@ func InitEventsForTesting(ctx context.Context) (<-chan struct{}, error) { // Dispatch dispatches an event func Dispatch(event Event) error { + return DispatchWithContext(context.Background(), event) +} + +// DispatchWithContext dispatches an event and copies request metadata from the +// context (see WithRequestMeta) onto the message metadata, so listeners can +// attribute the event to the originating HTTP request. +func DispatchWithContext(ctx context.Context, event Event) error { if isUnderTest { dispatchedTestEvents = append(dispatchedTestEvents, event) return nil @@ -216,17 +223,41 @@ func Dispatch(event Event) error { } msg := message.NewMessage(watermill.NewUUID(), content) + if meta := RequestMetaFromContext(ctx); meta != nil { + if meta.IP != "" { + msg.Metadata.Set(MetadataKeyIP, meta.IP) + } + if meta.UserAgent != "" { + msg.Metadata.Set(MetadataKeyUserAgent, meta.UserAgent) + } + if meta.RequestID != "" { + msg.Metadata.Set(MetadataKeyRequestID, meta.RequestID) + } + } return pubsub.Publish(event.Name(), msg) } // pendingEventQueue holds the pending events and a mutex for thread-safe access type pendingEventQueue struct { mu sync.Mutex + ctx context.Context events []Event } var pendingEvents sync.Map // map[any]*pendingEventQueue +// SetContextForKey associates a request context with a transaction key so that +// events queued via DispatchOnCommit for the same key are dispatched with the +// request metadata from that context. The entry is removed by DispatchPending +// or CleanupPending — callers must guarantee one of them runs for the key. +func SetContextForKey(key any, ctx context.Context) { + val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{}) + queue := val.(*pendingEventQueue) + queue.mu.Lock() + queue.ctx = ctx + queue.mu.Unlock() +} + // DispatchOnCommit stores an event to be dispatched later, after a transaction commits. // The key should be the *xorm.Session pointer associated with the transaction. // Call DispatchPending(key) after s.Commit() to actually dispatch the events. @@ -250,8 +281,12 @@ func DispatchPending(key any) { queue := val.(*pendingEventQueue) // No need to lock here since we've already removed it from the map // and this key won't receive new events + ctx := queue.ctx + if ctx == nil { + ctx = context.Background() + } for _, event := range queue.events { - if err := Dispatch(event); err != nil { + if err := DispatchWithContext(ctx, event); err != nil { log.Errorf("Failed to dispatch event %s: %v", event.Name(), err) } } diff --git a/pkg/events/request_meta.go b/pkg/events/request_meta.go new file mode 100644 index 000000000..796c7b7e9 --- /dev/null +++ b/pkg/events/request_meta.go @@ -0,0 +1,55 @@ +// Vikunja is a to-do list application to facilitate your life. +// Copyright 2018-present Vikunja and contributors. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package events + +import "context" + +// RequestMeta carries information about the originating HTTP request. It is +// stashed on the request context by a middleware and copied onto message +// metadata at publish time, so listeners (e.g. audit) can attribute an event +// to a request without every dispatch site changing its signature. +type RequestMeta struct { + IP string + UserAgent string + RequestID string +} + +// Message metadata keys holding request information. +const ( + MetadataKeyIP = "request_ip" + MetadataKeyUserAgent = "request_user_agent" + MetadataKeyRequestID = "request_id" +) + +type requestMetaKeyType struct{} + +var requestMetaKey requestMetaKeyType + +// WithRequestMeta returns a context carrying the given request metadata. +func WithRequestMeta(ctx context.Context, meta *RequestMeta) context.Context { + return context.WithValue(ctx, requestMetaKey, meta) +} + +// RequestMetaFromContext returns the request metadata stored on the context, +// or nil if there is none. +func RequestMetaFromContext(ctx context.Context) *RequestMeta { + if ctx == nil { + return nil + } + meta, _ := ctx.Value(requestMetaKey).(*RequestMeta) + return meta +} diff --git a/pkg/web/handler/core.go b/pkg/web/handler/core.go index 01474b874..fa037794b 100644 --- a/pkg/web/handler/core.go +++ b/pkg/web/handler/core.go @@ -28,8 +28,9 @@ import ( // DoCreate runs the permission check + model Create + commit pipeline for a // CObject. Framework-agnostic: callable from both Echo (CreateWeb) and Huma. // Caller is responsible for body/path binding and validation before calling. -func DoCreate(_ context.Context, obj CObject, a web.Auth) error { +func DoCreate(ctx context.Context, obj CObject, a web.Auth) error { s := db.NewSession() + events.SetContextForKey(s, ctx) defer func() { if err := s.Close(); err != nil { log.Errorf("Could not close session: %s", err) @@ -68,8 +69,9 @@ func DoCreate(_ context.Context, obj CObject, a web.Auth) error { // CObject. obj should have its identifying fields set before call. On success, // obj is fully populated. maxPermission is exposed via the x-max-permission // header in the Echo wrapper; Huma wrapper may ignore it. -func DoReadOne(_ context.Context, obj CObject, a web.Auth) (maxPermission int, err error) { +func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, err error) { s := db.NewSession() + events.SetContextForKey(s, ctx) defer func() { if cerr := s.Close(); cerr != nil { log.Errorf("Could not close session: %s", cerr) @@ -108,8 +110,9 @@ func DoReadOne(_ context.Context, obj CObject, a web.Auth) (maxPermission int, e // scoping context (e.g., TaskID on LabelTask). Returns the result slice/ // interface, the result count, and total count. Pagination header math and // nil-slice normalization remain the caller's responsibility. -func DoReadAll(_ context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) { +func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) { s := db.NewSession() + events.SetContextForKey(s, ctx) defer func() { if cerr := s.Close(); cerr != nil { log.Errorf("Could not close session: %s", cerr) @@ -135,8 +138,9 @@ func DoReadAll(_ context.Context, obj CObject, a web.Auth, search string, page, // DoUpdate runs the permission check + model Update + commit pipeline for a // CObject. Framework-agnostic. Caller is responsible for body/path binding // and validation before calling. -func DoUpdate(_ context.Context, obj CObject, a web.Auth) error { +func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error { s := db.NewSession() + events.SetContextForKey(s, ctx) defer func() { if err := s.Close(); err != nil { log.Errorf("Could not close session: %s", err) @@ -174,8 +178,9 @@ func DoUpdate(_ context.Context, obj CObject, a web.Auth) error { // DoDelete runs the permission check + model Delete + commit pipeline for a // CObject. Framework-agnostic. Caller is responsible for path binding before // calling. -func DoDelete(_ context.Context, obj CObject, a web.Auth) error { +func DoDelete(ctx context.Context, obj CObject, a web.Auth) error { s := db.NewSession() + events.SetContextForKey(s, ctx) defer func() { if err := s.Close(); err != nil { log.Errorf("Could not close session: %s", err)