diff --git a/api/api.go b/api/api.go index 6fedd3269..1f8288000 100644 --- a/api/api.go +++ b/api/api.go @@ -964,6 +964,20 @@ func (a *API) setupRoutes() { v1.GET("/sync/status/:namespace", syncStatusHandlers.GetNamespaceSyncStatus) v1.GET("/sync/audit", syncStatusHandlers.GetSyncAuditLog) + // Outbound webhook routes + webhookHandlers := NewWebhookHandlers(a.service.WebhookService) + v1.POST("/webhooks", webhookHandlers.Create) + v1.GET("/webhooks", webhookHandlers.List) + v1.GET("/webhooks/topics", webhookHandlers.ListTopics) + v1.GET("/webhooks/config", webhookHandlers.GetConfig) + v1.PUT("/webhooks/config", webhookHandlers.UpdateConfig) + v1.GET("/webhooks/:id", webhookHandlers.Get) + v1.PUT("/webhooks/:id", webhookHandlers.Update) + v1.DELETE("/webhooks/:id", webhookHandlers.Delete) + v1.GET("/webhooks/:id/deliveries", webhookHandlers.ListDeliveries) + v1.POST("/webhooks/:id/test", webhookHandlers.Test) + v1.POST("/webhooks/:id/deliveries/:log_id/retry", webhookHandlers.RetryDelivery) + // SSO routes (ENT: full functionality, CE: returns 402 Payment Required) public.GET("/auth/:id/:provider", a.handleTIBAuth) public.POST("/auth/:id/:provider", a.handleTIBAuth) diff --git a/api/webhook_handlers.go b/api/webhook_handlers.go new file mode 100644 index 000000000..c96abfb6d --- /dev/null +++ b/api/webhook_handlers.go @@ -0,0 +1,323 @@ +package api + +import ( + "net/http" + "strconv" + + "github.com/TykTechnologies/midsommar/v2/models" + "github.com/TykTechnologies/midsommar/v2/services" + "github.com/gin-gonic/gin" +) + +// actorID extracts the authenticated user's ID from the gin context. +// Returns 0 if the user is not present (should not happen on admin-only routes). +func actorID(c *gin.Context) uint { + u, ok := c.Get("user") + if !ok { + return 0 + } + user, ok := u.(*models.User) + if !ok { + return 0 + } + return user.ID +} + +// WebhookHandlers provides HTTP handlers for outbound webhook subscription management. +type WebhookHandlers struct { + svc *services.WebhookService +} + +// NewWebhookHandlers constructs a WebhookHandlers. +func NewWebhookHandlers(svc *services.WebhookService) *WebhookHandlers { + return &WebhookHandlers{svc: svc} +} + +func (h *WebhookHandlers) webhookError(c *gin.Context, status int, title, detail string) { + c.JSON(status, ErrorResponse{ + Errors: []struct { + Title string `json:"title"` + Detail string `json:"detail"` + }{{Title: title, Detail: detail}}, + }) +} + +// webhookRequest is the JSON body for create/update. +type webhookRequest struct { + Name string `json:"name"` + URL string `json:"url"` + Secret string `json:"secret"` + Enabled bool `json:"enabled"` + Description string `json:"description"` + Topics []string `json:"topics"` + RetryPolicy models.WebhookRetryPolicy `json:"retry_policy"` + TransportConfig models.WebhookTransportConfig `json:"transport_config"` +} + +// webhookResponse is the JSON representation returned to callers. +type webhookResponse struct { + ID uint `json:"id"` + Name string `json:"name"` + URL string `json:"url"` + Secret string `json:"secret"` + Enabled bool `json:"enabled"` + Description string `json:"description"` + Topics []string `json:"topics"` + RetryPolicy models.WebhookRetryPolicy `json:"retry_policy"` + TransportConfig models.WebhookTransportConfig `json:"transport_config"` +} + +func topicsToStrings(topics []models.WebhookTopic) []string { + out := make([]string, len(topics)) + for i, t := range topics { + out[i] = t.Topic + } + return out +} + +func stringsToTopics(subID uint, topics []string) []models.WebhookTopic { + out := make([]models.WebhookTopic, len(topics)) + for i, t := range topics { + out[i] = models.WebhookTopic{SubscriptionID: subID, Topic: t} + } + return out +} + +func toResponse(sub *models.WebhookSubscription) webhookResponse { + return webhookResponse{ + ID: sub.ID, + Name: sub.Name, + URL: sub.URL, + Secret: sub.Secret, + Enabled: sub.Enabled, + Description: sub.Description, + Topics: topicsToStrings(sub.Topics), + RetryPolicy: sub.RetryPolicy, + TransportConfig: sub.TransportConfig, + } +} + +func applyRequest(sub *models.WebhookSubscription, req webhookRequest) { + sub.Name = req.Name + sub.URL = req.URL + sub.Secret = req.Secret + sub.Enabled = req.Enabled + sub.Description = req.Description + sub.RetryPolicy = req.RetryPolicy + sub.TransportConfig = req.TransportConfig + sub.Topics = stringsToTopics(sub.ID, req.Topics) +} + +// Create handles POST /api/v1/webhooks +func (h *WebhookHandlers) Create(c *gin.Context) { + var req webhookRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + if err := h.svc.ValidateURL(req.URL); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + if err := services.ValidateTopics(req.Topics); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + var sub models.WebhookSubscription + applyRequest(&sub, req) + + if err := h.svc.CreateWebhook(&sub); err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to create webhook") + return + } + + c.JSON(http.StatusCreated, toResponse(&sub)) +} + +// List handles GET /api/v1/webhooks +func (h *WebhookHandlers) List(c *gin.Context) { + subs, err := h.svc.ListWebhooks() + if err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to list webhooks") + return + } + + resp := make([]webhookResponse, len(subs)) + for i := range subs { + resp[i] = toResponse(&subs[i]) + } + c.JSON(http.StatusOK, resp) +} + +// Get handles GET /api/v1/webhooks/:id +func (h *WebhookHandlers) Get(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid webhook id") + return + } + + sub, err := h.svc.GetWebhook(uint(id)) + if err != nil { + h.webhookError(c, http.StatusNotFound, "Not Found", "webhook not found") + return + } + + c.JSON(http.StatusOK, toResponse(sub)) +} + +// Update handles PUT /api/v1/webhooks/:id +func (h *WebhookHandlers) Update(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid webhook id") + return + } + + existing, err := h.svc.GetWebhook(uint(id)) + if err != nil { + h.webhookError(c, http.StatusNotFound, "Not Found", "webhook not found") + return + } + + var req webhookRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + if err := h.svc.ValidateURL(req.URL); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + if err := services.ValidateTopics(req.Topics); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + applyRequest(existing, req) + + if err := h.svc.UpdateWebhook(existing); err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to update webhook") + return + } + + c.JSON(http.StatusOK, toResponse(existing)) +} + +// Delete handles DELETE /api/v1/webhooks/:id +func (h *WebhookHandlers) Delete(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid webhook id") + return + } + + if err := h.svc.DeleteWebhook(uint(id)); err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to delete webhook") + return + } + + c.Status(http.StatusNoContent) +} + +// ListDeliveries handles GET /api/v1/webhooks/:id/deliveries +func (h *WebhookHandlers) ListDeliveries(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid webhook id") + return + } + + pageSize, pageNumber, _ := getPaginationParams(c) + + logs, totalCount, totalPages, err := h.svc.ListDeliveryLogs(uint(id), pageSize, pageNumber) + if err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to list deliveries") + return + } + + c.Header("X-Total-Count", strconv.FormatInt(totalCount, 10)) + c.Header("X-Total-Pages", strconv.Itoa(totalPages)) + c.JSON(http.StatusOK, gin.H{"data": logs}) +} + +// Test handles POST /api/v1/webhooks/:id/test +func (h *WebhookHandlers) Test(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid webhook id") + return + } + + sub, err := h.svc.GetWebhook(uint(id)) + if err != nil { + h.webhookError(c, http.StatusNotFound, "Not Found", "webhook not found") + return + } + + if err := h.svc.TestWebhook(sub); err != nil { + h.webhookError(c, http.StatusBadGateway, "Bad Gateway", err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "test delivery sent successfully"}) +} + +// RetryDelivery handles POST /api/v1/webhooks/:id/deliveries/:log_id/retry +func (h *WebhookHandlers) RetryDelivery(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid webhook id") + return + } + + logID, err := strconv.ParseUint(c.Param("log_id"), 10, 64) + if err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", "invalid log id") + return + } + + if err := h.svc.RetryDelivery(uint(id), uint(logID), actorID(c)); err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to enqueue retry") + return + } + + c.Status(http.StatusAccepted) +} + +// ListTopics handles GET /api/v1/webhooks/topics +func (h *WebhookHandlers) ListTopics(c *gin.Context) { + c.JSON(http.StatusOK, services.KnownWebhookTopics) +} + +// GetConfig handles GET /api/v1/webhooks/config +func (h *WebhookHandlers) GetConfig(c *gin.Context) { + cfg, err := h.svc.GetWebhookConfig() + if err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to get webhook config") + return + } + + c.JSON(http.StatusOK, cfg) +} + +// UpdateConfig handles PUT /api/v1/webhooks/config +func (h *WebhookHandlers) UpdateConfig(c *gin.Context) { + var cfg models.WebhookConfig + if err := c.ShouldBindJSON(&cfg); err != nil { + h.webhookError(c, http.StatusBadRequest, "Bad Request", err.Error()) + return + } + + if err := h.svc.UpdateWebhookConfig(&cfg); err != nil { + h.webhookError(c, http.StatusInternalServerError, "Internal Server Error", "failed to update webhook config") + return + } + + c.JSON(http.StatusOK, cfg) +} diff --git a/config/config.go b/config/config.go index 6efa23b46..d24a10956 100644 --- a/config/config.go +++ b/config/config.go @@ -99,6 +99,14 @@ type AppConf struct { // Submission Configuration MaxResourcePayloadSize int // Max size in bytes for submission resource_payload JSON (default: 5MB) + + // Webhook service static startup configuration (overridable at runtime via DB singleton) + WebhookWorkers int + WebhookQueueSize int + WebhookMaxRetries int + WebhookBackoffSeconds []int // parsed from WEBHOOK_BACKOFF_SECONDS (comma-separated) + WebhookHTTPTimeoutSeconds int + WebhookMaxResponseBodyBytes int } // QueueConfig holds configuration for message queues @@ -516,6 +524,41 @@ func getConfigFromEnv(envFile string) *AppConf { } } + // Webhook service static startup configuration + if v := os.Getenv("WEBHOOK_WORKERS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + conf.WebhookWorkers = n + } + } + if v := os.Getenv("WEBHOOK_QUEUE_SIZE"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + conf.WebhookQueueSize = n + } + } + if v := os.Getenv("WEBHOOK_MAX_RETRIES"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + conf.WebhookMaxRetries = n + } + } + if v := os.Getenv("WEBHOOK_BACKOFF_SECONDS"); v != "" { + for _, part := range strings.Split(v, ",") { + part = strings.TrimSpace(part) + if n, err := strconv.Atoi(part); err == nil && n >= 0 { + conf.WebhookBackoffSeconds = append(conf.WebhookBackoffSeconds, n) + } + } + } + if v := os.Getenv("WEBHOOK_HTTP_TIMEOUT"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + conf.WebhookHTTPTimeoutSeconds = n + } + } + if v := os.Getenv("WEBHOOK_MAX_RESP_BODY"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + conf.WebhookMaxResponseBodyBytes = n + } + } + return conf } diff --git a/features/Webhooks.md b/features/Webhooks.md new file mode 100644 index 000000000..a0ff53d19 --- /dev/null +++ b/features/Webhooks.md @@ -0,0 +1,473 @@ +# Outbound Webhooks + +## Overview + +Tyk AI Studio can push platform events to external HTTP endpoints (webhooks) in real time. Subscriptions match specific event topics exactly, and the platform delivers a signed JSON payload to the target URL with at-least-once delivery semantics and exponential-backoff retries. + +Retry policy is configurable at three levels: +1. **Static defaults** — set at startup via environment variables (see Configuration) +2. **Global dynamic defaults** — stored in the `webhook_configs` DB singleton, updatable at runtime via API without restart +3. **Per-subscription overrides** — `retry_policy` field on each `WebhookSubscription` + +All webhook endpoints require admin access. + +### Use Cases + +- Slack / Teams notifications on LLM changes or budget alerts +- PagerDuty incidents on repeated delivery failures +- CI/CD pipeline triggers when App configurations are updated +- SIEM ingestion of all platform CRUD audit events + +--- + +## Event Coverage + +Subscribable topics are defined in `services/system_events.go` as `KnownWebhookTopics`. Use `GET /api/v1/webhooks/topics` to retrieve the current list at runtime. + +| Domain | Topics | +|---|---| +| LLM | `system.llm.created`, `system.llm.updated`, `system.llm.deleted` | +| App | `system.app.created`, `system.app.updated`, `system.app.deleted`, `system.app.approved`, `system.app.plugin_resources_changed` | +| User | `system.user.created`, `system.user.updated`, `system.user.deleted` | +| Group | `system.group.created`, `system.group.updated`, `system.group.deleted` | +| Datasource | `system.datasource.created`, `system.datasource.updated`, `system.datasource.deleted` | +| Tool | `system.tool.created`, `system.tool.updated`, `system.tool.deleted` | +| Filter | `system.filter.created`, `system.filter.updated`, `system.filter.deleted` | +| Plugin | `system.plugin.created`, `system.plugin.updated`, `system.plugin.deleted` | +| Model Price | `system.modelprice.created`, `system.modelprice.updated`, `system.modelprice.deleted` | +| Model Router | `system.modelrouter.created`, `system.modelrouter.updated`, `system.modelrouter.deleted` | + +Each topic in `topics` must be an exact match against a known topic. Unknown topics are rejected at create/update time. + +--- + +## Architecture + +```mermaid +flowchart TD + A[Service Layer] -->|Publish| B[eventbridge.Bus] + B -->|SubscribeAll| C[WebhookService.HandleEvent] + C -->|Persist row| D[WebhookEvent table\nstatus=pending] + C -->|Non-blocking push| E[queue chan uint] + E --> F[Worker goroutines] + G[DB Poller] -->|Poll due events| D + G -->|Push IDs| E + F -->|resolveRetryPolicy| P[Merge: sub > DB global > static] + P -->|SSRF check + HMAC sign| H[HTTP POST → external URL] + H -->|2xx| I[Persist success log\nmark event delivered] + H -->|failure| J[Persist failed log\nschedule next attempt] + J -->|attempt < maxAttempts| K[Update next_run_at\npoller picks up] + J -->|attempt == maxAttempts| L[Mark event exhausted] + M[Manual RetryDelivery API] -->|New WebhookEvent row| D +``` + +**Key properties:** +- `HandleEvent` returns immediately (non-blocking — persists DB row then tries to enqueue) +- Events survive process restarts (stored in `webhook_events` table) +- Atomic `UPDATE WHERE status='pending' SET status='in_flight'` prevents double-delivery across workers +- DB poller recovers stale in-flight events after crashes +- Queue buffer and worker count configurable (defaults: 512 / 4) +- HTTP timeout per attempt is configurable (default: 15 seconds) +- Retry policy resolved at execution time — changing DB config takes effect immediately + +--- + +## Delivery Guarantee + +Webhooks are delivered **at-least-once**. Events are persisted to the `webhook_events` table before delivery, so they survive process restarts. Deduplication of duplicate deliveries is the receiver's responsibility. Use the `X-Tyk-Event-ID` header (UUID) and the `attempt_number` field in the delivery log to deduplicate. + +--- + +## SSRF Protection + +Webhook URLs are validated at both **write time** (Create/Update) and **delivery time** (defence-in-depth). The validator: +- Rejects empty URLs +- Rejects non-HTTP/HTTPS schemes +- Resolves the hostname via DNS and rejects any IP in private/internal CIDRs: + - `10.0.0.0/8`, `172.16.0.0/12`, `192.168.0.0/16` + - `127.0.0.0/8` (loopback), `169.254.0.0/16` (link-local) + - `::1/128`, `fc00::/7` (IPv6 private) + +SSRF protection is controlled by `WebhookServiceConfig.AllowInternalNetwork`, which is resolved from the `ALLOW_INTERNAL_NETWORK_ACCESS` environment variable at service construction time. Setting this to `true` disables SSRF checks — useful for local development or internal deployments. + +--- + +## HMAC Signature & Replay Protection + +When a `secret` is configured on the subscription, each delivery includes two headers: + +``` +X-Tyk-Timestamp: 1709123456 +X-Tyk-Signature: sha256= +``` + +The signature is computed over `"."` — the timestamp is part of the signed material. This prevents replay attacks: an attacker who captures a valid `(timestamp, body, signature)` tuple cannot reuse it once the receiver's tolerance window (typically 5 minutes) has elapsed, because the timestamp embedded in the signed material will be stale. + +### Verification (Go) + +```go +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" + "strconv" + "time" +) + +const maxSkew = 5 * time.Minute + +func verifySignature(r *http.Request, body []byte, secret string) error { + tsStr := r.Header.Get("X-Tyk-Timestamp") + tsUnix, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + return fmt.Errorf("missing or invalid X-Tyk-Timestamp") + } + if d := time.Since(time.Unix(tsUnix, 0)); d > maxSkew || d < -maxSkew { + return fmt.Errorf("timestamp outside tolerance window (possible replay)") + } + signed := []byte(fmt.Sprintf("%d.%s", tsUnix, body)) + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(signed) + expected := "sha256=" + hex.EncodeToString(mac.Sum(nil)) + if !hmac.Equal([]byte(r.Header.Get("X-Tyk-Signature")), []byte(expected)) { + return fmt.Errorf("invalid signature") + } + return nil +} +``` + +### Verification (Python) + +```python +import hmac, hashlib, time + +MAX_SKEW = 300 # seconds + +def verify(secret: str, body: bytes, headers: dict) -> None: + ts = int(headers["X-Tyk-Timestamp"]) + if abs(time.time() - ts) > MAX_SKEW: + raise ValueError("stale timestamp — possible replay attack") + signed = f"{ts}.".encode() + body + expected = "sha256=" + hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest() + if not hmac.compare_digest(expected, headers["X-Tyk-Signature"]): + raise ValueError("invalid signature") +``` + +If `secret` is empty, no signature or timestamp headers are sent and verification is skipped. + +--- + +## Request Format + +Each delivery is an HTTP POST with the following headers: + +| Header | Value | +|---|---| +| `Content-Type` | `application/json` | +| `X-Tyk-Timestamp` | Unix epoch of the delivery attempt (omitted if no secret) | +| `X-Tyk-Signature` | `sha256=` (omitted if no secret) | +| `X-Tyk-Event-Topic` | Event topic string | +| `X-Tyk-Event-ID` | Event UUID | +| `X-Tyk-Delivery-Attempt` | Attempt number (1-based) | + +The body is the JSON-marshalled `eventbridge.Event`. For manual retries, the original payload from the delivery log is replayed verbatim. + +Response body is capped (default 4KB, configurable) before storage in the delivery log. + +--- + +## Retry Schedule (Default) + +| Attempt | Delay Before Next Retry | +|---|---| +| 1 (initial failure) | 10 seconds | +| 2 | 30 seconds | +| 3 | 2 minutes | +| 4 | 10 minutes | +| 5 (final) | No retry | + +Configurable at global level (DB singleton) or per subscription (see `retry_policy` field). + +--- + +## Configuration + +### Static Startup Configuration (Environment Variables) + +These are set once at startup and serve as the last-resort fallback. + +| Env Var | Default | Description | +|---|---|---| +| `WEBHOOK_WORKERS` | `4` | Number of worker goroutines | +| `WEBHOOK_QUEUE_SIZE` | `512` | Job queue buffer size | +| `WEBHOOK_MAX_RETRIES` | `5` | Maximum delivery attempts | +| `WEBHOOK_BACKOFF_SECONDS` | `10,30,120,600,1800` | Comma-separated backoff delays | +| `WEBHOOK_HTTP_TIMEOUT` | `15` | HTTP timeout per attempt (seconds) | +| `WEBHOOK_MAX_RESP_BODY` | `4096` | Max response body bytes to store | +| `ALLOW_INTERNAL_NETWORK_ACCESS` | `false` | Disable SSRF protection (development/internal only) | + +### Global Dynamic Configuration (DB Singleton, runtime-updatable) + +Managed via `GET/PUT /api/v1/webhooks/config`. Fields with zero values defer to static defaults. + +```json +{ + "workers": 0, + "queue_size": 0, + "default_retry_policy": { + "max_attempts": 3, + "backoff_seconds": [5, 15, 60], + "timeout_seconds": 10 + }, + "max_response_body_bytes": 0 +} +``` + +### Per-Subscription Override + +Set `retry_policy` and/or `transport_config` on any subscription. Zero values in `retry_policy` inherit from the DB global config or static defaults. + +```json +{ + "retry_policy": { + "max_attempts": 1, + "timeout_seconds": 5 + }, + "transport_config": { + "proxy_url": "http://proxy.internal:3128", + "insecure_skip_verify": false, + "tls_ca_cert": "-----BEGIN CERTIFICATE-----\n...", + "tls_client_cert": "-----BEGIN CERTIFICATE-----\n...", + "tls_client_key": "-----BEGIN EC PRIVATE KEY-----\n..." + } +} +``` + +**Retry policy merge precedence:** per-subscription > DB global > static startup + +--- + +## API Endpoints + +All endpoints require authentication (`Authorization: Bearer `) and admin role. + +### Subscription Management + +| Method | Path | Description | +|---|---|---| +| `POST` | `/api/v1/webhooks` | Create a new subscription | +| `GET` | `/api/v1/webhooks` | List all subscriptions | +| `GET` | `/api/v1/webhooks/topics` | List all subscribable event topics | +| `GET` | `/api/v1/webhooks/:id` | Get a subscription by ID | +| `PUT` | `/api/v1/webhooks/:id` | Update a subscription | +| `DELETE` | `/api/v1/webhooks/:id` | Delete a subscription (204) | +| `GET` | `/api/v1/webhooks/:id/deliveries` | List delivery logs (`?limit=50`) | +| `POST` | `/api/v1/webhooks/:id/test` | Send a test event | +| `POST` | `/api/v1/webhooks/:id/deliveries/:log_id/retry` | Replay a specific delivery (202) | + +### Global Config + +| Method | Path | Description | +|---|---|---| +| `GET` | `/api/v1/webhooks/config` | Get global webhook config singleton | +| `PUT` | `/api/v1/webhooks/config` | Update global webhook config singleton | + +### Create / Update Payload + +```json +{ + "name": "slack-notifications", + "url": "https://hooks.slack.com/services/...", + "secret": "my-signing-secret", + "topics": ["system.llm.created", "system.app.approved"], + "enabled": true, + "description": "Notify Slack on key platform events", + "retry_policy": { + "max_attempts": 3, + "timeout_seconds": 10 + }, + "transport_config": { + "proxy_url": "", + "insecure_skip_verify": false + } +} +``` + +--- + +## Data Models + +### `WebhookRetryPolicy` (value type, embedded in subscription and config) + +| Field | Type | Description | +|---|---|---| +| `max_attempts` | int | 0 = use next-level default | +| `backoff_seconds` | []int | nil = use next-level default | +| `timeout_seconds` | int | 0 = use next-level default | + +### `WebhookTransportConfig` (value type, embedded in subscription) + +| Field | Type | Description | +|---|---|---| +| `proxy_url` | string | HTTP/HTTPS/SOCKS5 proxy for outbound requests | +| `insecure_skip_verify` | bool | Disable TLS certificate validation (self-signed certs) | +| `tls_ca_cert` | string | PEM-encoded CA certificate to trust (private CA) | +| `tls_client_cert` | string | PEM-encoded client certificate for mTLS | +| `tls_client_key` | string | PEM-encoded client private key for mTLS | + +### `WebhookSubscription` + +| Field | Type | Description | +|---|---|---| +| `id` | uint | Auto-increment primary key | +| `name` | string | Human-readable name | +| `url` | string | Target HTTP endpoint | +| `secret` | string | HMAC signing secret (optional) | +| `topics` | []WebhookTopic | Subscribed event topics (join table) | +| `enabled` | bool | Default: true | +| `description` | string | Optional description | +| `retry_policy` | WebhookRetryPolicy | Per-subscription retry overrides | +| `transport_config` | WebhookTransportConfig | Per-subscription HTTP transport settings | +| `created_at` | time | GORM timestamp | +| `updated_at` | time | GORM timestamp | + +### `WebhookTopic` (join table) + +| Field | Type | Description | +|---|---|---| +| `id` | uint | Auto-increment primary key | +| `subscription_id` | uint | Foreign key → `webhook_subscriptions.id` (CASCADE delete) | +| `topic` | string | Exact event topic string | + +Composite unique index on `(subscription_id, topic)` prevents duplicates. + +### `WebhookEvent` (persistent delivery queue) + +| Field | Type | Description | +|---|---|---| +| `id` | uint | Auto-increment primary key | +| `subscription_id` | uint | Foreign key (indexed) | +| `event_topic` | string | Topic of the event | +| `event_id` | string | UUID of the event | +| `payload` | text | Full JSON payload to deliver | +| `attempt_number` | int | Current attempt (1-based) | +| `status` | string | `pending` / `in_flight` / `delivered` / `exhausted` | +| `next_run_at` | time | When this event is next due (indexed) | + +### `WebhookDeliveryLog` (audit log) + +| Field | Type | Description | +|---|---|---| +| `id` | uint | Auto-increment primary key | +| `subscription_id` | uint | Foreign key (indexed) | +| `event_topic` | string | Topic of the delivered event | +| `event_id` | string | UUID of the event | +| `payload` | text | Full JSON payload sent | +| `attempt_number` | int | 1-based attempt counter | +| `status` | string | `success` / `failed` | +| `http_status_code` | int | HTTP response code | +| `response_body` | text | Response body (capped at configured limit) | +| `error_message` | text | Network/HTTP error message | +| `attempted_at` | time | When this attempt was made | +| `next_retry_at` | *time | Scheduled retry time (nil if done) | + +### `WebhookConfig` (singleton, ID=1) + +| Field | Type | Description | +|---|---|---| +| `id` | uint | Always 1 (singleton) | +| `workers` | int | 0 = use static default | +| `queue_size` | int | 0 = use static default | +| `default_retry_policy` | WebhookRetryPolicy | Global defaults for all subscriptions | +| `max_response_body_bytes` | int | 0 = use static default | + +--- + +## Code References + +| Component | File | +|---|---| +| Data models | `models/webhook.go` | +| AutoMigrate registration | `models/models.go` | +| Service (dispatch, HMAC, retry, CRUD, config) | `services/webhook_service.go` | +| Service wiring (struct, SetEventBus, Cleanup) | `services/service.go` | +| Standalone mode + worker startup | `main.go` | +| HTTP handlers | `api/webhook_handlers.go` | +| Route registration | `api/api.go` | +| Static config env vars | `config/config.go` | +| Known topics list | `services/system_events.go` | +| Unit tests | `services/webhook_service_test.go` | + +--- + +## Testing Strategy + +### Unit Tests (`services/webhook_service_test.go`) + +- **`TestTopicMatching_Exact`** — exact topic matches; wrong topic does not +- **`TestComputeHMAC`** — deterministic for same inputs; differs when timestamp differs +- **`TestComputeHMAC_EmptySecret`** — empty secret returns empty string +- **`TestDefaultBackoffDuration`** — default config backoff values match spec +- **`TestValidateWebhookURL_Empty`** — empty URL rejected +- **`TestValidateWebhookURL_BadScheme`** — non-HTTP/HTTPS scheme rejected +- **`TestValidateWebhookURL_PrivateLoopback`** — loopback address rejected by SSRF check +- **`TestValidateWebhookURL_AllowInternal`** — loopback allowed when `AllowInternalNetwork=true` +- **`TestHandleEvent_PersistsQueue`** — HandleEvent creates WebhookEvent row in DB +- **`TestDelivery_Success`** — mock server 200; event `status=delivered`, log `status=success` +- **`TestDelivery_Failure`** — mock server 500; event `status=exhausted`, log `status=failed` +- **`TestDelivery_Retry`** — server 500 then 200; second log row `status=success` +- **`TestDelivery_MaxRetries`** — server always 503; exactly N log rows, event `status=exhausted` +- **`TestTestWebhook_Success`** — smoke-tests TestWebhook against a 200 server +- **`TestTestWebhook_Failure`** — TestWebhook against a 403 server returns error +- **`TestWebhookCRUD`** — create, get, list, update, delete subscription +- **`TestRetryPolicy_Resolution_StaticDefaults`** — zero-value sub uses static defaults +- **`TestRetryPolicy_Resolution_DBGlobal`** — DB global overrides static defaults +- **`TestRetryPolicy_Resolution_PerSubscriptionOverride`** — sub override wins over DB global +- **`TestRetryDelivery`** — manual retry creates new WebhookEvent; delivery succeeds +- **`TestRetryDelivery_NotFound`** — unknown log ID returns error +- **`TestWebhookConfig_DefaultsAndUpdate`** — singleton read/write round-trip +- **`TestListWebhooks`** — list returns all subscriptions +- **`TestWebhookTransportConfig_Proxy`** — invalid proxy URL rejected; valid proxy builds client +- **`TestWebhookTransportConfig_InsecureSkipVerify`** — InsecureSkipVerify propagated to transport +- **`TestWebhookTransportConfig_InvalidCACert`** — malformed PEM CA cert returns error + +### Manual Smoke Test + +```bash +# 1. List available topics +curl http://localhost:8080/api/v1/webhooks/topics \ + -H "Authorization: Bearer " + +# 2. Create a subscription +curl -X POST http://localhost:8080/api/v1/webhooks \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{ + "name": "test", + "url": "https://webhook.site/your-id", + "secret": "mysecret", + "topics": ["system.llm.created"], + "enabled": true + }' + +# 3. Trigger an event (e.g. create an LLM via the admin UI or API) + +# 4. Check delivery logs +curl http://localhost:8080/api/v1/webhooks/1/deliveries \ + -H "Authorization: Bearer " + +# 5. Manually retry a specific delivery +curl -X POST http://localhost:8080/api/v1/webhooks/1/deliveries/5/retry \ + -H "Authorization: Bearer " + +# 6. View / update global config +curl http://localhost:8080/api/v1/webhooks/config \ + -H "Authorization: Bearer " + +curl -X PUT http://localhost:8080/api/v1/webhooks/config \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"default_retry_policy":{"max_attempts":3,"backoff_seconds":[5,30,120]}}' +``` diff --git a/main.go b/main.go index 34e49f9aa..3dfba602b 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ import ( "github.com/TykTechnologies/midsommar/v2/logger" "github.com/TykTechnologies/midsommar/v2/models" "github.com/TykTechnologies/midsommar/v2/notifications" + "github.com/TykTechnologies/midsommar/v2/pkg/eventbridge" "github.com/TykTechnologies/midsommar/v2/pkg/ociplugins" "github.com/TykTechnologies/midsommar/v2/proxy" "github.com/TykTechnologies/midsommar/v2/secrets" @@ -142,6 +143,17 @@ func main() { service := services.NewServiceWithOCI(db, ociConfig) + // Replace the default webhook service with one configured from environment variables. + webhookCfg := services.WebhookServiceConfigFromValues( + appConf.WebhookWorkers, + appConf.WebhookQueueSize, + appConf.WebhookMaxRetries, + appConf.WebhookHTTPTimeoutSeconds, + appConf.WebhookMaxResponseBodyBytes, + appConf.WebhookBackoffSeconds, + ) + service.WebhookService = services.NewWebhookService(db, webhookCfg) + // Wire licensing service to main service for plugin license checks service.SetLicensingService(licensingService) @@ -284,6 +296,13 @@ func main() { logger.Info("Gateway not started - feature_gateway not in license entitlements") } + // Wire a local event bus so webhooks work in standalone mode. + // In control mode, SetEventBus is called again below with the gRPC-backed bus, + // which re-subscribes the webhook service to the correct bus. + localBus := eventbridge.NewBus() + service.SetEventBus(localBus) + logger.Info("Local event bus initialized") + // Initialize gRPC control server and reload coordinator if in control mode var controlServer *grpc.ControlServer var reloadCoordinator *services.ReloadCoordinator @@ -360,6 +379,14 @@ func main() { } } + // Start webhook workers after all event bus wiring is complete. + if service.WebhookService != nil { + webhookCtx, webhookCancel := context.WithCancel(context.Background()) + defer webhookCancel() + go service.WebhookService.Start(webhookCtx) + logger.Info("Webhook service workers started") + } + noDocsArg := appConf.DocsDisabled docsPortArg := appConf.DocsPort for i, arg := range os.Args { diff --git a/models/models.go b/models/models.go index 507eed4b7..36cb906d0 100644 --- a/models/models.go +++ b/models/models.go @@ -93,6 +93,12 @@ func InitModels(db *gorm.DB) error { &PluginResourceType{}, // Plugin-registered resource types &AppPluginResource{}, // App ↔ plugin resource instance associations &GroupPluginResource{}, // Group ↔ plugin resource instance access control + // Outbound Webhooks + &WebhookSubscription{}, // Webhook endpoint subscriptions + &WebhookTopic{}, // Subscription–topic join table + &WebhookEvent{}, // Persistent delivery queue + &WebhookDeliveryLog{}, // Per-attempt delivery audit log + &WebhookConfig{}, // Global webhook runtime configuration (singleton) ); err != nil { return err } diff --git a/models/webhook.go b/models/webhook.go new file mode 100644 index 000000000..555d62f27 --- /dev/null +++ b/models/webhook.go @@ -0,0 +1,146 @@ +package models + +import ( + "time" + + "github.com/TykTechnologies/midsommar/v2/secrets" + "gorm.io/gorm" +) + +// WebhookTransportConfig holds per-subscription HTTP transport settings. +// Zero/empty values mean "use service defaults". +type WebhookTransportConfig struct { + // ProxyURL is an optional HTTP/HTTPS/SOCKS5 proxy for outbound requests. + ProxyURL string `json:"proxy_url,omitempty"` + // InsecureSkipVerify disables TLS certificate validation for this endpoint. + // Use only when the target uses a self-signed cert you trust. + InsecureSkipVerify bool `json:"insecure_skip_verify,omitempty"` + // TLSCACert is an optional PEM-encoded CA certificate to trust when + // validating the server's TLS certificate (e.g. private CA). + TLSCACert string `json:"tls_ca_cert,omitempty"` + // TLSClientCert and TLSClientKey are PEM-encoded client cert/key for mTLS. + TLSClientCert string `json:"tls_client_cert,omitempty"` + TLSClientKey string `json:"tls_client_key,omitempty"` +} + +const ( + DeliveryStatusPending = "pending" + DeliveryStatusSuccess = "success" + DeliveryStatusFailed = "failed" +) + +const WebhookConfigSingletonID = 1 + +// WebhookRetryPolicy is a value type shared by WebhookConfig (global defaults) and +// WebhookSubscription (per-subscription overrides). +// Zero values mean "use the next level's default". +type WebhookRetryPolicy struct { + MaxAttempts int `json:"max_attempts"` + BackoffSeconds []int `json:"backoff_seconds"` + TimeoutSeconds int `json:"timeout_seconds"` +} + +// WebhookTopic is a row in the subscription–topic join table. +type WebhookTopic struct { + ID uint `json:"id" gorm:"primaryKey;autoIncrement"` + SubscriptionID uint `json:"subscription_id" gorm:"not null;index;uniqueIndex:idx_sub_topic"` + Topic string `json:"topic" gorm:"not null;index;uniqueIndex:idx_sub_topic"` +} + +// WebhookSubscription defines an outbound webhook destination. +type WebhookSubscription struct { + gorm.Model + Name string `json:"name"` + URL string `json:"url"` + Secret string `json:"secret"` + Enabled bool `json:"enabled" gorm:"default:true"` + Description string `json:"description"` + RetryPolicy WebhookRetryPolicy `json:"retry_policy" gorm:"serializer:json"` + TransportConfig WebhookTransportConfig `json:"transport_config" gorm:"serializer:json"` + Topics []WebhookTopic `json:"topics" gorm:"foreignKey:SubscriptionID;constraint:OnDelete:CASCADE"` +} + +// BeforeSave encrypts sensitive fields before writing to the database. +// Uses the same AES-256 key (TYK_AI_SECRET_KEY) and $ENC/ prefix convention +// as the rest of the platform. A no-op if no key is configured. +func (s *WebhookSubscription) BeforeSave(tx *gorm.DB) error { + s.Secret = secrets.EncryptValue(s.Secret) + s.TransportConfig.ProxyURL = secrets.EncryptValue(s.TransportConfig.ProxyURL) + s.TransportConfig.TLSCACert = secrets.EncryptValue(s.TransportConfig.TLSCACert) + s.TransportConfig.TLSClientCert = secrets.EncryptValue(s.TransportConfig.TLSClientCert) + s.TransportConfig.TLSClientKey = secrets.EncryptValue(s.TransportConfig.TLSClientKey) + return nil +} + +// AfterFind decrypts sensitive fields after loading from the database. +func (s *WebhookSubscription) AfterFind(tx *gorm.DB) error { + s.Secret = secrets.DecryptValue(s.Secret) + s.TransportConfig.ProxyURL = secrets.DecryptValue(s.TransportConfig.ProxyURL) + s.TransportConfig.TLSCACert = secrets.DecryptValue(s.TransportConfig.TLSCACert) + s.TransportConfig.TLSClientCert = secrets.DecryptValue(s.TransportConfig.TLSClientCert) + s.TransportConfig.TLSClientKey = secrets.DecryptValue(s.TransportConfig.TLSClientKey) + return nil +} + +// WebhookEvent is the persistent delivery queue. +type WebhookEvent struct { + gorm.Model + SubscriptionID uint `json:"subscription_id" gorm:"not null;index"` + EventTopic string `json:"event_topic"` + EventID string `json:"event_id"` + Payload string `json:"payload" gorm:"type:text"` + AttemptNumber int `json:"attempt_number" gorm:"default:1"` + Status string `json:"status" gorm:"index:idx_webhook_events_status_next_run"` + NextRunAt time.Time `json:"next_run_at" gorm:"index:idx_webhook_events_status_next_run"` + // TriggeredBy is non-zero for manually-triggered retries; holds the actor's user ID. + TriggeredBy uint `json:"triggered_by" gorm:"default:0"` +} + +// WebhookDeliveryLog records each HTTP delivery attempt for audit. +// CreatedAt is indexed to support efficient retention-based pruning. +type WebhookDeliveryLog struct { + gorm.Model + SubscriptionID uint `json:"subscription_id" gorm:"not null;index"` + EventTopic string `json:"event_topic"` + EventID string `json:"event_id"` + Payload string `json:"payload" gorm:"type:text"` + AttemptNumber int `json:"attempt_number"` + Status string `json:"status"` + HTTPStatusCode int `json:"http_status_code"` + ResponseBody string `json:"response_body" gorm:"type:text"` + ErrorMessage string `json:"error_message" gorm:"type:text"` + AttemptedAt time.Time `json:"attempted_at"` + NextRetryAt *time.Time `json:"next_retry_at"` +} + +// WebhookConfig is a DB singleton (ID=1) that holds dynamic global defaults. +type WebhookConfig struct { + gorm.Model + ID uint `json:"id" gorm:"primaryKey"` + Workers int `json:"workers"` + QueueSize int `json:"queue_size"` + DefaultRetryPolicy WebhookRetryPolicy `json:"default_retry_policy" gorm:"serializer:json"` + MaxResponseBodyBytes int `json:"max_response_body_bytes"` + // LogRetentionDays is how many days of delivery logs to keep. 0 = keep forever. + LogRetentionDays int `json:"log_retention_days"` +} + +func NewDefaultWebhookConfig() *WebhookConfig { + return &WebhookConfig{ID: WebhookConfigSingletonID} +} + +func GetWebhookConfig(db *gorm.DB) (*WebhookConfig, error) { + cfg := NewDefaultWebhookConfig() + // FirstOrCreate is atomic: safe under concurrent calls at startup. + if err := db.Where(WebhookConfig{ID: WebhookConfigSingletonID}). + Attrs(cfg). + FirstOrCreate(cfg).Error; err != nil { + return nil, err + } + return cfg, nil +} + +func UpdateWebhookConfig(db *gorm.DB, cfg *WebhookConfig) error { + cfg.ID = WebhookConfigSingletonID + return db.Save(cfg).Error +} diff --git a/services/service.go b/services/service.go index 2cc835727..493620c38 100644 --- a/services/service.go +++ b/services/service.go @@ -47,6 +47,8 @@ type Service struct { ModelRouterService model_router.Service // Sync Status (Hub-and-Spoke) SyncStatusService *SyncStatusService + // Outbound Webhooks + WebhookService *WebhookService } func NewService(db *gorm.DB) *Service { @@ -73,6 +75,7 @@ func NewServiceWithOCI(db *gorm.DB, ociConfig *ociplugins.OCIConfig) *Service { namespaceService := NewNamespaceService(db, edgeService) edgeManagementService := edge_management.NewService(db) syncStatusService := NewSyncStatusService(db) + webhookService := NewWebhookService(db, DefaultWebhookServiceConfig()) // Initialize plugin services with OCI support var pluginService *PluginService @@ -180,6 +183,7 @@ func NewServiceWithOCI(db *gorm.DB, ociConfig *ociplugins.OCIConfig) *Service { HookManager: hookManager, ModelRouterService: modelRouterSvc, SyncStatusService: syncStatusService, + WebhookService: webhookService, } // Wire service reference to AI Studio plugin manager for proper service provider injection @@ -219,6 +223,12 @@ func (s *Service) Cleanup() error { var errors []error + // Stop webhook service + if s.WebhookService != nil { + logger.Info("Stopping webhook service...") + s.WebhookService.Stop() + } + // Stop log export service (cleanup goroutine) if s.LogExportService != nil { logger.Info("Stopping log export service...") @@ -298,6 +308,10 @@ func (s *Service) SetEventBus(bus eventbridge.Bus) { s.SystemEvents = NewSystemEventEmitter(bus, "control") s.SubscribeResourceInstanceChanges(bus) logger.Debug("Initialized system event emitter") + if s.WebhookService != nil { + bus.SubscribeAll(s.WebhookService.HandleEvent) + logger.Debug("Webhook service subscribed to all events") + } } } diff --git a/services/system_events.go b/services/system_events.go index cc81630fa..087f6acec 100644 --- a/services/system_events.go +++ b/services/system_events.go @@ -72,6 +72,42 @@ type ObjectEventPayload struct { Object interface{} `json:"object"` // The full object (for create/update, nil for delete) } +// KnownWebhookTopics is the authoritative list of topics that can be subscribed to via webhooks. +var KnownWebhookTopics = []string{ + TopicLLMCreated, + TopicLLMUpdated, + TopicLLMDeleted, + TopicAppCreated, + TopicAppUpdated, + TopicAppDeleted, + TopicAppApproved, + TopicAppPluginResourcesChanged, + TopicDatasourceCreated, + TopicDatasourceUpdated, + TopicDatasourceDeleted, + TopicUserCreated, + TopicUserUpdated, + TopicUserDeleted, + TopicGroupCreated, + TopicGroupUpdated, + TopicGroupDeleted, + TopicToolCreated, + TopicToolUpdated, + TopicToolDeleted, + TopicFilterCreated, + TopicFilterUpdated, + TopicFilterDeleted, + TopicPluginCreated, + TopicPluginUpdated, + TopicPluginDeleted, + TopicModelPriceCreated, + TopicModelPriceUpdated, + TopicModelPriceDeleted, + TopicModelRouterCreated, + TopicModelRouterUpdated, + TopicModelRouterDeleted, +} + // SystemEventEmitter provides a clean interface for emitting system events type SystemEventEmitter struct { bus eventbridge.Bus diff --git a/services/webhook_service.go b/services/webhook_service.go new file mode 100644 index 000000000..34d2ee37a --- /dev/null +++ b/services/webhook_service.go @@ -0,0 +1,801 @@ +package services + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + neturl "net/url" + "os" + "strings" + "sync" + "time" + + "github.com/TykTechnologies/midsommar/v2/logger" + "github.com/TykTechnologies/midsommar/v2/models" + "github.com/TykTechnologies/midsommar/v2/pkg/eventbridge" + "gorm.io/gorm" +) + +type WebhookServiceConfig struct { + Workers int + QueueSize int + DefaultMaxAttempts int + DefaultBackoff []time.Duration + DefaultHTTPTimeout time.Duration + MaxResponseBodyBytes int + PollInterval time.Duration + LogPruneInterval time.Duration // How often to prune old delivery logs; default 24h + AllowInternalNetwork bool // Disable SSRF protection (e.g. for testing or internal deployments) +} + +func DefaultWebhookServiceConfig() WebhookServiceConfig { + return WebhookServiceConfig{ + Workers: 4, + QueueSize: 512, + DefaultMaxAttempts: 5, + DefaultBackoff: []time.Duration{ + 10 * time.Second, + 30 * time.Second, + 2 * time.Minute, + 10 * time.Minute, + 30 * time.Minute, + }, + DefaultHTTPTimeout: 15 * time.Second, + MaxResponseBodyBytes: 4 * 1024, + PollInterval: 5 * time.Second, + LogPruneInterval: 24 * time.Hour, + } +} + +func WebhookServiceConfigFromValues(workers, queueSize, maxRetries, httpTimeoutSecs, maxRespBody int, backoffSecs []int) WebhookServiceConfig { + cfg := DefaultWebhookServiceConfig() + if workers > 0 { + cfg.Workers = workers + } + if queueSize > 0 { + cfg.QueueSize = queueSize + } + if maxRetries > 0 { + cfg.DefaultMaxAttempts = maxRetries + } + if httpTimeoutSecs > 0 { + cfg.DefaultHTTPTimeout = time.Duration(httpTimeoutSecs) * time.Second + } + if maxRespBody > 0 { + cfg.MaxResponseBodyBytes = maxRespBody + } + if len(backoffSecs) > 0 { + cfg.DefaultBackoff = make([]time.Duration, len(backoffSecs)) + for i, s := range backoffSecs { + cfg.DefaultBackoff[i] = time.Duration(s) * time.Second + } + } + cfg.AllowInternalNetwork = os.Getenv("ALLOW_INTERNAL_NETWORK_ACCESS") == "true" + return cfg +} + +type resolvedPolicy struct { + maxAttempts int + backoff []time.Duration + httpTimeout time.Duration +} + +// cachedConfig holds a short-lived in-memory copy of WebhookConfig to avoid +// a DB round-trip on every delivery attempt in the hot path. +type cachedConfig struct { + mu sync.RWMutex + cfg *models.WebhookConfig + fetchedAt time.Time +} + +const webhookConfigCacheTTL = 30 * time.Second + +type WebhookService struct { + db *gorm.DB + cfg WebhookServiceConfig + queue chan uint // WebhookEvent IDs + stopCh chan struct{} + configCache cachedConfig +} + +func NewWebhookService(db *gorm.DB, cfg WebhookServiceConfig) *WebhookService { + return &WebhookService{ + db: db, + cfg: cfg, + queue: make(chan uint, cfg.QueueSize), + stopCh: make(chan struct{}), + } +} + +// getCachedConfig returns the WebhookConfig, using a 30-second in-memory cache +// to avoid a DB query on every delivery attempt. +func (s *WebhookService) getCachedConfig() *models.WebhookConfig { + s.configCache.mu.RLock() + if s.configCache.cfg != nil && time.Since(s.configCache.fetchedAt) < webhookConfigCacheTTL { + cfg := s.configCache.cfg + s.configCache.mu.RUnlock() + return cfg + } + s.configCache.mu.RUnlock() + + s.configCache.mu.Lock() + defer s.configCache.mu.Unlock() + // Re-check after acquiring write lock (another goroutine may have populated it). + if s.configCache.cfg != nil && time.Since(s.configCache.fetchedAt) < webhookConfigCacheTTL { + return s.configCache.cfg + } + if cfg, err := models.GetWebhookConfig(s.db); err == nil { + s.configCache.cfg = cfg + s.configCache.fetchedAt = time.Now() + return cfg + } + return s.configCache.cfg // Return stale value on error rather than nil +} + +// invalidateConfigCache clears the cached config, forcing the next call to +// getCachedConfig to re-read from the database. +func (s *WebhookService) invalidateConfigCache() { + s.configCache.mu.Lock() + s.configCache.cfg = nil + s.configCache.mu.Unlock() +} + +func (s *WebhookService) Start(ctx context.Context) { + // Reset any events left in_flight from a previous crash back to pending. + s.db.Model(&models.WebhookEvent{}). + Where("status = ?", "in_flight"). + Updates(map[string]interface{}{"status": models.DeliveryStatusPending}) + + for i := 0; i < s.cfg.Workers; i++ { + go s.worker(ctx) + } + go s.poller(ctx) +} + +func (s *WebhookService) Stop() { + close(s.stopCh) +} + +// poller periodically scans WebhookEvent rows whose next_run_at is due and +// pushes their IDs to the worker queue. It also recovers any events that were +// left in-flight if the process previously crashed (status=pending but +// next_run_at is old enough to be considered stale — beyond the longest +// possible delivery timeout). +func (s *WebhookService) poller(ctx context.Context) { + ticker := time.NewTicker(s.cfg.PollInterval) + pruneTicker := time.NewTicker(s.cfg.LogPruneInterval) + defer ticker.Stop() + defer pruneTicker.Stop() + for { + select { + case <-ticker.C: + s.pollDueEvents() + case <-pruneTicker.C: + s.pruneOldDeliveryLogs() + case <-s.stopCh: + return + case <-ctx.Done(): + return + } + } +} + +func (s *WebhookService) pruneOldDeliveryLogs() { + dbCfg := s.getCachedConfig() + if dbCfg == nil || dbCfg.LogRetentionDays <= 0 { + return + } + cutoff := time.Now().AddDate(0, 0, -dbCfg.LogRetentionDays) + result := s.db.Where("created_at < ?", cutoff).Delete(&models.WebhookDeliveryLog{}) + if result.Error != nil { + logger.Warnf("webhook: delivery log pruning failed: %v", result.Error) + return + } + if result.RowsAffected > 0 { + logger.Infof("webhook: pruned %d delivery log(s) older than %d days", result.RowsAffected, dbCfg.LogRetentionDays) + } +} + +func (s *WebhookService) pollDueEvents() { + var events []models.WebhookEvent + now := time.Now() + if err := s.db. + Where("status = ? AND next_run_at <= ?", models.DeliveryStatusPending, now). + Order("next_run_at ASC"). + Limit(s.cfg.QueueSize). + Find(&events).Error; err != nil { + logger.Warnf("webhook: poll error: %v", err) + return + } + for _, ev := range events { + select { + case s.queue <- ev.ID: + default: + // Queue full; will pick up on next poll cycle + } + } +} + +func (s *WebhookService) worker(ctx context.Context) { + for { + select { + case eventID := <-s.queue: + s.processEvent(ctx, eventID) + case <-s.stopCh: + return + case <-ctx.Done(): + return + } + } +} + +func (s *WebhookService) processEvent(ctx context.Context, eventID uint) { + // Atomically claim the event: only proceed if status is still "pending". + // This prevents two workers from processing the same event concurrently. + result := s.db.Model(&models.WebhookEvent{}). + Where("id = ? AND status = ?", eventID, models.DeliveryStatusPending). + Update("status", "in_flight") + if result.Error != nil { + logger.Errorf("webhook: failed to claim event %d: %v", eventID, result.Error) + return + } + if result.RowsAffected == 0 { + // Another worker claimed it first + return + } + + var ev models.WebhookEvent + if err := s.db.First(&ev, eventID).Error; err != nil { + logger.Errorf("webhook: event %d not found after claim: %v", eventID, err) + return + } + + var sub models.WebhookSubscription + if err := s.db.First(&sub, ev.SubscriptionID).Error; err != nil { + logger.Errorf("webhook: subscription %d not found for event %d: %v", ev.SubscriptionID, eventID, err) + s.db.Delete(&ev) + return + } + + policy := s.resolveRetryPolicy(sub) + + body := []byte(ev.Payload) + + // SSRF check at delivery time (defence-in-depth; URL also checked at write time) + if err := validateWebhookURLInternal(sub.URL, s.cfg.AllowInternalNetwork); err != nil { + log := models.WebhookDeliveryLog{ + SubscriptionID: sub.ID, + EventTopic: ev.EventTopic, + EventID: ev.EventID, + Payload: string(body), + AttemptNumber: ev.AttemptNumber, + Status: models.DeliveryStatusFailed, + ErrorMessage: "SSRF: " + err.Error(), + AttemptedAt: time.Now(), + } + s.db.Create(&log) + s.db.Model(&ev).Update("status", "exhausted") + return + } + + deliveryTime := time.Now() + sig := computeHMAC(body, sub.Secret, deliveryTime) + + client, err := buildHTTPClient(sub.TransportConfig, policy.httpTimeout) + if err != nil { + log := models.WebhookDeliveryLog{ + SubscriptionID: sub.ID, + EventTopic: ev.EventTopic, + EventID: ev.EventID, + Payload: string(body), + AttemptNumber: ev.AttemptNumber, + Status: models.DeliveryStatusFailed, + ErrorMessage: err.Error(), + AttemptedAt: deliveryTime, + } + s.db.Create(&log) + s.db.Model(&ev).Update("status", "exhausted") + return + } + + deliveryCtx, cancel := context.WithTimeout(ctx, policy.httpTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(deliveryCtx, http.MethodPost, sub.URL, strings.NewReader(string(body))) + if err != nil { + logger.Errorf("webhook: failed to build request for sub %d: %v", sub.ID, err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Tyk-Timestamp", fmt.Sprintf("%d", deliveryTime.Unix())) + if sig != "" { + req.Header.Set("X-Tyk-Signature", "sha256="+sig) + } + req.Header.Set("X-Tyk-Event-Topic", ev.EventTopic) + req.Header.Set("X-Tyk-Event-ID", ev.EventID) + req.Header.Set("X-Tyk-Delivery-Attempt", fmt.Sprintf("%d", ev.AttemptNumber)) + + maxRespBody := s.cfg.MaxResponseBodyBytes + if dbCfg := s.getCachedConfig(); dbCfg != nil && dbCfg.MaxResponseBodyBytes > 0 { + maxRespBody = dbCfg.MaxResponseBodyBytes + } + + now := time.Now() + resp, httpErr := client.Do(req) + + logEntry := models.WebhookDeliveryLog{ + SubscriptionID: sub.ID, + EventTopic: ev.EventTopic, + EventID: ev.EventID, + Payload: string(body), + AttemptNumber: ev.AttemptNumber, + AttemptedAt: now, + } + + success := false + if httpErr != nil { + logEntry.Status = models.DeliveryStatusFailed + logEntry.ErrorMessage = httpErr.Error() + } else { + defer resp.Body.Close() + respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, int64(maxRespBody))) + logEntry.HTTPStatusCode = resp.StatusCode + logEntry.ResponseBody = string(respBytes) + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + logEntry.Status = models.DeliveryStatusSuccess + success = true + } else { + logEntry.Status = models.DeliveryStatusFailed + logEntry.ErrorMessage = fmt.Sprintf("HTTP %d", resp.StatusCode) + } + } + + if err := s.db.Create(&logEntry).Error; err != nil { + logger.Errorf("webhook: failed to persist delivery log for sub %d: %v", sub.ID, err) + } + + if success { + s.db.Model(&ev).Update("status", "delivered") + return + } + + nextAttempt := ev.AttemptNumber + 1 + if nextAttempt > policy.maxAttempts { + s.db.Model(&ev).Update("status", "exhausted") + return + } + + backoffIdx := ev.AttemptNumber - 1 + var delay time.Duration + if backoffIdx < len(policy.backoff) { + delay = policy.backoff[backoffIdx] + } else if len(policy.backoff) > 0 { + delay = policy.backoff[len(policy.backoff)-1] + } + + nextRun := now.Add(delay) + logEntry.NextRetryAt = &nextRun + s.db.Model(&logEntry).Update("next_retry_at", &nextRun) + + s.db.Model(&ev).Updates(map[string]interface{}{ + "status": models.DeliveryStatusPending, + "attempt_number": nextAttempt, + "next_run_at": nextRun, + }) +} + +func (s *WebhookService) resolveRetryPolicy(sub models.WebhookSubscription) resolvedPolicy { + policy := resolvedPolicy{ + maxAttempts: s.cfg.DefaultMaxAttempts, + backoff: s.cfg.DefaultBackoff, + httpTimeout: s.cfg.DefaultHTTPTimeout, + } + + if dbCfg := s.getCachedConfig(); dbCfg != nil { + if dbCfg.DefaultRetryPolicy.MaxAttempts > 0 { + policy.maxAttempts = dbCfg.DefaultRetryPolicy.MaxAttempts + } + if len(dbCfg.DefaultRetryPolicy.BackoffSeconds) > 0 { + policy.backoff = secondsToDurations(dbCfg.DefaultRetryPolicy.BackoffSeconds) + } + if dbCfg.DefaultRetryPolicy.TimeoutSeconds > 0 { + policy.httpTimeout = time.Duration(dbCfg.DefaultRetryPolicy.TimeoutSeconds) * time.Second + } + } + + if sub.RetryPolicy.MaxAttempts > 0 { + policy.maxAttempts = sub.RetryPolicy.MaxAttempts + } + if len(sub.RetryPolicy.BackoffSeconds) > 0 { + policy.backoff = secondsToDurations(sub.RetryPolicy.BackoffSeconds) + } + if sub.RetryPolicy.TimeoutSeconds > 0 { + policy.httpTimeout = time.Duration(sub.RetryPolicy.TimeoutSeconds) * time.Second + } + + return policy +} + +func secondsToDurations(secs []int) []time.Duration { + out := make([]time.Duration, len(secs)) + for i, s := range secs { + out[i] = time.Duration(s) * time.Second + } + return out +} + +// HandleEvent is called by the event bus. It persists a WebhookEvent row for +// each matching subscription so delivery survives process restarts. +func (s *WebhookService) HandleEvent(ev eventbridge.Event) { + subs, err := s.findMatchingSubscriptions(ev.Topic) + if err != nil { + logger.Warnf("webhook: failed to query subscriptions for topic %s: %v", ev.Topic, err) + return + } + + body, err := json.Marshal(ev) + if err != nil { + logger.Errorf("webhook: failed to marshal event %s: %v", ev.ID, err) + return + } + + now := time.Now() + rows := make([]models.WebhookEvent, len(subs)) + for i, sub := range subs { + rows[i] = models.WebhookEvent{ + SubscriptionID: sub.ID, + EventTopic: ev.Topic, + EventID: ev.ID, + Payload: string(body), + AttemptNumber: 1, + Status: models.DeliveryStatusPending, + NextRunAt: now, + } + } + if len(rows) == 0 { + return + } + if err := s.db.Create(&rows).Error; err != nil { + logger.Errorf("webhook: failed to persist events for topic %s: %v", ev.Topic, err) + return + } + for _, row := range rows { + select { + case s.queue <- row.ID: + default: + // Poller will pick it up + } + } +} + +func (s *WebhookService) findMatchingSubscriptions(topic string) ([]models.WebhookSubscription, error) { + var subs []models.WebhookSubscription + err := s.db. + Joins("JOIN webhook_topics ON webhook_topics.subscription_id = webhook_subscriptions.id"). + Where("webhook_subscriptions.enabled = ? AND webhook_topics.topic = ?", true, topic). + Preload("Topics"). + Find(&subs).Error + return subs, err +} + +// RetryDelivery re-enqueues the original payload from a delivery log as a new +// WebhookEvent with attempt=1 for immediate delivery. subscriptionID is the +// parent subscription from the URL path and must match the log's own +// SubscriptionID. actorID is the user ID of the person who triggered the retry +// (0 if system-initiated). +func (s *WebhookService) RetryDelivery(subscriptionID, logID, actorID uint) error { + var deliveryLog models.WebhookDeliveryLog + if err := s.db.First(&deliveryLog, logID).Error; err != nil { + return fmt.Errorf("delivery log not found: %w", err) + } + if deliveryLog.SubscriptionID != subscriptionID { + return fmt.Errorf("delivery log %d does not belong to subscription %d", logID, subscriptionID) + } + sub, err := s.GetWebhook(deliveryLog.SubscriptionID) + if err != nil { + return fmt.Errorf("subscription not found: %w", err) + } + + row := models.WebhookEvent{ + SubscriptionID: sub.ID, + EventTopic: deliveryLog.EventTopic, + EventID: deliveryLog.EventID, + Payload: deliveryLog.Payload, + AttemptNumber: 1, + Status: models.DeliveryStatusPending, + NextRunAt: time.Now(), + TriggeredBy: actorID, + } + if err := s.db.Create(&row).Error; err != nil { + return fmt.Errorf("failed to enqueue retry: %w", err) + } + + select { + case s.queue <- row.ID: + default: + // Poller will pick it up + } + return nil +} + +func validateWebhookURLInternal(rawURL string, allowInternal bool) error { + if rawURL == "" { + return fmt.Errorf("webhook URL must not be empty") + } + if allowInternal { + return nil + } + + parsed, err := neturl.Parse(rawURL) + if err != nil { + return fmt.Errorf("invalid URL: %w", err) + } + if parsed.Scheme != "http" && parsed.Scheme != "https" { + return fmt.Errorf("URL scheme must be http or https") + } + + hostname := parsed.Hostname() + ips, err := net.LookupIP(hostname) + if err != nil { + return fmt.Errorf("cannot resolve hostname %q: %w", hostname, err) + } + for _, ip := range ips { + if isWebhookPrivateIP(ip) { + return fmt.Errorf("URL %q resolves to private/internal address %s", rawURL, ip) + } + } + return nil +} + +var webhookPrivateCIDRs = func() []*net.IPNet { + ranges := []string{ + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "127.0.0.0/8", + "169.254.0.0/16", + "::1/128", + "fc00::/7", + } + nets := make([]*net.IPNet, 0, len(ranges)) + for _, cidr := range ranges { + _, n, _ := net.ParseCIDR(cidr) + nets = append(nets, n) + } + return nets +}() + +func isWebhookPrivateIP(ip net.IP) bool { + for _, n := range webhookPrivateCIDRs { + if n.Contains(ip) { + return true + } + } + return false +} + +// computeHMAC signs "timestamp.body" with the subscription secret so the +// signature is bound to a specific delivery time. This prevents replay attacks: +// even if an attacker captures a valid (timestamp, body, sig) tuple, re-sending +// it after the receiver's tolerance window (typically 5 minutes) will be rejected +// because the timestamp embedded in the signed material will be stale. +// +// The signature covers: strconv.FormatInt(ts.Unix(), 10) + "." + string(body) +// +// Receivers verify by: +// 1. Parsing X-Tyk-Timestamp as a Unix epoch integer +// 2. Rejecting if abs(now - ts) > tolerance +// 3. Recomputing HMAC-SHA256 over "ts.body" with their copy of the secret +// 4. Comparing with X-Tyk-Signature using constant-time equality +func computeHMAC(body []byte, secret string, ts time.Time) string { + if secret == "" { + return "" + } + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write([]byte(fmt.Sprintf("%d.%s", ts.Unix(), body))) + return hex.EncodeToString(mac.Sum(nil)) +} + +// buildHTTPClient constructs a per-subscription http.Client that applies the +// subscription's TransportConfig (proxy, TLS settings) on top of the service's +// default timeout. Returns an error only if the TLS configuration is invalid. +func buildHTTPClient(tc models.WebhookTransportConfig, timeout time.Duration) (*http.Client, error) { + tlsCfg := &tls.Config{ + InsecureSkipVerify: tc.InsecureSkipVerify, //nolint:gosec // user-controlled per subscription + } + + if tc.TLSCACert != "" { + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM([]byte(tc.TLSCACert)) { + return nil, fmt.Errorf("webhook: invalid TLS CA certificate") + } + tlsCfg.RootCAs = pool + } + + if tc.TLSClientCert != "" || tc.TLSClientKey != "" { + cert, err := tls.X509KeyPair([]byte(tc.TLSClientCert), []byte(tc.TLSClientKey)) + if err != nil { + return nil, fmt.Errorf("webhook: invalid TLS client cert/key: %w", err) + } + tlsCfg.Certificates = []tls.Certificate{cert} + } + + transport := &http.Transport{ + TLSClientConfig: tlsCfg, + } + + if tc.ProxyURL != "" { + proxyURL, err := neturl.Parse(tc.ProxyURL) + if err != nil { + return nil, fmt.Errorf("webhook: invalid proxy URL: %w", err) + } + transport.Proxy = http.ProxyURL(proxyURL) + } + + return &http.Client{ + Transport: transport, + Timeout: timeout, + }, nil +} + +// ValidateURL checks whether rawURL is safe to use as a webhook target, +// respecting the service's AllowInternalNetwork setting. +func (s *WebhookService) ValidateURL(rawURL string) error { + return validateWebhookURLInternal(rawURL, s.cfg.AllowInternalNetwork) +} + +// ValidateTopics returns an error if any topic in the list is not a known subscribable topic. +func ValidateTopics(topics []string) error { + known := make(map[string]struct{}, len(KnownWebhookTopics)) + for _, t := range KnownWebhookTopics { + known[t] = struct{}{} + } + for _, t := range topics { + if _, ok := known[t]; !ok { + return fmt.Errorf("unknown event topic %q", t) + } + } + return nil +} + +func (s *WebhookService) CreateWebhook(sub *models.WebhookSubscription) error { + return s.db.Create(sub).Error +} + +func (s *WebhookService) GetWebhook(id uint) (*models.WebhookSubscription, error) { + var sub models.WebhookSubscription + if err := s.db.Preload("Topics").First(&sub, id).Error; err != nil { + return nil, err + } + return &sub, nil +} + +func (s *WebhookService) ListWebhooks() ([]models.WebhookSubscription, error) { + var subs []models.WebhookSubscription + if err := s.db.Preload("Topics").Find(&subs).Error; err != nil { + return nil, err + } + return subs, nil +} + +func (s *WebhookService) UpdateWebhook(sub *models.WebhookSubscription) error { + return s.db.Transaction(func(tx *gorm.DB) error { + if err := tx.Save(sub).Error; err != nil { + return err + } + if err := tx.Where("subscription_id = ?", sub.ID).Delete(&models.WebhookTopic{}).Error; err != nil { + return err + } + for i := range sub.Topics { + sub.Topics[i].SubscriptionID = sub.ID + sub.Topics[i].ID = 0 + } + if len(sub.Topics) > 0 { + return tx.Create(&sub.Topics).Error + } + return nil + }) +} + +func (s *WebhookService) DeleteWebhook(id uint) error { + return s.db.Delete(&models.WebhookSubscription{}, id).Error +} + +func (s *WebhookService) ListDeliveryLogs(subscriptionID uint, pageSize, pageNumber int) ([]models.WebhookDeliveryLog, int64, int, error) { + var totalCount int64 + query := s.db.Model(&models.WebhookDeliveryLog{}).Where("subscription_id = ?", subscriptionID) + if err := query.Count(&totalCount).Error; err != nil { + return nil, 0, 0, err + } + + totalPages := 1 + if pageSize > 0 && totalCount > 0 { + totalPages = int(totalCount) / pageSize + if int(totalCount)%pageSize != 0 { + totalPages++ + } + } + + var logs []models.WebhookDeliveryLog + q := s.db.Where("subscription_id = ?", subscriptionID).Order("created_at DESC") + if pageSize > 0 { + q = q.Limit(pageSize).Offset((pageNumber - 1) * pageSize) + } + if err := q.Find(&logs).Error; err != nil { + return nil, 0, 0, err + } + return logs, totalCount, totalPages, nil +} + +func (s *WebhookService) GetWebhookConfig() (*models.WebhookConfig, error) { + return models.GetWebhookConfig(s.db) +} + +func (s *WebhookService) UpdateWebhookConfig(cfg *models.WebhookConfig) error { + if err := models.UpdateWebhookConfig(s.db, cfg); err != nil { + return err + } + s.invalidateConfigCache() + return nil +} + +// TestWebhook fires a synchronous test delivery. No delivery log or queue row is persisted. +func (s *WebhookService) TestWebhook(sub *models.WebhookSubscription) error { + payload, _ := json.Marshal(map[string]string{ + "message": "This is a test webhook delivery from Tyk AI Studio.", + }) + + event := eventbridge.Event{ + ID: fmt.Sprintf("test-%d", time.Now().UnixNano()), + Topic: "system.webhook.test", + Origin: "control", + Dir: eventbridge.DirLocal, + Payload: payload, + } + + body, _ := json.Marshal(event) + deliveryTime := time.Now() + sig := computeHMAC(body, sub.Secret, deliveryTime) + + policy := s.resolveRetryPolicy(*sub) + + client, err := buildHTTPClient(sub.TransportConfig, policy.httpTimeout) + if err != nil { + return fmt.Errorf("failed to build HTTP client: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), policy.httpTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, sub.URL, strings.NewReader(string(body))) + if err != nil { + return fmt.Errorf("failed to build test request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Tyk-Timestamp", fmt.Sprintf("%d", deliveryTime.Unix())) + if sig != "" { + req.Header.Set("X-Tyk-Signature", "sha256="+sig) + } + req.Header.Set("X-Tyk-Event-Topic", event.Topic) + req.Header.Set("X-Tyk-Event-ID", event.ID) + req.Header.Set("X-Tyk-Delivery-Attempt", "1") + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("test webhook delivery failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("test webhook returned HTTP %d", resp.StatusCode) + } + return nil +} diff --git a/services/webhook_service_test.go b/services/webhook_service_test.go new file mode 100644 index 000000000..75fcb4d96 --- /dev/null +++ b/services/webhook_service_test.go @@ -0,0 +1,689 @@ +package services + +import ( + "net/http" + "net/http/httptest" + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/TykTechnologies/midsommar/v2/models" + "github.com/TykTechnologies/midsommar/v2/pkg/eventbridge" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +func setupWebhookDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + if err != nil { + t.Fatalf("open db: %v", err) + } + if err := db.AutoMigrate( + &models.WebhookSubscription{}, + &models.WebhookTopic{}, + &models.WebhookEvent{}, + &models.WebhookDeliveryLog{}, + &models.WebhookConfig{}, + ); err != nil { + t.Fatalf("migrate: %v", err) + } + return db +} + +func setupService(t *testing.T) *WebhookService { + t.Helper() + cfg := DefaultWebhookServiceConfig() + cfg.AllowInternalNetwork = true // httptest servers use 127.0.0.1 + return NewWebhookService(setupWebhookDB(t), cfg) +} + +// setupFastService creates a service with short backoff for retry tests. +// backoffMs is the delay between attempts in milliseconds. +func setupFastService(t *testing.T, maxAttempts, backoffMs int) *WebhookService { + t.Helper() + backoff := make([]time.Duration, maxAttempts) + for i := range backoff { + backoff[i] = time.Duration(backoffMs) * time.Millisecond + } + cfg := WebhookServiceConfig{ + Workers: 2, + QueueSize: 64, + DefaultMaxAttempts: maxAttempts, + DefaultBackoff: backoff, + DefaultHTTPTimeout: 5 * time.Second, + MaxResponseBodyBytes: 4096, + PollInterval: 20 * time.Millisecond, + AllowInternalNetwork: true, // httptest servers use 127.0.0.1 + } + return NewWebhookService(setupWebhookDB(t), cfg) +} + +func createSub(t *testing.T, svc *WebhookService, url string, topics []string) *models.WebhookSubscription { + t.Helper() + sub := &models.WebhookSubscription{ + Name: "test", + URL: url, + Enabled: true, + Topics: make([]models.WebhookTopic, len(topics)), + } + for i, t := range topics { + sub.Topics[i] = models.WebhookTopic{Topic: t} + } + if err := svc.CreateWebhook(sub); err != nil { + t.Fatalf("create webhook: %v", err) + } + return sub +} + +func waitForEvent(t *testing.T, svc *WebhookService, subID uint, status string, timeout time.Duration) models.WebhookEvent { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + var ev models.WebhookEvent + err := svc.db.Where("subscription_id = ? AND status = ?", subID, status).First(&ev).Error + if err == nil { + return ev + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("timeout waiting for event status=%s for sub %d", status, subID) + return models.WebhookEvent{} +} + +func TestValidateTopics_Known(t *testing.T) { + if err := ValidateTopics([]string{TopicLLMCreated, TopicAppApproved}); err != nil { + t.Fatalf("known topics should be valid: %v", err) + } +} + +func TestValidateTopics_Unknown(t *testing.T) { + if err := ValidateTopics([]string{"custom.made.up.topic"}); err == nil { + t.Fatal("unknown topic should fail validation") + } +} + +func TestValidateTopics_Empty(t *testing.T) { + if err := ValidateTopics([]string{}); err != nil { + t.Fatalf("empty topics should be valid: %v", err) + } +} + +func TestTopicMatching_Exact(t *testing.T) { + svc := setupService(t) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer ts.Close() + + createSub(t, svc, ts.URL, []string{TopicLLMCreated}) + + matched, err := svc.findMatchingSubscriptions(TopicLLMCreated) + if err != nil || len(matched) != 1 { + t.Fatalf("expected 1 match for exact topic, got %d (err=%v)", len(matched), err) + } + + notMatched, err := svc.findMatchingSubscriptions(TopicLLMDeleted) + if err != nil || len(notMatched) != 0 { + t.Fatalf("expected 0 matches for different topic, got %d (err=%v)", len(notMatched), err) + } +} + +func TestComputeHMAC(t *testing.T) { + body := []byte(`{"hello":"world"}`) + ts := time.Unix(1700000000, 0) + sig := computeHMAC(body, "secret", ts) + if sig == "" { + t.Fatal("expected non-empty signature") + } + // Same inputs must produce the same output. + if computeHMAC(body, "secret", ts) != sig { + t.Fatal("HMAC must be deterministic for same inputs") + } + // Different timestamp must produce a different signature. + if computeHMAC(body, "secret", ts.Add(time.Second)) == sig { + t.Fatal("HMAC must differ when timestamp differs") + } +} + +func TestComputeHMAC_EmptySecret(t *testing.T) { + if computeHMAC([]byte("data"), "", time.Now()) != "" { + t.Fatal("empty secret should return empty string") + } +} + +func TestDefaultBackoffDuration(t *testing.T) { + cfg := DefaultWebhookServiceConfig() + expected := []time.Duration{10 * time.Second, 30 * time.Second, 2 * time.Minute, 10 * time.Minute, 30 * time.Minute} + if len(cfg.DefaultBackoff) != len(expected) { + t.Fatalf("expected %d backoff entries, got %d", len(expected), len(cfg.DefaultBackoff)) + } + for i, d := range expected { + if cfg.DefaultBackoff[i] != d { + t.Errorf("backoff[%d]: want %v, got %v", i, d, cfg.DefaultBackoff[i]) + } + } +} + +func setupSSRFService(t *testing.T) *WebhookService { + t.Helper() + cfg := DefaultWebhookServiceConfig() + cfg.AllowInternalNetwork = false + return NewWebhookService(setupWebhookDB(t), cfg) +} + +func TestValidateWebhookURL_Empty(t *testing.T) { + svc := setupSSRFService(t) + if err := svc.ValidateURL(""); err == nil { + t.Fatal("empty URL should fail") + } +} + +func TestValidateWebhookURL_BadScheme(t *testing.T) { + svc := setupSSRFService(t) + if err := svc.ValidateURL("ftp://example.com/hook"); err == nil { + t.Fatal("ftp scheme should fail") + } +} + +func TestValidateWebhookURL_PrivateLoopback(t *testing.T) { + svc := setupSSRFService(t) + if err := svc.ValidateURL("http://127.0.0.1/hook"); err == nil { + t.Fatal("loopback URL should fail SSRF check") + } +} + +func TestValidateWebhookURL_AllowInternal(t *testing.T) { + cfg := DefaultWebhookServiceConfig() + cfg.AllowInternalNetwork = true + svc := NewWebhookService(setupWebhookDB(t), cfg) + if err := svc.ValidateURL("http://127.0.0.1/hook"); err != nil { + t.Fatalf("should be allowed with AllowInternalNetwork=true: %v", err) + } +} + +func TestHandleEvent_PersistsQueue(t *testing.T) { + svc := setupFastService(t, 1, 50) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + createSub(t, svc, ts.URL, []string{"system.llm.created"}) + + ev := eventbridge.Event{ID: "evt-1", Topic: "system.llm.created", Origin: "control"} + svc.HandleEvent(ev) + + var events []models.WebhookEvent + if err := svc.db.Find(&events).Error; err != nil { + t.Fatalf("db find: %v", err) + } + if len(events) != 1 { + t.Fatalf("expected 1 WebhookEvent row, got %d", len(events)) + } + if events[0].Status != models.DeliveryStatusPending && events[0].Status != "delivered" { + t.Errorf("unexpected initial status: %s", events[0].Status) + } +} + +func TestDelivery_Success(t *testing.T) { + svc := setupFastService(t, 3, 30) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + + ctx := t.Context() + svc.Start(ctx) + defer svc.Stop() + + ev := eventbridge.Event{ID: "evt-ok", Topic: "system.llm.created", Origin: "control"} + svc.HandleEvent(ev) + + delivered := waitForEvent(t, svc, sub.ID, "delivered", 10*time.Second) + if delivered.Status != "delivered" { + t.Errorf("expected delivered, got %s", delivered.Status) + } + + var logs []models.WebhookDeliveryLog + svc.db.Where("subscription_id = ?", sub.ID).Find(&logs) + if len(logs) != 1 { + t.Fatalf("expected 1 delivery log, got %d", len(logs)) + } + if logs[0].Status != models.DeliveryStatusSuccess { + t.Errorf("log status: want success, got %s", logs[0].Status) + } +} + +func TestDelivery_Failure(t *testing.T) { + svc := setupFastService(t, 1, 30) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + + ctx := t.Context() + svc.Start(ctx) + defer svc.Stop() + + ev := eventbridge.Event{ID: "evt-fail", Topic: "system.llm.created", Origin: "control"} + svc.HandleEvent(ev) + + waitForEvent(t, svc, sub.ID, "exhausted", 2*time.Second) + + var logs []models.WebhookDeliveryLog + svc.db.Where("subscription_id = ?", sub.ID).Find(&logs) + if len(logs) != 1 { + t.Fatalf("expected 1 delivery log, got %d", len(logs)) + } + if logs[0].Status != models.DeliveryStatusFailed { + t.Errorf("log status: want failed, got %s", logs[0].Status) + } +} + +func TestDelivery_Retry(t *testing.T) { + var callCount int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&callCount, 1) + if n == 1 { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + })) + defer ts.Close() + + svc := setupFastService(t, 3, 30) + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + + ctx := t.Context() + svc.Start(ctx) + defer svc.Stop() + + ev := eventbridge.Event{ID: "evt-retry", Topic: "system.llm.created", Origin: "control"} + svc.HandleEvent(ev) + + waitForEvent(t, svc, sub.ID, "delivered", 5*time.Second) + + var logs []models.WebhookDeliveryLog + svc.db.Where("subscription_id = ?", sub.ID).Find(&logs) + if len(logs) < 2 { + t.Fatalf("expected at least 2 delivery log entries, got %d", len(logs)) + } + last := logs[len(logs)-1] + if last.Status != models.DeliveryStatusSuccess { + t.Errorf("last log status: want success, got %s", last.Status) + } +} + +func TestDelivery_MaxRetries(t *testing.T) { + maxAttempts := 3 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer ts.Close() + + svc := setupFastService(t, maxAttempts, 30) + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + + ctx := t.Context() + svc.Start(ctx) + defer svc.Stop() + + ev := eventbridge.Event{ID: "evt-max", Topic: "system.llm.created", Origin: "control"} + svc.HandleEvent(ev) + + waitForEvent(t, svc, sub.ID, "exhausted", 5*time.Second) + + var logs []models.WebhookDeliveryLog + svc.db.Where("subscription_id = ?", sub.ID).Find(&logs) + if len(logs) != maxAttempts { + t.Fatalf("expected %d delivery log entries, got %d", maxAttempts, len(logs)) + } + for _, l := range logs { + if l.Status != models.DeliveryStatusFailed { + t.Errorf("log status: want failed, got %s", l.Status) + } + } +} + +func TestTestWebhook_Success(t *testing.T) { + svc := setupService(t) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + if err := svc.TestWebhook(sub); err != nil { + t.Fatalf("TestWebhook: %v", err) + } +} + +func TestTestWebhook_Failure(t *testing.T) { + svc := setupService(t) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusForbidden) + })) + defer ts.Close() + + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + if err := svc.TestWebhook(sub); err == nil { + t.Fatal("expected error for non-2xx response") + } +} + +func TestWebhookCRUD(t *testing.T) { + svc := setupService(t) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer ts.Close() + + sub := &models.WebhookSubscription{ + Name: "crud-test", + URL: ts.URL, + Topics: []models.WebhookTopic{{Topic: "system.llm.created"}}, + Enabled: true, + } + if err := svc.CreateWebhook(sub); err != nil { + t.Fatalf("create: %v", err) + } + if sub.ID == 0 { + t.Fatal("expected non-zero ID after create") + } + + got, err := svc.GetWebhook(sub.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.Name != "crud-test" { + t.Errorf("name: want crud-test, got %s", got.Name) + } + + got.Name = "updated" + if err := svc.UpdateWebhook(got); err != nil { + t.Fatalf("update: %v", err) + } + got2, _ := svc.GetWebhook(sub.ID) + if got2.Name != "updated" { + t.Errorf("after update name: want updated, got %s", got2.Name) + } + + all, err := svc.ListWebhooks() + if err != nil { + t.Fatalf("list: %v", err) + } + if len(all) != 1 { + t.Fatalf("expected 1 webhook, got %d", len(all)) + } + + if err := svc.DeleteWebhook(sub.ID); err != nil { + t.Fatalf("delete: %v", err) + } + if _, err := svc.GetWebhook(sub.ID); err == nil { + t.Fatal("expected error after delete") + } +} + +func TestRetryPolicy_Resolution_StaticDefaults(t *testing.T) { + svc := setupService(t) + sub := models.WebhookSubscription{} + policy := svc.resolveRetryPolicy(sub) + if policy.maxAttempts != svc.cfg.DefaultMaxAttempts { + t.Errorf("maxAttempts: want %d, got %d", svc.cfg.DefaultMaxAttempts, policy.maxAttempts) + } + if policy.httpTimeout != svc.cfg.DefaultHTTPTimeout { + t.Errorf("httpTimeout: want %v, got %v", svc.cfg.DefaultHTTPTimeout, policy.httpTimeout) + } +} + +func TestRetryPolicy_Resolution_DBGlobal(t *testing.T) { + svc := setupService(t) + + cfg := &models.WebhookConfig{ + ID: models.WebhookConfigSingletonID, + DefaultRetryPolicy: models.WebhookRetryPolicy{ + MaxAttempts: 7, + TimeoutSeconds: 30, + }, + } + if err := svc.UpdateWebhookConfig(cfg); err != nil { + t.Fatalf("update config: %v", err) + } + + sub := models.WebhookSubscription{} + policy := svc.resolveRetryPolicy(sub) + if policy.maxAttempts != 7 { + t.Errorf("maxAttempts: want 7, got %d", policy.maxAttempts) + } + if policy.httpTimeout != 30*time.Second { + t.Errorf("httpTimeout: want 30s, got %v", policy.httpTimeout) + } +} + +func TestRetryPolicy_Resolution_PerSubscriptionOverride(t *testing.T) { + svc := setupService(t) + + cfg := &models.WebhookConfig{ + ID: models.WebhookConfigSingletonID, + DefaultRetryPolicy: models.WebhookRetryPolicy{ + MaxAttempts: 7, + }, + } + svc.UpdateWebhookConfig(cfg) + + sub := models.WebhookSubscription{ + RetryPolicy: models.WebhookRetryPolicy{ + MaxAttempts: 2, + TimeoutSeconds: 5, + }, + } + policy := svc.resolveRetryPolicy(sub) + if policy.maxAttempts != 2 { + t.Errorf("maxAttempts: want 2 (sub override), got %d", policy.maxAttempts) + } + if policy.httpTimeout != 5*time.Second { + t.Errorf("httpTimeout: want 5s, got %v", policy.httpTimeout) + } +} + +func TestRetryDelivery(t *testing.T) { + svc := setupFastService(t, 3, 30) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + sub := createSub(t, svc, ts.URL, []string{"system.llm.created"}) + + log := &models.WebhookDeliveryLog{ + SubscriptionID: sub.ID, + EventTopic: "system.llm.created", + EventID: "evt-retry-manual", + Payload: `{"topic":"system.llm.created"}`, + AttemptNumber: 1, + Status: models.DeliveryStatusFailed, + AttemptedAt: time.Now(), + } + if err := svc.db.Create(log).Error; err != nil { + t.Fatalf("create log: %v", err) + } + + ctx := t.Context() + svc.Start(ctx) + defer svc.Stop() + + if err := svc.RetryDelivery(sub.ID, log.ID, 0); err != nil { + t.Fatalf("RetryDelivery: %v", err) + } + + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + var ev models.WebhookEvent + if svc.db.Where("subscription_id = ? AND event_id = ? AND status = ?", sub.ID, "evt-retry-manual", "delivered").First(&ev).Error == nil { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatal("timeout waiting for retried delivery to succeed") +} + +func TestRetryDelivery_NotFound(t *testing.T) { + svc := setupService(t) + if err := svc.RetryDelivery(1, 99999, 0); err == nil { + t.Fatal("expected error for unknown log ID") + } +} + +func TestWebhookConfig_DefaultsAndUpdate(t *testing.T) { + svc := setupService(t) + + cfg, err := svc.GetWebhookConfig() + if err != nil { + t.Fatalf("get config: %v", err) + } + if cfg.ID != models.WebhookConfigSingletonID { + t.Errorf("singleton ID: want %d, got %d", models.WebhookConfigSingletonID, cfg.ID) + } + + cfg.DefaultRetryPolicy = models.WebhookRetryPolicy{ + MaxAttempts: 3, + BackoffSeconds: []int{5, 15, 60}, + } + if err := svc.UpdateWebhookConfig(cfg); err != nil { + t.Fatalf("update config: %v", err) + } + + got, err := svc.GetWebhookConfig() + if err != nil { + t.Fatalf("get config after update: %v", err) + } + if got.DefaultRetryPolicy.MaxAttempts != 3 { + t.Errorf("max_attempts: want 3, got %d", got.DefaultRetryPolicy.MaxAttempts) + } + if !reflect.DeepEqual(got.DefaultRetryPolicy.BackoffSeconds, []int{5, 15, 60}) { + t.Errorf("backoff: want [5 15 60], got %v", got.DefaultRetryPolicy.BackoffSeconds) + } +} + +func TestListWebhooks(t *testing.T) { + svc := setupService(t) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer ts.Close() + + svc.CreateWebhook(&models.WebhookSubscription{Name: "sub-a", URL: ts.URL, Topics: []models.WebhookTopic{{Topic: "system.llm.created"}}, Enabled: true}) + svc.CreateWebhook(&models.WebhookSubscription{Name: "sub-b", URL: ts.URL, Topics: []models.WebhookTopic{{Topic: "system.llm.created"}}, Enabled: true}) + + all, err := svc.ListWebhooks() + if err != nil { + t.Fatalf("list: %v", err) + } + if len(all) != 2 { + t.Errorf("want 2 webhooks, got %d", len(all)) + } +} + +func TestWebhookSubscription_SensitiveFieldEncryption(t *testing.T) { + // Enable encryption for this test. + t.Setenv("TYK_AI_SECRET_KEY", "test-encryption-key-for-webhooks") + + svc := setupService(t) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer ts.Close() + + sub := &models.WebhookSubscription{ + Name: "enc-test", + URL: ts.URL, + Secret: "hmac-signing-secret", + Enabled: true, + Topics: []models.WebhookTopic{{Topic: "system.llm.created"}}, + TransportConfig: models.WebhookTransportConfig{ + ProxyURL: "http://user:pass@proxy.example.com:3128", + }, + } + if err := svc.CreateWebhook(sub); err != nil { + t.Fatalf("create: %v", err) + } + + // Verify the values are encrypted at rest by reading with hooks disabled. + var raw models.WebhookSubscription + if err := svc.db.Session(&gorm.Session{SkipHooks: true}).First(&raw, sub.ID).Error; err != nil { + t.Fatalf("raw read: %v", err) + } + if raw.Secret == "hmac-signing-secret" { + t.Error("Secret stored in plaintext — expected encrypted value") + } + if raw.Secret == "" { + t.Error("Secret was not stored at all") + } + + // Verify GetWebhook returns plaintext values (AfterFind decrypts). + got, err := svc.GetWebhook(sub.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.Secret != "hmac-signing-secret" { + t.Errorf("Secret after GetWebhook: want %q, got %q", "hmac-signing-secret", got.Secret) + } + if got.TransportConfig.ProxyURL != "http://user:pass@proxy.example.com:3128" { + t.Errorf("ProxyURL after GetWebhook: want %q, got %q", + "http://user:pass@proxy.example.com:3128", got.TransportConfig.ProxyURL) + } +} + +func TestWebhookTransportConfig_Proxy(t *testing.T) { + svc := setupService(t) + + // Use an invalid proxy URL — buildHTTPClient should reject it. + sub := &models.WebhookSubscription{ + Name: "proxy-test", + URL: "http://example.com", + Enabled: true, + TransportConfig: models.WebhookTransportConfig{ + ProxyURL: "://bad-proxy", + }, + } + _, err := buildHTTPClient(sub.TransportConfig, 5*time.Second) + if err == nil { + t.Fatal("expected error for invalid proxy URL") + } + + // Valid proxy URL should succeed. + sub.TransportConfig.ProxyURL = "http://proxy.example.com:3128" + client, err := buildHTTPClient(sub.TransportConfig, 5*time.Second) + if err != nil { + t.Fatalf("unexpected error for valid proxy: %v", err) + } + if client == nil { + t.Fatal("expected non-nil client") + } + _ = svc +} + +func TestWebhookTransportConfig_InsecureSkipVerify(t *testing.T) { + tc := models.WebhookTransportConfig{InsecureSkipVerify: true} + client, err := buildHTTPClient(tc, 5*time.Second) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + transport, ok := client.Transport.(*http.Transport) + if !ok || !transport.TLSClientConfig.InsecureSkipVerify { + t.Fatal("expected InsecureSkipVerify=true on transport") + } +} + +func TestWebhookTransportConfig_InvalidCACert(t *testing.T) { + tc := models.WebhookTransportConfig{TLSCACert: "not-a-pem-cert"} + _, err := buildHTTPClient(tc, 5*time.Second) + if err == nil { + t.Fatal("expected error for invalid CA cert") + } +}