Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1 +1,113 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

@AGENTS.md

---

## Architecture

### Module dependency order

```
bson → driver-core → driver-sync / driver-reactive-streams → language wrappers
```

When a change touches more than one module, apply it lowest-first (bson before driver-core,
driver-core before driver-sync). Public API lives in driver-sync, driver-reactive-streams, and
the language wrappers. driver-core owns all internals.

### Connection string → cluster creation pipeline

This is the critical path to understand before modifying any connection, cluster, or settings code.

```
ConnectionString driver-core: com.mongodb.ConnectionString
└─ MongoClientSettings.builder()
.applyConnectionString() driver-core: com.mongodb.MongoClientSettings
└─ ClusterSettings.Builder
.applyConnectionString() driver-core: com.mongodb.connection.ClusterSettings
sets ClusterConnectionMode:
LOAD_BALANCED loadBalanced=true in URI
MULTIPLE + srvHost set mongodb+srv://
SINGLE directConnection=true
MULTIPLE directConnection=false or multiple hosts
└─ MongoClients.create() driver-sync: com.mongodb.client.MongoClients
└─ Clusters.createCluster() driver-sync: com.mongodb.client.internal.Clusters
└─ DefaultClusterFactory
.createCluster() driver-core: com.mongodb.internal.connection.DefaultClusterFactory
→ LoadBalancedCluster (LOAD_BALANCED)
→ SingleServerCluster (SINGLE)
→ MultiServerCluster (MULTIPLE, no srvHost)
→ DnsMultiServerCluster (MULTIPLE + srvHost — resolves SRV DNS)
```

`ConnectionString` stores `isSrvProtocol` (bool) and the parsed host list.
`mongodb+srv://` auto-enables TLS and resolves TXT records for extra options.
Cluster environment detection (CosmosDB, DocumentDB) is a logging-only heuristic in
`DefaultClusterFactory.ClusterEnvironment` — it does not affect topology.

### How operations execute (the binding layer)

driver-core is async-first. Sync execution in driver-sync wraps every async operation.

```
MongoCollectionImpl / MongoDatabaseImpl / MongoClusterImpl (driver-sync internal)
└─ executeOperation(ReadOperation / WriteOperation)
└─ ClusterBinding binds a Cluster to a ReadPreference
└─ ConnectionSource selects a server, provides a Connection
└─ CommandProtocolImpl serializes + sends the wire-protocol message
└─ Stream (Socket / Netty / TlsChannel)
```

Key classes:
- `com.mongodb.internal.binding.ClusterBinding` — wraps a `Cluster` for sync execution
- `com.mongodb.internal.connection.Cluster` (interface) — server selection + connection checkout
- `com.mongodb.internal.operation.*` — all concrete operations (FindOperation, AggregateOperation, …)
- `com.mongodb.internal.connection.CommandProtocolImpl` — wire protocol serialization

### Transport / stream selection

`StreamFactoryHelper` selects the I/O backend from `TransportSettings`:

| TransportSettings | Sync backend | Async backend |
|---|---|---|
| `null` (default) | `SocketStreamFactory` | `AsynchronousSocketChannelStreamFactoryFactory` or `TlsChannelStreamFactoryFactory` (when TLS) |
| `NettyTransportSettings` | `NettyStreamFactoryFactory` | `NettyStreamFactoryFactory` |
| `AsyncTransportSettings` | throws | `AsynchronousSocketChannelStreamFactoryFactory` |

### Running a single test class

```bash
./gradlew :driver-core:test --tests "com.mongodb.internal.operation.FindOperationTest"
./gradlew :driver-sync:test --tests "com.mongodb.client.MongoClientSpecification"
./gradlew test -PjavaVersion=11 # run against a different JDK
```

### Scala tests

```bash
./gradlew scalaCheck # tests against all supported Scala versions (2.11 / 2.12 / 2.13 / 3)
```

### Javadoc validation

```bash
./gradlew docs # fails if any public API is missing Javadoc
```

### Key conventions not obvious from the code

- **Sync wraps async, not the other way around.** `MongoClusterImpl.executeOperation()` drives async
operations synchronously via a blocking `SingleResultCallback`. Never add logic to the sync path
that cannot be mirrored in the async path.
- **`com.mongodb.internal.*` is private API.** Never reference internal packages from public-API
classes, and never expose internal types through public method signatures.
- **New public packages must declare `@NonNullApi`** in `package-info.java` to opt into non-null by
default. In older packages without it, every parameter/return is implicitly nullable unless
annotated `@NonNull`.
- **`Locks.withLock()`** (not `synchronized`) is the required idiom for locking in async connection
code to avoid deadlocks with the driver's internal thread model.
- **All dependency versions live in `gradle/libs.versions.toml`.** Never hard-code a version string
in a `build.gradle.kts` file.
36 changes: 35 additions & 1 deletion driver-core/src/main/com/mongodb/ConnectionString.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -286,6 +287,10 @@ public class ConnectionString {

private static final String MONGODB_PREFIX = "mongodb://";
private static final String MONGODB_SRV_PREFIX = "mongodb+srv://";
private static final Pattern STREAM_PROCESSING_HOST_PATTERN = Pattern.compile(
"^atlas-stream-[a-zA-Z0-9]+-[a-zA-Z0-9]+"
+ "\\.[a-zA-Z0-9-]+\\.a\\.query\\.mongodb(?:-[a-zA-Z]+)?\\.net(?::\\d+)?$",
Pattern.CASE_INSENSITIVE);
private static final Set<String> ALLOWED_OPTIONS_IN_TXT_RECORD =
new HashSet<>(asList("authsource", "replicaset", "loadbalanced"));
private static final Logger LOGGER = Loggers.getLogger("uri");
Expand All @@ -295,6 +300,7 @@ public class ConnectionString {

private final MongoCredential credential;
private final boolean isSrvProtocol;
private final boolean isAtlasStreamProcessingWorkspace;
private final List<String> hosts;
private final String database;
private final String collection;
Expand Down Expand Up @@ -428,6 +434,9 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
}
}
this.hosts = unresolvedHosts;
isAtlasStreamProcessingWorkspace = !isSrvProtocol
&& unresolvedHosts.size() == 1
&& STREAM_PROCESSING_HOST_PATTERN.matcher(unresolvedHosts.get(0)).matches();

// Process the authDB section
String nsPart;
Expand Down Expand Up @@ -469,6 +478,17 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
if (isSrvProtocol && !(combinedOptionsMaps.containsKey("tls") || combinedOptionsMaps.containsKey("ssl"))) {
combinedOptionsMaps.put("tls", singletonList("true"));
}
if (isAtlasStreamProcessingWorkspace) {
if (!(combinedOptionsMaps.containsKey("tls") || combinedOptionsMaps.containsKey("ssl"))) {
combinedOptionsMaps.put("tls", singletonList("true"));
}
if (!combinedOptionsMaps.containsKey("authsource")) {
combinedOptionsMaps.put("authsource", singletonList("admin"));
}
if (!combinedOptionsMaps.containsKey("directconnection")) {
combinedOptionsMaps.put("directconnection", singletonList("true"));
}
}
translateOptions(combinedOptionsMaps);

if (!isSrvProtocol && srvMaxHosts != null) {
Expand Down Expand Up @@ -1331,6 +1351,19 @@ public boolean isSrvProtocol() {
return isSrvProtocol;
}

/**
* Returns true if this connection string targets an Atlas Stream Processing workspace.
*
* <p>Workspace hosts match the pattern {@code atlas-stream-*.*a.query.mongodb*.net}.
* When true, TLS, {@code directConnection=true}, and {@code authSource=admin} are applied by default.</p>
*
* @return true if targeting a stream processing workspace
* @since 5.5
*/
public boolean isAtlasStreamProcessingWorkspace() {
return isAtlasStreamProcessingWorkspace;
}

/**
* Gets the maximum number of hosts to connect to when using SRV protocol.
*
Expand Down Expand Up @@ -1786,6 +1819,7 @@ public boolean equals(final Object o) {
}
ConnectionString that = (ConnectionString) o;
return isSrvProtocol == that.isSrvProtocol
&& isAtlasStreamProcessingWorkspace == that.isAtlasStreamProcessingWorkspace
&& Objects.equals(directConnection, that.directConnection)
&& Objects.equals(credential, that.credential)
&& Objects.equals(hosts, that.hosts)
Expand Down Expand Up @@ -1825,7 +1859,7 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(credential, isSrvProtocol, hosts, database, collection, directConnection, readPreference,
return Objects.hash(credential, isSrvProtocol, isAtlasStreamProcessingWorkspace, hosts, database, collection, directConnection, readPreference,
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, timeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.client.model;

import com.mongodb.lang.Nullable;
import org.bson.conversions.Bson;

/**
* Options for creating a stream processor.
*
* @since 5.5
*/
public class CreateStreamProcessorOptions {
@Nullable
private Bson dlq;
@Nullable
private String streamMetaFieldName;
@Nullable
private String tier;
@Nullable
private Boolean failover;

/**
* Constructs an instance with default values.
*/
public CreateStreamProcessorOptions() {
}

/**
* Gets the dead letter queue configuration document.
*
* @return the DLQ configuration, or {@code null} if not set
*/
@Nullable
public Bson getDlq() {
return dlq;
}

/**
* Sets the dead letter queue configuration document.
*
* @param dlq the DLQ configuration
* @return this
*/
public CreateStreamProcessorOptions dlq(@Nullable final Bson dlq) {
this.dlq = dlq;
return this;
}

/**
* Gets the field name used for stream metadata.
*
* @return the stream meta field name, or {@code null} if not set
*/
@Nullable
public String getStreamMetaFieldName() {
return streamMetaFieldName;
}

/**
* Sets the field name used for stream metadata.
*
* @param streamMetaFieldName the stream meta field name
* @return this
*/
public CreateStreamProcessorOptions streamMetaFieldName(@Nullable final String streamMetaFieldName) {
this.streamMetaFieldName = streamMetaFieldName;
return this;
}

/**
* Gets the compute tier for the stream processor.
*
* @return the tier, or {@code null} if not set
*/
@Nullable
public String getTier() {
return tier;
}

/**
* Sets the compute tier for the stream processor (e.g. {@code "SP10"}).
*
* @param tier the compute tier
* @return this
*/
public CreateStreamProcessorOptions tier(@Nullable final String tier) {
this.tier = tier;
return this;
}

/**
* Gets whether failover is enabled for the stream processor.
*
* @return {@code true} if failover is enabled, or {@code null} if not set
*/
@Nullable
public Boolean getFailover() {
return failover;
}

/**
* Sets whether failover is enabled for the stream processor.
*
* @param failover {@code true} to enable failover
* @return this
*/
public CreateStreamProcessorOptions failover(@Nullable final Boolean failover) {
this.failover = failover;
return this;
}

@Override
public String toString() {
return "CreateStreamProcessorOptions{"
+ "dlq=" + dlq
+ ", streamMetaFieldName='" + streamMetaFieldName + '\''
+ ", tier='" + tier + '\''
+ ", failover=" + failover
+ '}';
}
}
Loading