feat(events): carry request metadata onto dispatched event messages

Adds a RequestMeta context bridge so events dispatched during an HTTP
request can be attributed to it: a middleware stashes IP/UA/request-id
on the request context, the generic Do* handlers associate that context
with the transaction key, and DispatchPending/DispatchWithContext copy
the metadata onto the watermill message at publish time. Existing
dispatch call sites are unchanged.
This commit is contained in:
kolaente 2026-06-10 21:00:41 +02:00
parent e271f75cad
commit 5db25ab75c
3 changed files with 101 additions and 6 deletions

View File

@ -201,6 +201,13 @@ func InitEventsForTesting(ctx context.Context) (<-chan struct{}, error) {
// Dispatch dispatches an event
func Dispatch(event Event) error {
return DispatchWithContext(context.Background(), event)
}
// DispatchWithContext dispatches an event and copies request metadata from the
// context (see WithRequestMeta) onto the message metadata, so listeners can
// attribute the event to the originating HTTP request.
func DispatchWithContext(ctx context.Context, event Event) error {
if isUnderTest {
dispatchedTestEvents = append(dispatchedTestEvents, event)
return nil
@ -216,17 +223,41 @@ func Dispatch(event Event) error {
}
msg := message.NewMessage(watermill.NewUUID(), content)
if meta := RequestMetaFromContext(ctx); meta != nil {
if meta.IP != "" {
msg.Metadata.Set(MetadataKeyIP, meta.IP)
}
if meta.UserAgent != "" {
msg.Metadata.Set(MetadataKeyUserAgent, meta.UserAgent)
}
if meta.RequestID != "" {
msg.Metadata.Set(MetadataKeyRequestID, meta.RequestID)
}
}
return pubsub.Publish(event.Name(), msg)
}
// pendingEventQueue holds the pending events and a mutex for thread-safe access
type pendingEventQueue struct {
mu sync.Mutex
ctx context.Context
events []Event
}
var pendingEvents sync.Map // map[any]*pendingEventQueue
// SetContextForKey associates a request context with a transaction key so that
// events queued via DispatchOnCommit for the same key are dispatched with the
// request metadata from that context. The entry is removed by DispatchPending
// or CleanupPending — callers must guarantee one of them runs for the key.
func SetContextForKey(key any, ctx context.Context) {
val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{})
queue := val.(*pendingEventQueue)
queue.mu.Lock()
queue.ctx = ctx
queue.mu.Unlock()
}
// DispatchOnCommit stores an event to be dispatched later, after a transaction commits.
// The key should be the *xorm.Session pointer associated with the transaction.
// Call DispatchPending(key) after s.Commit() to actually dispatch the events.
@ -250,8 +281,12 @@ func DispatchPending(key any) {
queue := val.(*pendingEventQueue)
// No need to lock here since we've already removed it from the map
// and this key won't receive new events
ctx := queue.ctx
if ctx == nil {
ctx = context.Background()
}
for _, event := range queue.events {
if err := Dispatch(event); err != nil {
if err := DispatchWithContext(ctx, event); err != nil {
log.Errorf("Failed to dispatch event %s: %v", event.Name(), err)
}
}

View File

@ -0,0 +1,55 @@
// Vikunja is a to-do list application to facilitate your life.
// Copyright 2018-present Vikunja and contributors. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package events
import "context"
// RequestMeta carries information about the originating HTTP request. It is
// stashed on the request context by a middleware and copied onto message
// metadata at publish time, so listeners (e.g. audit) can attribute an event
// to a request without every dispatch site changing its signature.
type RequestMeta struct {
IP string
UserAgent string
RequestID string
}
// Message metadata keys holding request information.
const (
MetadataKeyIP = "request_ip"
MetadataKeyUserAgent = "request_user_agent"
MetadataKeyRequestID = "request_id"
)
type requestMetaKeyType struct{}
var requestMetaKey requestMetaKeyType
// WithRequestMeta returns a context carrying the given request metadata.
func WithRequestMeta(ctx context.Context, meta *RequestMeta) context.Context {
return context.WithValue(ctx, requestMetaKey, meta)
}
// RequestMetaFromContext returns the request metadata stored on the context,
// or nil if there is none.
func RequestMetaFromContext(ctx context.Context) *RequestMeta {
if ctx == nil {
return nil
}
meta, _ := ctx.Value(requestMetaKey).(*RequestMeta)
return meta
}

View File

@ -28,8 +28,9 @@ import (
// DoCreate runs the permission check + model Create + commit pipeline for a
// CObject. Framework-agnostic: callable from both Echo (CreateWeb) and Huma.
// Caller is responsible for body/path binding and validation before calling.
func DoCreate(_ context.Context, obj CObject, a web.Auth) error {
func DoCreate(ctx context.Context, obj CObject, a web.Auth) error {
s := db.NewSession()
events.SetContextForKey(s, ctx)
defer func() {
if err := s.Close(); err != nil {
log.Errorf("Could not close session: %s", err)
@ -68,8 +69,9 @@ func DoCreate(_ context.Context, obj CObject, a web.Auth) error {
// CObject. obj should have its identifying fields set before call. On success,
// obj is fully populated. maxPermission is exposed via the x-max-permission
// header in the Echo wrapper; Huma wrapper may ignore it.
func DoReadOne(_ context.Context, obj CObject, a web.Auth) (maxPermission int, err error) {
func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, err error) {
s := db.NewSession()
events.SetContextForKey(s, ctx)
defer func() {
if cerr := s.Close(); cerr != nil {
log.Errorf("Could not close session: %s", cerr)
@ -108,8 +110,9 @@ func DoReadOne(_ context.Context, obj CObject, a web.Auth) (maxPermission int, e
// scoping context (e.g., TaskID on LabelTask). Returns the result slice/
// interface, the result count, and total count. Pagination header math and
// nil-slice normalization remain the caller's responsibility.
func DoReadAll(_ context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) {
func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) {
s := db.NewSession()
events.SetContextForKey(s, ctx)
defer func() {
if cerr := s.Close(); cerr != nil {
log.Errorf("Could not close session: %s", cerr)
@ -135,8 +138,9 @@ func DoReadAll(_ context.Context, obj CObject, a web.Auth, search string, page,
// DoUpdate runs the permission check + model Update + commit pipeline for a
// CObject. Framework-agnostic. Caller is responsible for body/path binding
// and validation before calling.
func DoUpdate(_ context.Context, obj CObject, a web.Auth) error {
func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error {
s := db.NewSession()
events.SetContextForKey(s, ctx)
defer func() {
if err := s.Close(); err != nil {
log.Errorf("Could not close session: %s", err)
@ -174,8 +178,9 @@ func DoUpdate(_ context.Context, obj CObject, a web.Auth) error {
// DoDelete runs the permission check + model Delete + commit pipeline for a
// CObject. Framework-agnostic. Caller is responsible for path binding before
// calling.
func DoDelete(_ context.Context, obj CObject, a web.Auth) error {
func DoDelete(ctx context.Context, obj CObject, a web.Auth) error {
s := db.NewSession()
events.SetContextForKey(s, ctx)
defer func() {
if err := s.Close(); err != nil {
log.Errorf("Could not close session: %s", err)