feat: add outbound webhooks with at-least-once delivery #325

Draft
edsonmichaque wants to merge 4 commits into main from feat/outbound-webhooks

@edsonmichaque
Contributor

@edsonmichaque edsonmichaque commented Feb 28, 2026

Summary

Adds a persistent outbound webhook system that notifies external services when platform objects change (LLMs, apps, users, tools, filters, plugins, model routers, etc.).

Architecture

  • Persistent delivery queue: WebhookEvent rows survive process restarts; the poller reschedules anything left undelivered
  • Atomic in-flight claiming: UPDATE ... SET status='in_flight' WHERE status='pending' prevents two workers from racing to deliver the same event
  • 3-level retry policy: per-subscription overrides → DB singleton (WebhookConfig) → static startup config; changing the DB config takes effect on the next delivery attempt without a restart
  • Normalized topics: WebhookTopic join table with a composite unique index instead of a JSON column; findMatchingSubscriptions uses a SQL JOIN; topics are validated against KnownWebhookTopics at write time
  • Per-subscription transport config: proxy URL (HTTP/HTTPS/SOCKS5), custom TLS CA cert, mTLS client cert/key, insecure_skip_verify; each delivery builds its own http.Client from the subscription's config
  • Replay attack prevention: HMAC-SHA256 signs "<unix_timestamp>.<body>" and sends both X-Tyk-Timestamp and X-Tyk-Signature: sha256=<hex> headers; receivers check that the timestamp is within their tolerance window (e.g. 5 minutes) before verifying the signature
  • SSRF protection: blocks private/loopback IPs at write time (create/update) and again at delivery time; the ALLOW_INTERNAL_NETWORK_ACCESS env var is resolved once at startup, not per call
  • Claim-based actor tracking: RetryDelivery extracts the authenticated user from the gin context ("user" key, set by AuthMiddleware) and stamps WebhookEvent.TriggeredBy with the actor's user ID; 0 = system-initiated, non-zero = manually triggered by a specific admin
  • Standard pagination: GET .../deliveries uses getPaginationParams() (?page, ?page_size, ?all) and sets X-Total-Count / X-Total-Pages headers, consistent with all other list endpoints
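The SSRF bullet above can be illustrated with a small Go sketch. It is not the PR's implementation: `isInternalIP` and `checkTarget` are hypothetical names, the `allowInternal` parameter stands in for the ALLOW_INTERNAL_NETWORK_ACCESS flag, and only literal IP addresses are handled. A real check would also resolve hostnames, which is presumably why the PR repeats the check at delivery time.

```go
package main

import (
	"fmt"
	"net"
)

// isInternalIP reports whether ip is in a range a webhook target should
// normally not reach: loopback, private (RFC 1918 / IPv6 unique-local),
// link-local, or the unspecified address.
func isInternalIP(ip net.IP) bool {
	return ip.IsLoopback() ||
		ip.IsPrivate() ||
		ip.IsLinkLocalUnicast() ||
		ip.IsLinkLocalMulticast() ||
		ip.IsUnspecified()
}

// checkTarget (hypothetical helper) rejects an internal IP literal unless
// allowInternal is true. allowInternal stands in for a flag that is read
// once at startup rather than on every call.
func checkTarget(addr string, allowInternal bool) error {
	ip := net.ParseIP(addr)
	if ip == nil {
		return fmt.Errorf("not an IP literal: %q", addr)
	}
	if !allowInternal && isInternalIP(ip) {
		return fmt.Errorf("blocked internal address %s", ip)
	}
	return nil
}
```

Calling `checkTarget("169.254.169.254", false)` returns an error in this sketch, while the same call with `allowInternal` set to true does not.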

Signature verification (receiver side)

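import hmac, hashlib, time

MAX_SKEW = 300  # seconds

def verify(secret: str, body: bytes, headers: dict) -> None:
    """Raise ValueError unless the request is fresh and correctly signed."""
    ts = headers["X-Tyk-Timestamp"]
    # Reject stale (or far-future) timestamps before checking the signature.
    if abs(time.time() - int(ts)) > MAX_SKEW:
        raise ValueError("stale timestamp: possible replay attack")
    # The signed payload is "<unix_timestamp>.<body>"; use the header value verbatim.
    signed = f"{ts}.".encode() + body
    expected = "sha256=" + hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how much of the signature matched.
    if not hmac.compare_digest(expected, headers["X-Tyk-Signature"]):
        raise ValueError("invalid signature")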
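For receivers written in Go, the same scheme might look like the sketch below. It follows the format stated in this PR (HMAC-SHA256 over "<unix_timestamp>.<body>", value prefixed with sha256=). The function names, the 300-second tolerance, and the `nowUnix` parameter (passed in so the check is easy to test) are choices made for this illustration.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"strconv"
)

// maxSkewSeconds is the receiver's tolerance window (an assumed value).
const maxSkewSeconds int64 = 300

// sign returns the X-Tyk-Signature value for "<timestamp>.<body>".
func sign(secret, timestamp string, body []byte) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(timestamp + "."))
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

// verify checks freshness first, then compares signatures in constant time.
// timestamp and signature are the raw header values.
func verify(secret, timestamp, signature string, body []byte, nowUnix int64) error {
	ts, err := strconv.ParseInt(timestamp, 10, 64)
	if err != nil {
		return errors.New("malformed timestamp")
	}
	skew := nowUnix - ts
	if skew < 0 {
		skew = -skew
	}
	if skew > maxSkewSeconds {
		return errors.New("stale timestamp: possible replay")
	}
	expected := sign(secret, timestamp, body)
	if !hmac.Equal([]byte(expected), []byte(signature)) {
		return errors.New("invalid signature")
	}
	return nil
}
```

In an HTTP handler, `nowUnix` would typically be `time.Now().Unix()` and `body` the raw request bytes read before any JSON decoding.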

REST endpoints (admin-only)

Method  Path                                           Description
POST    /api/v1/webhooks                               Create subscription
GET     /api/v1/webhooks                               List subscriptions
GET     /api/v1/webhooks/topics                        List subscribable event topics
GET     /api/v1/webhooks/config                        Get global config singleton
PUT     /api/v1/webhooks/config                        Update global config singleton
GET     /api/v1/webhooks/:id                           Get subscription
PUT     /api/v1/webhooks/:id                           Update subscription
DELETE  /api/v1/webhooks/:id                           Delete subscription
POST    /api/v1/webhooks/:id/test                      Fire test delivery
GET     /api/v1/webhooks/:id/deliveries                List delivery logs (?page, ?page_size, ?all)
POST    /api/v1/webhooks/:id/deliveries/:log_id/retry  Replay a past delivery (202)
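As a rough illustration of what the ?page and ?page_size parameters on the deliveries endpoint imply, the sketch below converts them into an offset/limit pair and the value for X-Total-Pages. The PR does not show the internals of getPaginationParams(), so the function name `pageWindow`, the 1-based page numbering, and the fallback values are assumptions.

```go
package main

// pageWindow turns page/pageSize into a SQL offset and limit, plus the total
// page count for a given row count. Pages are assumed to be 1-based, and
// out-of-range inputs fall back to page 1 and a size of 10 (both assumed).
func pageWindow(page, pageSize int, total int64) (offset, limit, totalPages int) {
	if page < 1 {
		page = 1
	}
	if pageSize < 1 {
		pageSize = 10
	}
	// Ceiling division: 25 rows at 10 per page need 3 pages.
	totalPages = int((total + int64(pageSize) - 1) / int64(pageSize))
	return (page - 1) * pageSize, pageSize, totalPages
}
```

With 25 delivery logs, `pageWindow(3, 10, 25)` yields offset 20, limit 10, and 3 total pages.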

New tables

  • webhook_subscriptions: endpoint config + retry policy + transport config
  • webhook_topics: subscription ↔ topic join table (normalized, composite unique index)
  • webhook_events: persistent delivery queue (pending → in_flight → delivered/exhausted); triggered_by records the actor for manual retries
  • webhook_configs: DB singleton for runtime-tunable global defaults
  • webhook_delivery_logs: per-attempt audit log
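The webhook_events lifecycle can be written down as a small state machine. This is only a sketch: the status strings come from the PR, but the Go identifiers are hypothetical, and the in_flight to pending edge (a failed attempt that is rescheduled for the poller) is an assumption about how retries are represented.

```go
package main

// EventStatus mirrors the statuses named for the webhook_events table.
type EventStatus string

const (
	StatusPending   EventStatus = "pending"
	StatusInFlight  EventStatus = "in_flight"
	StatusDelivered EventStatus = "delivered"
	StatusExhausted EventStatus = "exhausted"
)

// transitions lists the moves this sketch allows. delivered and exhausted
// are terminal, so they have no outgoing edges.
var transitions = map[EventStatus][]EventStatus{
	StatusPending:  {StatusInFlight},
	StatusInFlight: {StatusDelivered, StatusExhausted, StatusPending},
}

// canTransition reports whether moving from one status to another is allowed.
func canTransition(from, to EventStatus) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// afterAttempt picks the status that follows a finished delivery attempt.
func afterAttempt(delivered bool, attempt, maxAttempts int) EventStatus {
	switch {
	case delivered:
		return StatusDelivered
	case attempt >= maxAttempts:
		return StatusExhausted
	default:
		return StatusPending // assumed: rescheduled for a later attempt
	}
}
```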

Test plan

  • CGO_ENABLED=1 go test ./services/... -run Webhook -v — all tests pass
  • GET /api/v1/webhooks/topics returns the full known topic list
  • Create a subscription; confirm unknown topic is rejected with 400
  • Trigger a platform event (e.g. create an LLM) and confirm delivery fires with X-Tyk-Timestamp + X-Tyk-Signature headers
  • Verify signature using the formula above; confirm re-sending after 5+ minutes is rejected
  • Set insecure_skip_verify: true on a subscription pointing at a self-signed HTTPS server
  • Configure a proxy_url and verify traffic routes through it
  • Force a delivery failure, then POST .../deliveries/:log_id/retry; confirm triggered_by is set to the authenticated admin's user ID
  • Update WebhookConfig via PUT /api/v1/webhooks/config; confirm new retry policy takes effect without restart
  • Verify GET .../deliveries?page=1&page_size=10 returns paginated results with X-Total-Count and X-Total-Pages headers
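The config hot-reload check in the test plan depends on the three-level retry resolution (per-subscription, then DB singleton, then static config). One way to picture that merge is sketched below. The PR does not list the retry policy fields, so `MaxAttempts`, `InitialBackoffSec`, and the `Override` type are hypothetical; nil pointers mean "inherit from the next level down".

```go
package main

// RetryPolicy is a fully resolved policy. Field names are hypothetical.
type RetryPolicy struct {
	MaxAttempts       int
	InitialBackoffSec int
}

// Override carries optional values; a nil field inherits from a lower level.
type Override struct {
	MaxAttempts       *int
	InitialBackoffSec *int
}

// resolve starts from the static startup config and applies the DB-level
// override, then the per-subscription override, so the most specific
// non-nil value wins.
func resolve(static RetryPolicy, dbGlobal, sub Override) RetryPolicy {
	out := static
	for _, o := range []Override{dbGlobal, sub} {
		if o.MaxAttempts != nil {
			out.MaxAttempts = *o.MaxAttempts
		}
		if o.InitialBackoffSec != nil {
			out.InitialBackoffSec = *o.InitialBackoffSec
		}
	}
	return out
}
```

Because resolution happens per delivery attempt, a changed DB-level value is picked up the next time `resolve` runs, which matches the "without restart" behaviour described in the summary.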

@probelabs
Contributor

probelabs Bot commented Feb 28, 2026

This PR introduces a persistent outbound webhook system with at-least-once delivery guarantees. It allows external services to subscribe to real-time notifications for platform events (e.g., LLM creation, app updates). The system is designed for reliability and security, featuring a persistent delivery queue, atomic event claiming, a 3-level retry policy, and per-subscription transport configuration. Security is addressed through HMAC-SHA256 signing of the timestamp and payload (so receivers can reject replayed requests), SSRF protection, and encryption at rest for sensitive credentials.

Files Changed Analysis

The changes introduce a new, self-contained webhook feature, with additions totaling over 2,500 lines across 11 files. The implementation is well-structured:

  • Core Logic & Testing: services/webhook_service.go contains the primary implementation, including the event handler, worker/poller architecture, and delivery mechanism. It is thoroughly tested in services/webhook_service_test.go.
  • API Layer: api/webhook_handlers.go adds the new REST API endpoints, with api/api.go registering the routes.
  • Data Models: models/webhook.go defines the five new GORM models for the database, which are registered in models/models.go. GORM hooks are used to encrypt sensitive data like secrets and proxy URLs.
  • Configuration & Integration: config/config.go adds static configurations from environment variables, while main.go and services/service.go initialize the WebhookService and integrate it into the application's event bus and lifecycle.
  • Documentation: The feature is extensively documented in features/Webhooks.md.

Architecture & Impact Assessment

  • What this PR accomplishes: It adds a robust and secure system for notifying external services about internal platform events, enabling a wide range of integrations like Slack notifications, CI/CD pipeline triggers, or SIEM ingestion.
  • Key technical changes introduced:
    1. Persistent Queue: Introduces five new database tables (webhook_subscriptions, webhook_topics, webhook_events, webhook_delivery_logs, webhook_configs) to manage subscriptions and persist events, ensuring they survive application restarts.
    2. Background Worker/Poller: Implements an asynchronous processing system with multiple workers and a DB poller to handle event delivery reliably.
    3. HMAC Payload Signing: Adds X-Tyk-Signature and X-Tyk-Timestamp headers for security and replay attack prevention.
    4. SSRF Protection: Validates webhook URLs to block requests to private or loopback IP addresses.
    5. New REST API: Exposes a full set of endpoints under /api/v1/webhooks for managing subscriptions and configuration.
  • Affected system components: The changes introduce a new WebhookService that subscribes to the central eventbridge.Bus, making it a new consumer of all system events. The API layer is extended with new routes, and the database schema is significantly altered.
  • Architectural Diagram:
flowchart TD
    A[Service Layer] -->|Publish| B[eventbridge.Bus]
    B -->|SubscribeAll| C[WebhookService.HandleEvent]
    C -->|Persist row| D["WebhookEvent table (status=pending)"]
    C -->|Non-blocking push| E[queue chan uint]
    E --> F[Worker goroutines]
    G[DB Poller] -->|Poll due events| D
    G -->|Push IDs| E
    F -->|resolveRetryPolicy| P[Merge: sub > DB global > static]
    P -->|SSRF check + HMAC sign| H[HTTP POST → external URL]
    H -->|2xx| I["Persist success log<br>mark event delivered"]
    H -->|failure| J["Persist failed log<br>schedule next attempt"]
    J -->|attempt < maxAttempts| K["Update next_run_at<br>poller picks up"]
    J -->|attempt == maxAttempts| L[Mark event exhausted]
    M[Manual RetryDelivery API] -->|New WebhookEvent row| D
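The "Non-blocking push" edge in the diagram above corresponds to a common Go pattern, sketched here under assumptions: the `chan uint` type comes from the diagram, while the function name `tryEnqueue` is hypothetical. If the in-memory queue is full, nothing is lost, because the event row is already persisted as pending and the DB poller can push its ID later.

```go
package main

// tryEnqueue pushes an event ID onto the in-memory queue without blocking
// the caller. It returns false when the queue is full; in that case the
// persisted row is left for the poller to pick up.
func tryEnqueue(queue chan<- uint, id uint) bool {
	select {
	case queue <- id:
		return true
	default:
		return false
	}
}
```

The design choice here is that the channel is only a fast path; the database remains the source of truth for what still needs delivering.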

Scope Discovery & Context Expansion

The scope of this feature is defined by the list of KnownWebhookTopics exported in services/system_events.go. This list acts as an allowlist for subscribable events, covering CRUD operations for most major platform objects (LLMs, Apps, Users, Tools, etc.).

The broader impact is that any existing or future action that publishes one of these events to the eventbridge.Bus can now trigger an outbound webhook. To fully assess the integration's reach, a reviewer should search the codebase for all calls to eventbridge.Bus.Publish (or its wrappers) to identify every event source that will now feed into this new webhook system.

Metadata
  • Review Effort: 5 / 5
  • Primary Label: feature

Powered by Visor from Probelabs

Last updated: 2026-02-28T11:49:08.403Z | Triggered by: pr_updated | Commit: 503a68d

💡 TIP: You can chat with Visor using /visor ask <your question>

@probelabs
Contributor

probelabs Bot commented Feb 28, 2026

Security Issues (2)

Severity Location Issue
🟡 Warning api/webhook_handlers.go:127-130
API error responses include raw error messages from internal libraries (e.g., from `c.ShouldBindJSON`). This can leak implementation details to the client. While the API is admin-only, it's a security best practice to log detailed errors server-side and return a generic, non-revealing error message to the user.
💡 Suggestion: Modify the `webhookError` calls that use `err.Error()` for request binding or other internal errors. Log the full `err` object on the server and return a consistent, generic error detail like 'Invalid request body' or 'An internal error occurred'.
🟡 Warning models/webhook.go:92-101
The `webhook_events` table, which serves as the persistent delivery queue, has no data retention or cleanup mechanism. Events with a final status of 'delivered' or 'exhausted' remain in the table indefinitely. Over time, this can lead to unbounded table growth, potentially causing database performance degradation and resource exhaustion.
💡 Suggestion: Implement a periodic cleanup task to prune old events from the `webhook_events` table. A new configuration option, similar to `LogRetentionDays`, could control how long to keep terminal-state ('delivered', 'exhausted') events before deleting them.

Architecture Issues (1)

Severity Location Issue
🟡 Warning main.go:145-154
The WebhookService is initialized with a default configuration inside `services.NewServiceWithOCI` and then immediately replaced in `main.go` with a new instance that has the correct configuration from `appConf`. This pattern breaks encapsulation, as the `main` package is responsible for partially constructing the `Service` struct, splitting the setup logic across multiple locations.
💡 Suggestion: To improve encapsulation and make service setup more self-contained, consider passing the `appConf` object to `NewServiceWithOCI`. This would allow the constructor to create the `WebhookService` with the correct configuration from the start, removing the need to re-assign `service.WebhookService` in `main.go`.

Performance Issues (3)

Severity Location Issue
🟠 Error services/webhook_service.go:327
A new `http.Client` is created for every webhook delivery in `processEvent`. This is inefficient as it prevents TCP connection reuse, which can lead to performance degradation and resource exhaustion (e.g., socket descriptors in a `TIME_WAIT` state) under high load. The `http.Client` is designed to be reused.
💡 Suggestion: Cache and reuse `http.Client` instances. Since transport settings are per-subscription, create a cache that maps a subscription's transport configuration to a shared `http.Client`. This will allow for connection pooling and significantly improve performance for endpoints that receive frequent webhooks.
🟡 Warning api/webhook_handlers.go:152
The `GET /api/v1/webhooks` endpoint, implemented by the `List` handler, retrieves all webhook subscriptions without pagination. If a large number of subscriptions are created, this can lead to excessive memory consumption and slow API responses.
💡 Suggestion: Implement pagination for the `List` handler and the underlying `ListWebhooks` service method. Use the existing `getPaginationParams` helper to handle `page` and `page_size` query parameters and apply `LIMIT` and `OFFSET` to the database query, consistent with the `ListDeliveries` handler.
🟡 Warning services/webhook_service.go:557
The `findMatchingSubscriptions` function uses `Preload("Topics")`, which executes an additional database query to fetch all topics for the matching subscriptions. However, the calling function, `HandleEvent`, only uses the `SubscriptionID` and does not use the preloaded `Topics` data. This results in an unnecessary database query for every event that triggers a webhook.
💡 Suggestion: Remove the `.Preload("Topics")` call from the database query within the `findMatchingSubscriptions` function to avoid the unnecessary database call.
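The client-reuse suggestion in the first performance issue could be approached along the lines of the sketch below. Everything here is hypothetical rather than taken from the PR: `transportKey` is an assumed comparable summary of a subscription's transport settings, and the builder only applies the skip-verify flag, leaving proxy and certificate handling out for brevity.

```go
package main

import (
	"crypto/tls"
	"net/http"
	"sync"
	"time"
)

// transportKey is a comparable summary of a subscription's transport config.
// Certificates would be represented by a hash rather than the raw PEM.
type transportKey struct {
	ProxyURL           string
	CertHash           string
	InsecureSkipVerify bool
}

// clientCache hands out one shared http.Client per distinct transport config
// so that idle connections can be reused across deliveries.
type clientCache struct {
	mu      sync.Mutex
	clients map[transportKey]*http.Client
}

func newClientCache() *clientCache {
	return &clientCache{clients: make(map[transportKey]*http.Client)}
}

// get returns the cached client for k, building it on first use.
func (c *clientCache) get(k transportKey) *http.Client {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.clients[k]; ok {
		return cl
	}
	cl := &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			// Proxy and custom CA / client cert setup omitted in this sketch.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: k.InsecureSkipVerify},
			IdleConnTimeout: 90 * time.Second,
		},
	}
	c.clients[k] = cl
	return cl
}
```

A production version would also need to evict entries when a subscription's transport config changes or the subscription is deleted, otherwise stale clients accumulate.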

Quality Issues (1)

Severity Location Issue
🟡 Warning services/webhook_service.go:785-799
The transaction logic in `UpdateWebhook` is inefficient as it appears to create, delete, and then re-create webhook topics. The initial `tx.Save(sub)` likely cascades to create the new topics, which are then immediately deleted by the next statement, only to be created again. This create-delete-create cycle is redundant.
💡 Suggestion: Refactor the transaction to be more explicit and efficient. First, update the scalar fields of the `WebhookSubscription` object while omitting the `Topics` association. Then, delete all existing topic associations. Finally, create the new topic associations. This avoids the redundant database operations.
🔧 Suggested Fix
func (s *WebhookService) UpdateWebhook(sub *models.WebhookSubscription) error {
	return s.db.Transaction(func(tx *gorm.DB) error {
		// Explicitly update only the WebhookSubscription model's scalar fields.
		if err := tx.Model(sub).Omit("Topics").Updates(sub).Error; err != nil {
			return err
		}

		// Remove all old topic associations.
		if err := tx.Where("subscription_id = ?", sub.ID).Delete(&models.WebhookTopic{}).Error; err != nil {
			return err
		}

		// Create the new topic associations if any are provided.
		if len(sub.Topics) > 0 {
			for i := range sub.Topics {
				sub.Topics[i].SubscriptionID = sub.ID
				sub.Topics[i].ID = 0 // Ensure GORM creates new records
			}
			return tx.Create(&sub.Topics).Error
		}

		return nil
	})
}


Last updated: 2026-02-28T11:49:10.907Z | Triggered by: pr_updated | Commit: 503a68d

Implements a persistent outbound webhook system for notifying external
services of platform events (LLM, app, user, tool, etc. CRUD events).

Key design decisions:
- DB-backed delivery queue (WebhookEvent) survives process restarts
- Atomic in_flight claim prevents double-delivery across workers
- 3-level retry policy: per-subscription > DB singleton > static config
- Normalized topic join table (WebhookTopic) instead of JSON column
- Per-subscription transport config: proxy, TLS CA, mTLS, skip-verify
- Replay attack prevention: HMAC-SHA256 signs "timestamp.body"; receivers
  can reject stale requests outside their tolerance window
- SSRF protection at write time and delivery time; configurable via
  ALLOW_INTERNAL_NETWORK_ACCESS env var (resolved at startup, not per call)
- Topic validation against KnownWebhookTopics derived from system_events.go

REST endpoints (admin-only):
  POST   /api/v1/webhooks
  GET    /api/v1/webhooks
  GET    /api/v1/webhooks/topics
  GET    /api/v1/webhooks/config
  PUT    /api/v1/webhooks/config
  GET    /api/v1/webhooks/:id
  PUT    /api/v1/webhooks/:id
  DELETE /api/v1/webhooks/:id
  POST   /api/v1/webhooks/:id/test
  GET    /api/v1/webhooks/:id/deliveries
  POST   /api/v1/webhooks/:id/deliveries/:log_id/retry
…ooks

- Extract actor user ID from gin context ("user" key) in RetryDelivery
  handler; pass it through to service as actorID and store on
  WebhookEvent.TriggeredBy (0 = system-initiated, non-zero = manual retry)
- Replace hardcoded limit=50 in ListDeliveries with getPaginationParams()
  (?page, ?page_size, ?all), matching the pattern used across all other
  list endpoints; set X-Total-Count and X-Total-Pages response headers
- Update ListDeliveryLogs service method to return (logs, totalCount,
  totalPages, error) consistent with other paginated service methods

Add BeforeSave/AfterFind GORM hooks on WebhookSubscription to
encrypt/decrypt sensitive fields using the platform's existing
AES-256 + $ENC/ prefix convention (TYK_AI_SECRET_KEY env var):

  - Secret (HMAC signing secret)
  - TransportConfig.ProxyURL (may contain embedded credentials)
  - TransportConfig.TLSCACert
  - TransportConfig.TLSClientCert
  - TransportConfig.TLSClientKey

Matches the pattern used by models.Submission. Encryption is a no-op
when TYK_AI_SECRET_KEY is not configured (graceful degradation).

- RetryDelivery: pass subscriptionID from URL to enforce ownership; service
  now validates log.SubscriptionID matches the URL param before re-enqueueing
- Error responses: replace err.Error() in 500 handlers with generic messages
  to avoid leaking internal details to callers
- Remove redundant GetWebhook existence checks from Delete and ListDeliveries
  handlers (DeleteWebhook and ListDeliveryLogs handle missing records)
- Config hot path: add 30-second in-memory cache with double-checked locking;
  invalidated immediately on UpdateWebhookConfig
- N+1 inserts: HandleEvent now collects all WebhookEvent rows into a slice and
  issues a single db.Create call
- Composite index on (status, next_run_at) for efficient poller query
- LogRetentionDays in WebhookConfig with daily pruning tick in poller
- LogPruneInterval moved to WebhookServiceConfig (no hardcoded 24h ticker)
- Fix brittle fmt.Sprint slice comparison → reflect.DeepEqual in test
- Fix RetryDelivery test call sites for updated 3-arg signature
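
The "Config hot path" item in the last commit message describes a short-lived in-memory cache with double-checked locking and immediate invalidation on update. A minimal sketch of that pattern follows. The type and function names, and the single `MaxAttempts` field, are hypothetical; the `load` callback stands in for the DB read.

```go
package main

import (
	"sync"
	"time"
)

// GlobalConfig stands in for the DB singleton; the field is hypothetical.
type GlobalConfig struct {
	MaxAttempts int
}

// configCache keeps the last loaded config for a fixed time-to-live.
type configCache struct {
	mu      sync.RWMutex
	value   GlobalConfig
	expires time.Time
	ttl     time.Duration
	load    func() (GlobalConfig, error)
}

func newConfigCache(ttlSeconds int, load func() (GlobalConfig, error)) *configCache {
	return &configCache{ttl: time.Duration(ttlSeconds) * time.Second, load: load}
}

// Get returns the cached value while it is fresh. On a miss it takes the
// write lock and checks again, so concurrent callers trigger only one load.
func (c *configCache) Get() (GlobalConfig, error) {
	c.mu.RLock()
	if time.Now().Before(c.expires) {
		v := c.value
		c.mu.RUnlock()
		return v, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Now().Before(c.expires) { // second check under the write lock
		return c.value, nil
	}
	v, err := c.load()
	if err != nil {
		return GlobalConfig{}, err
	}
	c.value, c.expires = v, time.Now().Add(c.ttl)
	return v, nil
}

// Invalidate forces the next Get to reload, e.g. after a config update.
func (c *configCache) Invalidate() {
	c.mu.Lock()
	c.expires = time.Time{}
	c.mu.Unlock()
}
```

With a 30-second TTL, repeated `Get` calls within the window hit memory only, and calling `Invalidate` from the update path makes the new values visible on the next delivery attempt.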
@edsonmichaque edsonmichaque marked this pull request as draft March 2, 2026 11:39
@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.
