feat(webhook): add WebhookDeliveryListener for per-webhook delivery
This commit is contained in:
parent
d89af8ce6d
commit
eef985011b
|
|
@ -778,6 +778,55 @@ type WebhookPayload struct {
|
|||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
// WebhookDeliveryListener delivers one webhook per message. It is the
|
||||
// consumer for WebhookDeliveryEvent and owns the retry semantics: any
|
||||
// error returned from Handle triggers the watermill retry middleware
|
||||
// independently for this single delivery, with no effect on other
|
||||
// webhooks on the same parent event.
|
||||
type WebhookDeliveryListener struct{}
|
||||
|
||||
// Name defines the name for the WebhookDeliveryListener listener
|
||||
func (wdl *WebhookDeliveryListener) Name() string {
|
||||
return "webhook.delivery.listener"
|
||||
}
|
||||
|
||||
// Handle is executed when a WebhookDeliveryEvent is fired. It reloads the
|
||||
// webhook from the database by id (so secrets, target_url, and basic auth
|
||||
// credentials are always current) and performs the HTTP delivery.
|
||||
//
|
||||
// Special cases:
|
||||
// - If the webhook row no longer exists (deleted between fan-out and
|
||||
// delivery), Handle returns nil so the message is not retried.
|
||||
// - Any other error is returned so the watermill retry middleware
|
||||
// retries this delivery with exponential backoff, and eventually
|
||||
// parks it in the poison queue if all retries fail.
|
||||
func (wdl *WebhookDeliveryListener) Handle(msg *message.Message) error {
|
||||
evt := &WebhookDeliveryEvent{}
|
||||
if err := json.Unmarshal(msg.Payload, evt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if evt.Payload == nil {
|
||||
log.Errorf("webhook delivery event for webhook %d has no payload", evt.WebhookID)
|
||||
return nil
|
||||
}
|
||||
|
||||
s := db.NewSession()
|
||||
defer s.Close()
|
||||
|
||||
webhook := &Webhook{}
|
||||
has, err := s.Where("id = ?", evt.WebhookID).Get(webhook)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !has {
|
||||
log.Debugf("webhook %d no longer exists, skipping delivery of %s", evt.WebhookID, evt.Payload.EventName)
|
||||
return nil
|
||||
}
|
||||
|
||||
return webhook.sendWebhookPayload(evt.Payload)
|
||||
}
|
||||
|
||||
func getIDAsInt64(id interface{}) int64 {
|
||||
if id == nil {
|
||||
return 0
|
||||
|
|
|
|||
Loading…
Reference in New Issue