Skip to content

NATS JetStream publish source + fork image build#1

Open
michaeldoehler wants to merge 9 commits into
mainfrom
feat/publishmq-nats-jetstream
Open

NATS JetStream publish source + fork image build#1
michaeldoehler wants to merge 9 commits into
mainfrom
feat/publishmq-nats-jetstream

Conversation

@michaeldoehler
Copy link
Copy Markdown

Internal fork of hookdeck/outpost carrying the NATS JetStream publish-mq
source plus a GHCR image build, for running Outpost headless against our
per-tenant NATS accounts.

What's in here

  • NATS JetStream publish-mq driver with per-account multi-tenancy, an
    optional watched accounts directory for runtime add/remove, and
    account-config tenant_id stamping. (Same work proposed upstream as
    feat(publishmq): NATS JetStream source with per-account multi-tenancy hookdeck/outpost#910.)
  • ci: fork-image workflow building outpost + outpost-server (distroless,
    headless) and pushing to ghcr.io/revenexx/outpost on every push to this
    branch. Uses the built-in GITHUB_TOKEN.

Fork operations

  • All upstream workflows (release, SDK generation/publish, speakeasy,
    website deploy, unit tests, docs eval, claude) are disabled on this fork;
    only fork-image runs.
  • main tracks upstream hookdeck/outpost. This branch is rebased onto
    upstream to pull fixes; the patch is almost entirely additive (new files
    under internal/mqs, internal/util, cmd/publish, docs) so the rebase
    conflict surface stays small.

Image

ghcr.io/revenexx/outpost:nats-latest (moving) and :sha-<commit> (pinned).
Consumed by the webhooks deployment repo.

Michael Döhler added 9 commits May 23, 2026 00:56
Adds a publish-mq-only driver for NATS JetStream. Outpost reads events
from one or more pre-provisioned JetStream consumers via a pull-based
multiplexed subscription.

Key design points:
- Multi-account: one NATS Account per Outpost tenant. Each account gets
  its own connection and pull loop; messages are merged into a single
  Receive channel.
- Account.TenantID overrides the tenant_id field on incoming payloads,
  so an Account can only ever produce events for its mapped tenant.
- Stream and Consumer are operator-provisioned. Outpost only verifies
  existence on Init and fails loudly if either is missing.
- Auth via credentials_file (.creds, Operator/JWT-resolver mode).
- ConcurrentSubscription: pull concurrency is bounded by PullMaxMessages
  per account; upstream consumer skips its own semaphore.
- Publish() is intentionally unimplemented; JetStream is read-only here.
Adds dynamic add/remove of NATS Accounts at runtime via a watched
directory. Layout under accounts_dir:

  <account-name>/
    user.creds      NATS .creds (JWT + NKey seed)
    meta.yaml       stream/consumer/tenant_id metadata

The watcher debounces filesystem events (250ms) and triggers a
reconcile against the current connection set. Static accounts from
config.Accounts are preserved across reconciles; only dir-derived
accounts are added or removed.

Refactors NATSQueue internals to keep connections in a map keyed by
account name, with safe add/remove that also starts/stops the
per-account pump when a subscription is active.
Adds PublishNATSConfig + PublishNATSAccountConfig to the PublishMQ
config, plus GetInfraType / GetQueueConfig branches that map them
onto the mqs.NATSConfig the driver expects.

Static account lists and the watched accounts_dir can be used
independently or combined; the queue treats them as additive.
Adds a small NATS publisher to the local dev publish service, matching
the existing rabbitmq/aws_sqs/gcp_pubsub helpers. Reads URL/subject/
stream/consumer/creds from environment with defaults matching the
docker-compose example.

declareNATS() creates a work-queue stream + durable consumer so a
fresh local NATS server is usable in seconds.
Adds four integration tests covering the NATS driver:

- TestIntegrationMQ_NATS: basic publish + receive + ack via JetStream
- TestIntegrationMQ_NATS_TenantOverride: account.TenantID rewrites
  the payload's tenant_id field even when payload contains a value
- TestIntegrationMQ_NATS_MultiAccount: two accounts consumed in
  parallel, each tagged with its own tenant_id
- TestIntegrationMQ_NATS_AccountsDir: directory watcher picks up
  an account directory created after Init and starts consuming
  from it within a few seconds

Supporting infrastructure:

- internal/util/testinfra/nats.go: nats:2.10-alpine testcontainer
  with JetStream enabled
- internal/util/testutil/nats.go: stream/consumer declare + teardown
  helpers plus a small publish helper for injecting test events
- testinfra.Config.NATSURL + TEST_NATS_URL in .env.test

Drive-by: relax NATSAccountConfig validation so credentials_file is
optional (no-auth and token-via-URL deployments are legitimate);
the accounts-dir loader only defaults to user.creds when the file
actually exists in the account directory.
- docs/content/publishing/publish-from-nats.mdoc: new guide covering
  message structure, prerequisites, configuration (env + yaml), the
  accounts-dir layout, and the multi-tenancy / NATS-account pattern.
- .env.example: PUBLISH_NATS_SERVERS / PUBLISH_NATS_ACCOUNTS_DIR.
- .outpost.yaml.example: full publishmq.nats block under publishmq.
- contributing/mq.md: tick NATS in the supported-MQ list and add
  a section describing scope, configuration, infra ownership, and
  retry/visibility behavior.
- examples/docker-compose/compose-publish-nats.yml + helper script:
  single-node JetStream container for local development, paired with
  the existing publish dev service (method=nats).
- queue_nats.go pump: add 250ms backoff between non-fatal iter.Next()
  errors so the loop doesn't busy-spin during transient consumer
  unavailability (e.g. leadership change, connection blip).
- queue_nats.go reconcileFromDir: log addAccount failures instead of
  silently dropping them, so operators can spot bad creds / missing
  streams / unreachable servers when a tenant directory lands.
- nats_accounts.go loadAccountsFromDir: surface non-ENOENT os.Stat
  errors (permission, transient IO) instead of treating them as
  'subdirectory has no meta.yaml'.
- nats_accounts.go watcher: switch to a single reusable timer with
  Reset/Stop and proper channel drain, eliminating per-event timer
  allocations and the Stop-without-drain edge case.
- testutil/nats.go: replace strings.Contains substring matching on
  'stream not found' with errors.Is(err, jetstream.ErrStreamNotFound).
- testinfra/nats.go EnsureNATS: move the cfg.NATSURL check inside
  sync.Once.Do to close a data race between concurrent t.Parallel()
  callers and the container-start write path. Verified with -race.
- testinfra/testinfra.go: use strings.HasPrefix (nats:// and tls://)
  instead of strings.Contains for the scheme normalization.
Builds the patched outpost image (outpost + outpost-server, distroless,
headless) from this branch and pushes to ghcr.io/<owner>/outpost on
every push. Uses the built-in GITHUB_TOKEN, no external registry
credentials required.
The publishmq consumer constructs its queue via mqs.NewQueue and calls
Subscribe directly, never Init — exactly like the RabbitMQ driver, which
lazily connects inside Subscribe via a sync.Once. The NATS driver instead
required an explicit Init() and returned "nats: queue not initialized" from
Subscribe when it had not run, so the publishmq-consumer worker failed at
startup and was never restarted (the integration tests called Init directly
and so never exercised this path).

- Subscribe now runs connect() once via sync.Once (Init becomes a thin
  eager wrapper around the same once), mirroring queue_rabbitmq.go.
- connect() tolerates a zero-account start when AccountsDir is set: it
  starts the watcher and returns a working empty subscription that the
  watcher fills as credentials land. This also fixes the cold-start race
  where Outpost boots before a tenant's .creds have synced to the shared
  accounts directory.
- Subscribe clears a failed subscription so the queue is not pinned to a
  shut-down sub.
- Add a broker-free regression test for the Subscribe-without-Init +
  empty-accounts-dir path.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant