feat(events): carry request metadata onto dispatched event messages
Adds a RequestMeta context bridge so events dispatched during an HTTP request can be attributed to it: a middleware stashes IP/UA/request-id on the request context, the generic Do* handlers associate that context with the transaction key, and DispatchPending/DispatchWithContext copy the metadata onto the watermill message at publish time. Existing dispatch call sites are unchanged.
This commit is contained in:
parent
2bbe77c141
commit
48f7dafce3
|
|
@ -201,6 +201,13 @@ func InitEventsForTesting(ctx context.Context) (<-chan struct{}, error) {
|
|||
|
||||
// Dispatch dispatches an event
|
||||
func Dispatch(event Event) error {
|
||||
return DispatchWithContext(context.Background(), event)
|
||||
}
|
||||
|
||||
// DispatchWithContext dispatches an event and copies request metadata from the
|
||||
// context (see WithRequestMeta) onto the message metadata, so listeners can
|
||||
// attribute the event to the originating HTTP request.
|
||||
func DispatchWithContext(ctx context.Context, event Event) error {
|
||||
if isUnderTest {
|
||||
dispatchedTestEvents = append(dispatchedTestEvents, event)
|
||||
return nil
|
||||
|
|
@ -216,17 +223,41 @@ func Dispatch(event Event) error {
|
|||
}
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), content)
|
||||
if meta := RequestMetaFromContext(ctx); meta != nil {
|
||||
if meta.IP != "" {
|
||||
msg.Metadata.Set(MetadataKeyIP, meta.IP)
|
||||
}
|
||||
if meta.UserAgent != "" {
|
||||
msg.Metadata.Set(MetadataKeyUserAgent, meta.UserAgent)
|
||||
}
|
||||
if meta.RequestID != "" {
|
||||
msg.Metadata.Set(MetadataKeyRequestID, meta.RequestID)
|
||||
}
|
||||
}
|
||||
return pubsub.Publish(event.Name(), msg)
|
||||
}
|
||||
|
||||
// pendingEventQueue holds the pending events and a mutex for thread-safe access
|
||||
type pendingEventQueue struct {
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
events []Event
|
||||
}
|
||||
|
||||
var pendingEvents sync.Map // map[any]*pendingEventQueue
|
||||
|
||||
// SetContextForKey associates a request context with a transaction key so that
|
||||
// events queued via DispatchOnCommit for the same key are dispatched with the
|
||||
// request metadata from that context. The entry is removed by DispatchPending
|
||||
// or CleanupPending — callers must guarantee one of them runs for the key.
|
||||
func SetContextForKey(key any, ctx context.Context) {
|
||||
val, _ := pendingEvents.LoadOrStore(key, &pendingEventQueue{})
|
||||
queue := val.(*pendingEventQueue)
|
||||
queue.mu.Lock()
|
||||
queue.ctx = ctx
|
||||
queue.mu.Unlock()
|
||||
}
|
||||
|
||||
// DispatchOnCommit stores an event to be dispatched later, after a transaction commits.
|
||||
// The key should be the *xorm.Session pointer associated with the transaction.
|
||||
// Call DispatchPending(key) after s.Commit() to actually dispatch the events.
|
||||
|
|
@ -250,8 +281,12 @@ func DispatchPending(key any) {
|
|||
queue := val.(*pendingEventQueue)
|
||||
// No need to lock here since we've already removed it from the map
|
||||
// and this key won't receive new events
|
||||
ctx := queue.ctx
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
for _, event := range queue.events {
|
||||
if err := Dispatch(event); err != nil {
|
||||
if err := DispatchWithContext(ctx, event); err != nil {
|
||||
log.Errorf("Failed to dispatch event %s: %v", event.Name(), err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,55 @@
|
|||
// Vikunja is a to-do list application to facilitate your life.
|
||||
// Copyright 2018-present Vikunja and contributors. All rights reserved.
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package events
|
||||
|
||||
import "context"
|
||||
|
||||
// RequestMeta carries information about the originating HTTP request. It is
|
||||
// stashed on the request context by a middleware and copied onto message
|
||||
// metadata at publish time, so listeners (e.g. audit) can attribute an event
|
||||
// to a request without every dispatch site changing its signature.
|
||||
type RequestMeta struct {
|
||||
IP string
|
||||
UserAgent string
|
||||
RequestID string
|
||||
}
|
||||
|
||||
// Message metadata keys holding request information.
|
||||
const (
|
||||
MetadataKeyIP = "request_ip"
|
||||
MetadataKeyUserAgent = "request_user_agent"
|
||||
MetadataKeyRequestID = "request_id"
|
||||
)
|
||||
|
||||
type requestMetaKeyType struct{}
|
||||
|
||||
var requestMetaKey requestMetaKeyType
|
||||
|
||||
// WithRequestMeta returns a context carrying the given request metadata.
|
||||
func WithRequestMeta(ctx context.Context, meta *RequestMeta) context.Context {
|
||||
return context.WithValue(ctx, requestMetaKey, meta)
|
||||
}
|
||||
|
||||
// RequestMetaFromContext returns the request metadata stored on the context,
|
||||
// or nil if there is none.
|
||||
func RequestMetaFromContext(ctx context.Context) *RequestMeta {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
meta, _ := ctx.Value(requestMetaKey).(*RequestMeta)
|
||||
return meta
|
||||
}
|
||||
|
|
@ -28,8 +28,9 @@ import (
|
|||
// DoCreate runs the permission check + model Create + commit pipeline for a
|
||||
// CObject. Framework-agnostic: callable from both Echo (CreateWeb) and Huma.
|
||||
// Caller is responsible for body/path binding and validation before calling.
|
||||
func DoCreate(_ context.Context, obj CObject, a web.Auth) error {
|
||||
func DoCreate(ctx context.Context, obj CObject, a web.Auth) error {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if err := s.Close(); err != nil {
|
||||
log.Errorf("Could not close session: %s", err)
|
||||
|
|
@ -68,8 +69,9 @@ func DoCreate(_ context.Context, obj CObject, a web.Auth) error {
|
|||
// CObject. obj should have its identifying fields set before call. On success,
|
||||
// obj is fully populated. maxPermission is exposed via the x-max-permission
|
||||
// header in the Echo wrapper; Huma wrapper may ignore it.
|
||||
func DoReadOne(_ context.Context, obj CObject, a web.Auth) (maxPermission int, err error) {
|
||||
func DoReadOne(ctx context.Context, obj CObject, a web.Auth) (maxPermission int, err error) {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if cerr := s.Close(); cerr != nil {
|
||||
log.Errorf("Could not close session: %s", cerr)
|
||||
|
|
@ -108,8 +110,9 @@ func DoReadOne(_ context.Context, obj CObject, a web.Auth) (maxPermission int, e
|
|||
// scoping context (e.g., TaskID on LabelTask). Returns the result slice/
|
||||
// interface, the result count, and total count. Pagination header math and
|
||||
// nil-slice normalization remain the caller's responsibility.
|
||||
func DoReadAll(_ context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) {
|
||||
func DoReadAll(ctx context.Context, obj CObject, a web.Auth, search string, page, perPage int) (result any, resultCount int, total int64, err error) {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if cerr := s.Close(); cerr != nil {
|
||||
log.Errorf("Could not close session: %s", cerr)
|
||||
|
|
@ -135,8 +138,9 @@ func DoReadAll(_ context.Context, obj CObject, a web.Auth, search string, page,
|
|||
// DoUpdate runs the permission check + model Update + commit pipeline for a
|
||||
// CObject. Framework-agnostic. Caller is responsible for body/path binding
|
||||
// and validation before calling.
|
||||
func DoUpdate(_ context.Context, obj CObject, a web.Auth) error {
|
||||
func DoUpdate(ctx context.Context, obj CObject, a web.Auth) error {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if err := s.Close(); err != nil {
|
||||
log.Errorf("Could not close session: %s", err)
|
||||
|
|
@ -174,8 +178,9 @@ func DoUpdate(_ context.Context, obj CObject, a web.Auth) error {
|
|||
// DoDelete runs the permission check + model Delete + commit pipeline for a
|
||||
// CObject. Framework-agnostic. Caller is responsible for path binding before
|
||||
// calling.
|
||||
func DoDelete(_ context.Context, obj CObject, a web.Auth) error {
|
||||
func DoDelete(ctx context.Context, obj CObject, a web.Auth) error {
|
||||
s := db.NewSession()
|
||||
events.SetContextForKey(s, ctx)
|
||||
defer func() {
|
||||
if err := s.Close(); err != nil {
|
||||
log.Errorf("Could not close session: %s", err)
|
||||
|
|
|
|||
Loading…
Reference in New Issue