Skip to content

Add Atlas Stream Processing Support#1971

Draft
mesbah1242 wants to merge 10 commits into
mongodb:mainfrom
mesbah1242:add-streams-driver
Draft

Add Atlas Stream Processing Support#1971
mesbah1242 wants to merge 10 commits into
mongodb:mainfrom
mesbah1242:add-streams-driver

Conversation

@mesbah1242
Copy link
Copy Markdown

Introduces StreamProcessingClient and StreamProcessingClients as a distinct client entry point for connecting to Atlas Stream Processing workspaces (hosts matching atlas-stream-*.a.query.mongodb.net).

ConnectionString now detects workspace hosts and automatically applies TLS, loadBalanced=true, and authSource=admin as defaults, routing connections through the existing LoadBalancedCluster path without requiring a new cluster type.

mesbah1242 added 10 commits May 12, 2026 14:48
Introduces StreamProcessingClient and StreamProcessingClients as a
distinct client entry point for connecting to Atlas Stream Processing
workspaces (hosts matching atlas-stream-*.*a.query.mongodb*.net).

ConnectionString now detects workspace hosts and automatically applies
TLS, loadBalanced=true, and authSource=admin as defaults, routing
connections through the existing LoadBalancedCluster path without
requiring a new cluster type.
…erations

Phase 1 — driver-core model classes:
- CreateStreamProcessorOptions, StartStreamProcessorOptions, FailoverOptions
- GetStreamProcessorStatsOptions, GetStreamProcessorSamplesOptions
- GetStreamProcessorSamplesResult, StreamProcessorInfo (builder pattern)

Phase 2 — driver-sync public interfaces:
- StreamProcessors interface (create, get, getInfo)
- StreamProcessor interface (start, stop, drop, stats, getStreamProcessorSamples)
- StreamProcessingClient.streamProcessors() added; stub impl in StreamProcessingClientImpl

Phase 3 — driver-core internal operations (all target admin db):
- CreateStreamProcessorOperation, StartStreamProcessorOperation (startAfter intentionally omitted)
- StopStreamProcessorOperation, DropStreamProcessorOperation
- GetStreamProcessorOperation, GetStreamProcessorStatsOperation (retryable reads)
- StartSampleStreamProcessorOperation, GetMoreSampleStreamProcessorOperation (custom sample cursor)
- StreamProcessorImpl: delegates start/stop/drop/stats/getStreamProcessorSamples
  to their respective operations; sample cursor routing (startSample then getMore
  on first call, getMore-only on subsequent calls)
- StreamProcessorsImpl: converts List<? extends Bson> pipeline and Bson dlq to
  BsonDocument before constructing operations; get() returns a local handle with
  no server round-trip
- StreamProcessingClientImpl: stores settings, wires retryReads, and replaces the
  UnsupportedOperationException stub with a SimpleOperationExecutor inner class
  that wraps ClusterBinding + OperationContext.simpleOperationContext()
- Add StreamProcessorOperationsTest: verifies wire command shapes for all
  8 ASP operations (stop, drop, create, start, startSample, getMoreSample,
  getProcessor, getProcessorStats) including that startAfter is never sent
- Add StreamProcessorImplTest and StreamProcessorsImplTest: verify correct
  operation types are dispatched and sample cursor lifecycle (start+getMore
  on first call, getMore-only with existing cursorId)
- Fix cross-module @see/@link Javadoc references in driver-core model classes
  (StreamProcessor/StreamProcessors are in driver-sync, not driver-core)
- Fix SpotBugs NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE in StreamProcessorImpl
  and StreamProcessorsImpl by using local variables for nullable getters
- Fix unused ClusterSettings import in StreamProcessingClientsTest
…sing

- Use directConnection=true instead of loadBalanced=true for ASP workspace
  hosts: load-balanced mode requires serviceId in the hello response which
  ASP workspaces do not provide
- Parse getStreamProcessor response correctly: unwrap the "result" subdocument,
  resolve id from _id/tenantID/id, and treat all fields beyond name/state/
  pipeline as optional with sensible defaults since the server omits many
  of the fields we originally modelled
- Update ConnectionStringUnitTest and StreamProcessorOperationsTest to match
  the new connection mode and actual server response shape
- Add Checkstyle suppressions for AspSmokeTest (intentional console output)
…reamProcessor

tenantID in the server response is the workspace tenant identifier, not the
processor's unique id. Add a separate tenantId field to StreamProcessorInfo
so callers can access it without conflating it with the processor id.
The smoke test is not tracked in the repository, so its suppressions
should not be committed.
The server returns sample documents under "messages" not "nextBatch".
- Exclude sync-only StreamProcessingClient/StreamProcessingClients/
  StreamProcessor/StreamProcessors from the com.mongodb.client mirror
  check (the Scala driver wraps reactive streams, not sync)
- Add type aliases for the 7 new com.mongodb.client.model ASP types to
  org.mongodb.scala.model so the model mirror check passes
Update Javadoc to say directConnection=true instead of loadBalanced=true,
reflecting the connection mode change made in an earlier commit.
@mesbah1242 mesbah1242 changed the title Add Atlas Stream Processing workspace connectivity Add Atlas Stream Processing Support May 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants