diff --git a/CLAUDE.md b/CLAUDE.md index 43c994c2d3..bb43217c70 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1 +1,113 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + @AGENTS.md + +--- + +## Architecture + +### Module dependency order + +``` +bson → driver-core → driver-sync / driver-reactive-streams → language wrappers +``` + +When a change touches more than one module, apply it lowest-first (bson before driver-core, +driver-core before driver-sync). Public API lives in driver-sync, driver-reactive-streams, and +the language wrappers. driver-core owns all internals. + +### Connection string → cluster creation pipeline + +This is the critical path to understand before modifying any connection, cluster, or settings code. + +``` +ConnectionString driver-core: com.mongodb.ConnectionString + └─ MongoClientSettings.builder() + .applyConnectionString() driver-core: com.mongodb.MongoClientSettings + └─ ClusterSettings.Builder + .applyConnectionString() driver-core: com.mongodb.connection.ClusterSettings + sets ClusterConnectionMode: + LOAD_BALANCED loadBalanced=true in URI + MULTIPLE + srvHost set mongodb+srv:// + SINGLE directConnection=true + MULTIPLE directConnection=false or multiple hosts + └─ MongoClients.create() driver-sync: com.mongodb.client.MongoClients + └─ Clusters.createCluster() driver-sync: com.mongodb.client.internal.Clusters + └─ DefaultClusterFactory + .createCluster() driver-core: com.mongodb.internal.connection.DefaultClusterFactory + → LoadBalancedCluster (LOAD_BALANCED) + → SingleServerCluster (SINGLE) + → MultiServerCluster (MULTIPLE, no srvHost) + → DnsMultiServerCluster (MULTIPLE + srvHost — resolves SRV DNS) +``` + +`ConnectionString` stores `isSrvProtocol` (bool) and the parsed host list. +`mongodb+srv://` auto-enables TLS and resolves TXT records for extra options. +Cluster environment detection (CosmosDB, DocumentDB) is a logging-only heuristic in +`DefaultClusterFactory.ClusterEnvironment` — it does not affect topology. + +### How operations execute (the binding layer) + +driver-core is async-first. Sync execution in driver-sync wraps every async operation. + +``` +MongoCollectionImpl / MongoDatabaseImpl / MongoClusterImpl (driver-sync internal) + └─ executeOperation(ReadOperation / WriteOperation) + └─ ClusterBinding binds a Cluster to a ReadPreference + └─ ConnectionSource selects a server, provides a Connection + └─ CommandProtocolImpl serializes + sends the wire-protocol message + └─ Stream (Socket / Netty / TlsChannel) +``` + +Key classes: +- `com.mongodb.internal.binding.ClusterBinding` — wraps a `Cluster` for sync execution +- `com.mongodb.internal.connection.Cluster` (interface) — server selection + connection checkout +- `com.mongodb.internal.operation.*` — all concrete operations (FindOperation, AggregateOperation, …) +- `com.mongodb.internal.connection.CommandProtocolImpl` — wire protocol serialization + +### Transport / stream selection + +`StreamFactoryHelper` selects the I/O backend from `TransportSettings`: + +| TransportSettings | Sync backend | Async backend | +|---|---|---| +| `null` (default) | `SocketStreamFactory` | `AsynchronousSocketChannelStreamFactoryFactory` or `TlsChannelStreamFactoryFactory` (when TLS) | +| `NettyTransportSettings` | `NettyStreamFactoryFactory` | `NettyStreamFactoryFactory` | +| `AsyncTransportSettings` | throws | `AsynchronousSocketChannelStreamFactoryFactory` | + +### Running a single test class + +```bash +./gradlew :driver-core:test --tests "com.mongodb.internal.operation.FindOperationTest" +./gradlew :driver-sync:test --tests "com.mongodb.client.MongoClientSpecification" +./gradlew test -PjavaVersion=11 # run against a different JDK +``` + +### Scala tests + +```bash +./gradlew scalaCheck # tests against all supported Scala versions (2.11 / 2.12 / 2.13 / 3) +``` + +### Javadoc validation + +```bash +./gradlew docs # fails if any public API is missing Javadoc +``` + +### Key conventions not obvious from the code + +- **Sync wraps async, not the other way around.** `MongoClusterImpl.executeOperation()` drives async + operations synchronously via a blocking `SingleResultCallback`. Never add logic to the sync path + that cannot be mirrored in the async path. +- **`com.mongodb.internal.*` is private API.** Never reference internal packages from public-API + classes, and never expose internal types through public method signatures. +- **New public packages must declare `@NonNullApi`** in `package-info.java` to opt into non-null by + default. In older packages without it, every parameter/return is implicitly nullable unless + annotated `@NonNull`. +- **`Locks.withLock()`** (not `synchronized`) is the required idiom for locking in async connection + code to avoid deadlocks with the driver's internal thread model. +- **All dependency versions live in `gradle/libs.versions.toml`.** Never hard-code a version string + in a `build.gradle.kts` file. diff --git a/driver-core/src/main/com/mongodb/ConnectionString.java b/driver-core/src/main/com/mongodb/ConnectionString.java index 659e8fd02a..c26441c14f 100644 --- a/driver-core/src/main/com/mongodb/ConnectionString.java +++ b/driver-core/src/main/com/mongodb/ConnectionString.java @@ -49,6 +49,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -286,6 +287,10 @@ public class ConnectionString { private static final String MONGODB_PREFIX = "mongodb://"; private static final String MONGODB_SRV_PREFIX = "mongodb+srv://"; + private static final Pattern STREAM_PROCESSING_HOST_PATTERN = Pattern.compile( + "^atlas-stream-[a-zA-Z0-9]+-[a-zA-Z0-9]+" + + "\\.[a-zA-Z0-9-]+\\.a\\.query\\.mongodb(?:-[a-zA-Z]+)?\\.net(?::\\d+)?$", + Pattern.CASE_INSENSITIVE); private static final Set ALLOWED_OPTIONS_IN_TXT_RECORD = new HashSet<>(asList("authsource", "replicaset", "loadbalanced")); private static final Logger LOGGER = Loggers.getLogger("uri"); @@ -295,6 +300,7 @@ public class ConnectionString { private final MongoCredential credential; private final boolean isSrvProtocol; + private final boolean isAtlasStreamProcessingWorkspace; private final List hosts; private final String database; private final String collection; @@ -428,6 +434,9 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient } } this.hosts = unresolvedHosts; + isAtlasStreamProcessingWorkspace = !isSrvProtocol + && unresolvedHosts.size() == 1 + && STREAM_PROCESSING_HOST_PATTERN.matcher(unresolvedHosts.get(0)).matches(); // Process the authDB section String nsPart; @@ -469,6 +478,17 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient if (isSrvProtocol && !(combinedOptionsMaps.containsKey("tls") || combinedOptionsMaps.containsKey("ssl"))) { combinedOptionsMaps.put("tls", singletonList("true")); } + if (isAtlasStreamProcessingWorkspace) { + if (!(combinedOptionsMaps.containsKey("tls") || combinedOptionsMaps.containsKey("ssl"))) { + combinedOptionsMaps.put("tls", singletonList("true")); + } + if (!combinedOptionsMaps.containsKey("authsource")) { + combinedOptionsMaps.put("authsource", singletonList("admin")); + } + if (!combinedOptionsMaps.containsKey("directconnection")) { + combinedOptionsMaps.put("directconnection", singletonList("true")); + } + } translateOptions(combinedOptionsMaps); if (!isSrvProtocol && srvMaxHosts != null) { @@ -1331,6 +1351,19 @@ public boolean isSrvProtocol() { return isSrvProtocol; } + /** + * Returns true if this connection string targets an Atlas Stream Processing workspace. + * + *

Workspace hosts match the pattern {@code atlas-stream-*.*a.query.mongodb*.net}. + * When true, TLS, {@code directConnection=true}, and {@code authSource=admin} are applied by default.

+ * + * @return true if targeting a stream processing workspace + * @since 5.5 + */ + public boolean isAtlasStreamProcessingWorkspace() { + return isAtlasStreamProcessingWorkspace; + } + /** * Gets the maximum number of hosts to connect to when using SRV protocol. * @@ -1786,6 +1819,7 @@ public boolean equals(final Object o) { } ConnectionString that = (ConnectionString) o; return isSrvProtocol == that.isSrvProtocol + && isAtlasStreamProcessingWorkspace == that.isAtlasStreamProcessingWorkspace && Objects.equals(directConnection, that.directConnection) && Objects.equals(credential, that.credential) && Objects.equals(hosts, that.hosts) @@ -1825,7 +1859,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(credential, isSrvProtocol, hosts, database, collection, directConnection, readPreference, + return Objects.hash(credential, isSrvProtocol, isAtlasStreamProcessingWorkspace, hosts, database, collection, directConnection, readPreference, writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime, maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, timeout, socketTimeout, sslEnabled, sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency, diff --git a/driver-core/src/main/com/mongodb/client/model/CreateStreamProcessorOptions.java b/driver-core/src/main/com/mongodb/client/model/CreateStreamProcessorOptions.java new file mode 100644 index 0000000000..6a79951a12 --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/CreateStreamProcessorOptions.java @@ -0,0 +1,136 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import com.mongodb.lang.Nullable; +import org.bson.conversions.Bson; + +/** + * Options for creating a stream processor. + * + * @since 5.5 + */ +public class CreateStreamProcessorOptions { + @Nullable + private Bson dlq; + @Nullable + private String streamMetaFieldName; + @Nullable + private String tier; + @Nullable + private Boolean failover; + + /** + * Constructs an instance with default values. + */ + public CreateStreamProcessorOptions() { + } + + /** + * Gets the dead letter queue configuration document. + * + * @return the DLQ configuration, or {@code null} if not set + */ + @Nullable + public Bson getDlq() { + return dlq; + } + + /** + * Sets the dead letter queue configuration document. + * + * @param dlq the DLQ configuration + * @return this + */ + public CreateStreamProcessorOptions dlq(@Nullable final Bson dlq) { + this.dlq = dlq; + return this; + } + + /** + * Gets the field name used for stream metadata. + * + * @return the stream meta field name, or {@code null} if not set + */ + @Nullable + public String getStreamMetaFieldName() { + return streamMetaFieldName; + } + + /** + * Sets the field name used for stream metadata. + * + * @param streamMetaFieldName the stream meta field name + * @return this + */ + public CreateStreamProcessorOptions streamMetaFieldName(@Nullable final String streamMetaFieldName) { + this.streamMetaFieldName = streamMetaFieldName; + return this; + } + + /** + * Gets the compute tier for the stream processor. + * + * @return the tier, or {@code null} if not set + */ + @Nullable + public String getTier() { + return tier; + } + + /** + * Sets the compute tier for the stream processor (e.g. {@code "SP10"}). + * + * @param tier the compute tier + * @return this + */ + public CreateStreamProcessorOptions tier(@Nullable final String tier) { + this.tier = tier; + return this; + } + + /** + * Gets whether failover is enabled for the stream processor. + * + * @return {@code true} if failover is enabled, or {@code null} if not set + */ + @Nullable + public Boolean getFailover() { + return failover; + } + + /** + * Sets whether failover is enabled for the stream processor. + * + * @param failover {@code true} to enable failover + * @return this + */ + public CreateStreamProcessorOptions failover(@Nullable final Boolean failover) { + this.failover = failover; + return this; + } + + @Override + public String toString() { + return "CreateStreamProcessorOptions{" + + "dlq=" + dlq + + ", streamMetaFieldName='" + streamMetaFieldName + '\'' + + ", tier='" + tier + '\'' + + ", failover=" + failover + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/client/model/FailoverOptions.java b/driver-core/src/main/com/mongodb/client/model/FailoverOptions.java new file mode 100644 index 0000000000..e09f656b5c --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/FailoverOptions.java @@ -0,0 +1,104 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import com.mongodb.lang.Nullable; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + * Options for triggering a stream processor failover. + * + * @since 5.5 + * @see StartStreamProcessorOptions#failover(FailoverOptions) + */ +public class FailoverOptions { + private final String region; + @Nullable + private String mode; + @Nullable + private Boolean dryRun; + + /** + * Constructs an instance with the required target region. + * + * @param region the target region for failover; must not be {@code null} + */ + public FailoverOptions(final String region) { + this.region = notNull("region", region); + } + + /** + * Gets the target region for failover. + * + * @return the target region + */ + public String getRegion() { + return region; + } + + /** + * Gets the failover mode. + * + * @return the failover mode (e.g. {@code "GRACEFUL"} or {@code "FORCED"}), or {@code null} if not set + */ + @Nullable + public String getMode() { + return mode; + } + + /** + * Sets the failover mode. Valid values are {@code "GRACEFUL"} (default) and {@code "FORCED"}. + * + * @param mode the failover mode + * @return this + */ + public FailoverOptions mode(@Nullable final String mode) { + this.mode = mode; + return this; + } + + /** + * Gets whether this is a dry-run validation. + * + * @return {@code true} if this is a dry run, or {@code null} if not set + */ + @Nullable + public Boolean getDryRun() { + return dryRun; + } + + /** + * Sets whether to validate the failover request without executing it. + * + * @param dryRun {@code true} to validate without executing + * @return this + */ + public FailoverOptions dryRun(@Nullable final Boolean dryRun) { + this.dryRun = dryRun; + return this; + } + + @Override + public String toString() { + return "FailoverOptions{" + + "region='" + region + '\'' + + ", mode='" + mode + '\'' + + ", dryRun=" + dryRun + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorSamplesOptions.java b/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorSamplesOptions.java new file mode 100644 index 0000000000..01e2c8c4c0 --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorSamplesOptions.java @@ -0,0 +1,130 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import com.mongodb.lang.Nullable; + +/** + * Options for sampling output from a running stream processor. + * + *

On the first call (when {@code cursorId} is absent or zero), a new sample cursor is opened + * via {@code startSampleStreamProcessor} and {@code limit} controls the maximum number of + * documents to sample. On subsequent calls, the cursor is advanced via + * {@code getMoreSampleStreamProcessor} and {@code batchSize} controls how many documents + * are returned per call.

+ * + * @since 5.5 + */ +public class GetStreamProcessorSamplesOptions { + @Nullable + private Long cursorId; + @Nullable + private Integer limit; + @Nullable + private Integer batchSize; + + /** + * Constructs an instance with default values. + */ + public GetStreamProcessorSamplesOptions() { + } + + /** + * Gets the cursor ID from a previous call. + * + *

If absent or zero, a new sample cursor is opened. If non-zero, the next batch + * is fetched from the existing cursor.

+ * + * @return the cursor ID, or {@code null} if not set + */ + @Nullable + public Long getCursorId() { + return cursorId; + } + + /** + * Sets the cursor ID from a previous {@code StreamProcessor#getStreamProcessorSamples} call. + * + * @param cursorId the cursor ID; {@code 0} or {@code null} opens a new cursor + * @return this + */ + public GetStreamProcessorSamplesOptions cursorId(@Nullable final Long cursorId) { + this.cursorId = cursorId; + return this; + } + + /** + * Gets the maximum number of documents to sample. + * + *

Only applied on the initial call when opening a new cursor.

+ * + * @return the limit, or {@code null} if not set + */ + @Nullable + public Integer getLimit() { + return limit; + } + + /** + * Sets the maximum number of documents to sample. + * + *

Only sent on the initial call (i.e. when {@code cursorId} is absent or zero). + * Ignored on subsequent calls.

+ * + * @param limit the maximum number of documents to sample + * @return this + */ + public GetStreamProcessorSamplesOptions limit(@Nullable final Integer limit) { + this.limit = limit; + return this; + } + + /** + * Gets the number of documents to return per batch. + * + *

Only applied on subsequent calls when advancing an existing cursor.

+ * + * @return the batch size, or {@code null} if not set + */ + @Nullable + public Integer getBatchSize() { + return batchSize; + } + + /** + * Sets the number of documents to return per batch. + * + *

Only sent on subsequent calls (i.e. when {@code cursorId} is non-zero). + * Ignored on the initial call.

+ * + * @param batchSize the number of documents per batch + * @return this + */ + public GetStreamProcessorSamplesOptions batchSize(@Nullable final Integer batchSize) { + this.batchSize = batchSize; + return this; + } + + @Override + public String toString() { + return "GetStreamProcessorSamplesOptions{" + + "cursorId=" + cursorId + + ", limit=" + limit + + ", batchSize=" + batchSize + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorSamplesResult.java b/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorSamplesResult.java new file mode 100644 index 0000000000..b26d80ddd4 --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorSamplesResult.java @@ -0,0 +1,77 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import org.bson.BsonDocument; + +import java.util.Collections; +import java.util.List; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + * The result of a {@code StreamProcessor#getStreamProcessorSamples} call. + * + *

Callers MUST check {@link #getCursorId()}: a value of {@code 0} means the cursor is + * exhausted and no further calls should be made.

+ * + * @since 5.5 + */ +public class GetStreamProcessorSamplesResult { + private final long cursorId; + private final List documents; + + /** + * Constructs an instance. + * + * @param cursorId the cursor ID; {@code 0} indicates the cursor is exhausted + * @param documents the sampled documents returned by this call + */ + public GetStreamProcessorSamplesResult(final long cursorId, final List documents) { + this.cursorId = cursorId; + this.documents = Collections.unmodifiableList(notNull("documents", documents)); + } + + /** + * Gets the cursor ID to pass to the next call. + * + *

A value of {@code 0} means the cursor is exhausted; callers MUST NOT make further + * calls with a zero cursor ID.

+ * + * @return the cursor ID + */ + public long getCursorId() { + return cursorId; + } + + /** + * Gets the batch of sampled documents returned by this call. + * + * @return an unmodifiable list of sampled documents + */ + public List getDocuments() { + return documents; + } + + @Override + public String toString() { + return "GetStreamProcessorSamplesResult{" + + "cursorId=" + cursorId + + ", documents=" + documents + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorStatsOptions.java b/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorStatsOptions.java new file mode 100644 index 0000000000..b03cbc9375 --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/GetStreamProcessorStatsOptions.java @@ -0,0 +1,63 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import com.mongodb.lang.Nullable; + +/** + * Options for retrieving stream processor statistics. + * + * @since 5.5 + */ +public class GetStreamProcessorStatsOptions { + @Nullable + private Boolean verbose; + + /** + * Constructs an instance with default values. + */ + public GetStreamProcessorStatsOptions() { + } + + /** + * Gets whether verbose per-operator statistics are included. + * + * @return {@code true} if verbose statistics are requested, or {@code null} if not set + */ + @Nullable + public Boolean getVerbose() { + return verbose; + } + + /** + * Sets whether to include per-operator statistics in the response. + * + * @param verbose {@code true} to include per-operator statistics + * @return this + */ + public GetStreamProcessorStatsOptions verbose(@Nullable final Boolean verbose) { + this.verbose = verbose; + return this; + } + + @Override + public String toString() { + return "GetStreamProcessorStatsOptions{" + + "verbose=" + verbose + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/client/model/StartStreamProcessorOptions.java b/driver-core/src/main/com/mongodb/client/model/StartStreamProcessorOptions.java new file mode 100644 index 0000000000..0f1669dbb2 --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/StartStreamProcessorOptions.java @@ -0,0 +1,184 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import com.mongodb.lang.Nullable; +import org.bson.BsonTimestamp; + +/** + * Options for starting a stream processor. + * + * @since 5.5 + */ +public class StartStreamProcessorOptions { + @Nullable + private Integer workers; + @Nullable + private Boolean clearCheckpoints; + @Nullable + private BsonTimestamp startAtOperationTime; + @Nullable + private String tier; + @Nullable + private Boolean enableAutoScaling; + @Nullable + private FailoverOptions failover; + + /** + * Constructs an instance with default values. + */ + public StartStreamProcessorOptions() { + } + + /** + * Gets the number of workers. + * + * @return the number of workers, or {@code null} if not set + */ + @Nullable + public Integer getWorkers() { + return workers; + } + + /** + * Sets the number of workers for the stream processor. + * + * @param workers the number of workers + * @return this + */ + public StartStreamProcessorOptions workers(@Nullable final Integer workers) { + this.workers = workers; + return this; + } + + /** + * Gets whether checkpoints should be cleared on start. + * + * @return {@code true} if checkpoints should be cleared, or {@code null} if not set + */ + @Nullable + public Boolean getClearCheckpoints() { + return clearCheckpoints; + } + + /** + * Sets whether to clear checkpoints when starting the processor. + * + * @param clearCheckpoints {@code true} to clear checkpoints + * @return this + */ + public StartStreamProcessorOptions clearCheckpoints(@Nullable final Boolean clearCheckpoints) { + this.clearCheckpoints = clearCheckpoints; + return this; + } + + /** + * Gets the operation time from which to start processing. + * + * @return the start operation time, or {@code null} if not set + */ + @Nullable + public BsonTimestamp getStartAtOperationTime() { + return startAtOperationTime; + } + + /** + * Sets the operation time from which to resume processing. + * + * @param startAtOperationTime the operation time + * @return this + */ + public StartStreamProcessorOptions startAtOperationTime(@Nullable final BsonTimestamp startAtOperationTime) { + this.startAtOperationTime = startAtOperationTime; + return this; + } + + /** + * Gets the compute tier. + * + * @return the tier, or {@code null} if not set + */ + @Nullable + public String getTier() { + return tier; + } + + /** + * Sets the compute tier. Valid values: {@code "SP2"}, {@code "SP5"}, {@code "SP10"}, {@code "SP30"}, {@code "SP50"}. + * + * @param tier the compute tier + * @return this + */ + public StartStreamProcessorOptions tier(@Nullable final String tier) { + this.tier = tier; + return this; + } + + /** + * Gets whether auto-scaling is enabled. + * + * @return {@code true} if auto-scaling is enabled, or {@code null} if not set + */ + @Nullable + public Boolean getEnableAutoScaling() { + return enableAutoScaling; + } + + /** + * Sets whether to enable auto-scaling. + * + * @param enableAutoScaling {@code true} to enable auto-scaling + * @return this + */ + public StartStreamProcessorOptions enableAutoScaling(@Nullable final Boolean enableAutoScaling) { + this.enableAutoScaling = enableAutoScaling; + return this; + } + + /** + * Gets the failover options. + * + * @return the failover options, or {@code null} if not set + */ + @Nullable + public FailoverOptions getFailover() { + return failover; + } + + /** + * Sets the failover options. + * + * @param failover the failover options + * @return this + */ + public StartStreamProcessorOptions failover(@Nullable final FailoverOptions failover) { + this.failover = failover; + return this; + } + + @Override + public String toString() { + return "StartStreamProcessorOptions{" + + "workers=" + workers + + ", clearCheckpoints=" + clearCheckpoints + + ", startAtOperationTime=" + startAtOperationTime + + ", tier='" + tier + '\'' + + ", enableAutoScaling=" + enableAutoScaling + + ", failover=" + failover + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/client/model/StreamProcessorInfo.java b/driver-core/src/main/com/mongodb/client/model/StreamProcessorInfo.java new file mode 100644 index 0000000000..a7990da7d7 --- /dev/null +++ b/driver-core/src/main/com/mongodb/client/model/StreamProcessorInfo.java @@ -0,0 +1,588 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.model; + +import com.mongodb.lang.Nullable; +import org.bson.BsonDocument; + +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + * Information about a stream processor, as returned by {@code getStreamProcessor}. + * + *

The {@code state} field is returned as a raw string. Known values include + * {@code "CREATING"}, {@code "VALIDATING"}, {@code "CREATED"}, {@code "PROVISIONING"}, + * {@code "STARTED"}, {@code "STOPPING"}, {@code "STOPPED"}, {@code "DROPPED"}, and + * {@code "FAILED"}, but additional states may be introduced in future server versions.

+ * + * @since 5.5 + */ +public final class StreamProcessorInfo { + private final String id; + @Nullable + private final String tenantId; + private final String name; + private final String state; + private final List pipeline; + private final int pipelineVersion; + @Nullable + private final String tier; + @Nullable + private final BsonDocument dlq; + @Nullable + private final String streamMetaFieldName; + private final boolean enableAutoScaling; + private final boolean failoverEnabled; + private final String activeRegion; + private final String workspaceDefaultRegion; + @Nullable + private final Date lastStateChange; + @Nullable + private final Date lastModifiedAt; + private final String modifiedBy; + private final boolean hasStarted; + private final String errorMsg; + private final boolean errorRetryable; + @Nullable + private final Integer errorCode; + + private StreamProcessorInfo(final Builder builder) { + this.id = notNull("id", builder.id); + this.tenantId = builder.tenantId; + this.name = notNull("name", builder.name); + this.state = notNull("state", builder.state); + this.pipeline = Collections.unmodifiableList(notNull("pipeline", builder.pipeline)); + this.pipelineVersion = builder.pipelineVersion; + this.tier = builder.tier; + this.dlq = builder.dlq; + this.streamMetaFieldName = builder.streamMetaFieldName; + this.enableAutoScaling = builder.enableAutoScaling; + this.failoverEnabled = builder.failoverEnabled; + this.activeRegion = notNull("activeRegion", builder.activeRegion); + this.workspaceDefaultRegion = notNull("workspaceDefaultRegion", builder.workspaceDefaultRegion); + this.lastStateChange = builder.lastStateChange; + this.lastModifiedAt = builder.lastModifiedAt; + this.modifiedBy = notNull("modifiedBy", builder.modifiedBy); + this.hasStarted = builder.hasStarted; + this.errorMsg = notNull("errorMsg", builder.errorMsg); + this.errorRetryable = builder.errorRetryable; + this.errorCode = builder.errorCode; + } + + /** + * Creates a new {@link Builder}. + * + * @return a new builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Gets the processor ID. + * + * @return the processor ID + */ + public String getId() { + return id; + } + + /** + * Gets the tenant ID associated with this processor. + * + * @return the tenant ID, or {@code null} if not provided by the server + */ + @Nullable + public String getTenantId() { + return tenantId; + } + + /** + * Gets the processor name. + * + * @return the processor name + */ + public String getName() { + return name; + } + + /** + * Gets the current state of the processor. + * + *

The value is returned as-is from the server. Known values include {@code "CREATING"}, + * {@code "VALIDATING"}, {@code "CREATED"}, {@code "PROVISIONING"}, {@code "STARTED"}, + * {@code "STOPPING"}, {@code "STOPPED"}, {@code "DROPPED"}, and {@code "FAILED"}, + * but additional states may be introduced in future server versions.

+ * + * @return the processor state + */ + public String getState() { + return state; + } + + /** + * Gets the processor pipeline. + * + * @return an unmodifiable list of pipeline stage documents + */ + public List getPipeline() { + return pipeline; + } + + /** + * Gets the pipeline version, incremented on each successful modification. + * + * @return the pipeline version + */ + public int getPipelineVersion() { + return pipelineVersion; + } + + /** + * Gets the compute tier. + * + * @return the tier (e.g. {@code "SP10"}), or {@code null} if not set + */ + @Nullable + public String getTier() { + return tier; + } + + /** + * Gets the dead letter queue configuration. + * + * @return the DLQ configuration document, or {@code null} if not configured + */ + @Nullable + public BsonDocument getDlq() { + return dlq; + } + + /** + * Gets the field name used for stream metadata. + * + * @return the stream meta field name, or {@code null} if not set + */ + @Nullable + public String getStreamMetaFieldName() { + return streamMetaFieldName; + } + + /** + * Gets whether auto-scaling is enabled. + * + * @return {@code true} if auto-scaling is enabled + */ + public boolean isEnableAutoScaling() { + return enableAutoScaling; + } + + /** + * Gets whether failover is enabled. + * + * @return {@code true} if failover is enabled + */ + public boolean isFailoverEnabled() { + return failoverEnabled; + } + + /** + * Gets the region where the processor is currently deployed. + * + * @return the active region + */ + public String getActiveRegion() { + return activeRegion; + } + + /** + * Gets the workspace's default region. + * + *

This may differ from {@link #getActiveRegion()} during or after a failover.

+ * + * @return the workspace default region + */ + public String getWorkspaceDefaultRegion() { + return workspaceDefaultRegion; + } + + /** + * Gets the time of the last state change. + * + * @return the last state change time, or {@code null} if not available + */ + @Nullable + public Date getLastStateChange() { + return lastStateChange; + } + + /** + * Gets the time the processor was last modified. + * + * @return the last modified time, or {@code null} if not available + */ + @Nullable + public Date getLastModifiedAt() { + return lastModifiedAt; + } + + /** + * Gets the identity of the user who last modified the processor. + * + * @return the modifier's identity + */ + public String getModifiedBy() { + return modifiedBy; + } + + /** + * Gets whether the processor has ever been started. + * + * @return {@code true} if the processor has been started at least once + */ + public boolean isHasStarted() { + return hasStarted; + } + + /** + * Gets the current error message. + * + *

Returns an empty string when no error has occurred.

+ * + * @return the error message + */ + public String getErrorMsg() { + return errorMsg; + } + + /** + * Gets whether the current error is retryable. + * + * @return {@code true} if the error is retryable + */ + public boolean isErrorRetryable() { + return errorRetryable; + } + + /** + * Gets the error code. + * + * @return the error code, or {@code null} if no error has occurred + */ + @Nullable + public Integer getErrorCode() { + return errorCode; + } + + @Override + public String toString() { + return "StreamProcessorInfo{" + + "id='" + id + '\'' + + ", tenantId='" + tenantId + '\'' + + ", name='" + name + '\'' + + ", state='" + state + '\'' + + ", pipelineVersion=" + pipelineVersion + + ", tier='" + tier + '\'' + + ", enableAutoScaling=" + enableAutoScaling + + ", failoverEnabled=" + failoverEnabled + + ", activeRegion='" + activeRegion + '\'' + + ", workspaceDefaultRegion='" + workspaceDefaultRegion + '\'' + + ", modifiedBy='" + modifiedBy + '\'' + + ", hasStarted=" + hasStarted + + ", errorMsg='" + errorMsg + '\'' + + ", errorRetryable=" + errorRetryable + + ", errorCode=" + errorCode + + '}'; + } + + /** + * A builder for {@link StreamProcessorInfo}. + */ + public static final class Builder { + @Nullable + private String id; + @Nullable + private String tenantId; + @Nullable + private String name; + @Nullable + private String state; + @Nullable + private List pipeline; + private int pipelineVersion; + @Nullable + private String tier; + @Nullable + private BsonDocument dlq; + @Nullable + private String streamMetaFieldName; + private boolean enableAutoScaling; + private boolean failoverEnabled; + @Nullable + private String activeRegion; + @Nullable + private String workspaceDefaultRegion; + @Nullable + private Date lastStateChange; + @Nullable + private Date lastModifiedAt; + @Nullable + private String modifiedBy; + private boolean hasStarted; + @Nullable + private String errorMsg; + private boolean errorRetryable; + @Nullable + private Integer errorCode; + + private Builder() { + } + + /** + * Sets the processor ID. + * + * @param id the processor ID + * @return this + */ + public Builder id(final String id) { + this.id = id; + return this; + } + + /** + * Sets the tenant ID. + * + * @param tenantId the tenant ID + * @return this + */ + public Builder tenantId(@Nullable final String tenantId) { + this.tenantId = tenantId; + return this; + } + + /** + * Sets the processor name. + * + * @param name the processor name + * @return this + */ + public Builder name(final String name) { + this.name = name; + return this; + } + + /** + * Sets the processor state. + * + * @param state the processor state + * @return this + */ + public Builder state(final String state) { + this.state = state; + return this; + } + + /** + * Sets the processor pipeline. + * + * @param pipeline the pipeline stage documents + * @return this + */ + public Builder pipeline(final List pipeline) { + this.pipeline = pipeline; + return this; + } + + /** + * Sets the pipeline version. + * + * @param pipelineVersion the pipeline version + * @return this + */ + public Builder pipelineVersion(final int pipelineVersion) { + this.pipelineVersion = pipelineVersion; + return this; + } + + /** + * Sets the compute tier. + * + * @param tier the compute tier + * @return this + */ + public Builder tier(@Nullable final String tier) { + this.tier = tier; + return this; + } + + /** + * Sets the dead letter queue configuration. + * + * @param dlq the DLQ configuration document + * @return this + */ + public Builder dlq(@Nullable final BsonDocument dlq) { + this.dlq = dlq; + return this; + } + + /** + * Sets the stream meta field name. + * + * @param streamMetaFieldName the stream meta field name + * @return this + */ + public Builder streamMetaFieldName(@Nullable final String streamMetaFieldName) { + this.streamMetaFieldName = streamMetaFieldName; + return this; + } + + /** + * Sets whether auto-scaling is enabled. + * + * @param enableAutoScaling {@code true} if auto-scaling is enabled + * @return this + */ + public Builder enableAutoScaling(final boolean enableAutoScaling) { + this.enableAutoScaling = enableAutoScaling; + return this; + } + + /** + * Sets whether failover is enabled. + * + * @param failoverEnabled {@code true} if failover is enabled + * @return this + */ + public Builder failoverEnabled(final boolean failoverEnabled) { + this.failoverEnabled = failoverEnabled; + return this; + } + + /** + * Sets the active region. + * + * @param activeRegion the active region + * @return this + */ + public Builder activeRegion(final String activeRegion) { + this.activeRegion = activeRegion; + return this; + } + + /** + * Sets the workspace default region. + * + * @param workspaceDefaultRegion the workspace default region + * @return this + */ + public Builder workspaceDefaultRegion(final String workspaceDefaultRegion) { + this.workspaceDefaultRegion = workspaceDefaultRegion; + return this; + } + + /** + * Sets the last state change time. + * + * @param lastStateChange the last state change time + * @return this + */ + public Builder lastStateChange(@Nullable final Date lastStateChange) { + this.lastStateChange = lastStateChange; + return this; + } + + /** + * Sets the last modified time. + * + * @param lastModifiedAt the last modified time + * @return this + */ + public Builder lastModifiedAt(@Nullable final Date lastModifiedAt) { + this.lastModifiedAt = lastModifiedAt; + return this; + } + + /** + * Sets the identity of the user who last modified the processor. + * + * @param modifiedBy the modifier's identity + * @return this + */ + public Builder modifiedBy(final String modifiedBy) { + this.modifiedBy = modifiedBy; + return this; + } + + /** + * Sets whether the processor has ever been started. + * + * @param hasStarted {@code true} if the processor has been started + * @return this + */ + public Builder hasStarted(final boolean hasStarted) { + this.hasStarted = hasStarted; + return this; + } + + /** + * Sets the error message. + * + * @param errorMsg the error message; empty string when no error + * @return this + */ + public Builder errorMsg(final String errorMsg) { + this.errorMsg = errorMsg; + return this; + } + + /** + * Sets whether the current error is retryable. + * + * @param errorRetryable {@code true} if the error is retryable + * @return this + */ + public Builder errorRetryable(final boolean errorRetryable) { + this.errorRetryable = errorRetryable; + return this; + } + + /** + * Sets the error code. + * + * @param errorCode the error code, or {@code null} if no error + * @return this + */ + public Builder errorCode(@Nullable final Integer errorCode) { + this.errorCode = errorCode; + return this; + } + + /** + * Builds a {@link StreamProcessorInfo} from the current state of this builder. + * + * @return the built instance + */ + public StreamProcessorInfo build() { + return new StreamProcessorInfo(this); + } + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateStreamProcessorOperation.java new file mode 100644 index 0000000000..39d657c3c9 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateStreamProcessorOperation.java @@ -0,0 +1,132 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.Nullable; +import org.bson.BsonArray; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonString; + +import java.util.List; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; +import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; + +/** + * Creates a new stream processor in the workspace. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class CreateStreamProcessorOperation implements WriteOperation { + private static final String COMMAND_NAME = "createStreamProcessor"; + private static final String DATABASE = "admin"; + + private final String processorName; + private final List pipeline; + @Nullable + private final BsonDocument dlq; + @Nullable + private final String streamMetaFieldName; + @Nullable + private final String tier; + @Nullable + private final Boolean failover; + + public CreateStreamProcessorOperation(final String processorName, final List pipeline, + @Nullable final BsonDocument dlq, @Nullable final String streamMetaFieldName, + @Nullable final String tier, @Nullable final Boolean failover) { + this.processorName = notNull("processorName", processorName); + this.pipeline = notNull("pipeline", pipeline); + this.dlq = dlq; + this.streamMetaFieldName = streamMetaFieldName; + this.tier = tier; + this.failover = failover; + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public Void execute(final WriteBinding binding, final OperationContext operationContext) { + return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> { + executeCommand(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformer(operationContextWithMinRtt.getTimeoutContext())); + return null; + }); + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + withAsyncConnection(binding, operationContext, (connection, operationContextWithMinRtt, t) -> { + SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); + if (t != null) { + errHandlingCallback.onResult(null, t); + } else { + executeCommandAsync(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformerAsync(operationContextWithMinRtt.getTimeoutContext()), + releasingCallback(errHandlingCallback, connection)); + } + }); + } + + private BsonDocument getCommand() { + BsonArray pipelineArray = new BsonArray(pipeline); + BsonDocument command = new BsonDocument(COMMAND_NAME, new BsonString(processorName)) + .append("pipeline", pipelineArray); + + BsonDocument options = new BsonDocument(); + if (dlq != null) { + options.append("dlq", dlq); + } + if (streamMetaFieldName != null) { + options.append("streamMetaFieldName", new BsonString(streamMetaFieldName)); + } + if (tier != null) { + options.append("tier", new BsonString(tier)); + } + if (failover != null) { + options.append("failover", BsonBoolean.valueOf(failover)); + } + if (!options.isEmpty()) { + command.append("options", options); + } + return command; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropStreamProcessorOperation.java new file mode 100644 index 0000000000..c3fbf418ea --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/DropStreamProcessorOperation.java @@ -0,0 +1,91 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.OperationContext; +import org.bson.BsonDocument; +import org.bson.BsonString; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; +import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; + +/** + * Permanently drops a stream processor. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class DropStreamProcessorOperation implements WriteOperation { + private static final String COMMAND_NAME = "dropStreamProcessor"; + private static final String DATABASE = "admin"; + + private final String processorName; + + public DropStreamProcessorOperation(final String processorName) { + this.processorName = notNull("processorName", processorName); + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public Void execute(final WriteBinding binding, final OperationContext operationContext) { + return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> { + executeCommand(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformer(operationContextWithMinRtt.getTimeoutContext())); + return null; + }); + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + withAsyncConnection(binding, operationContext, (connection, operationContextWithMinRtt, t) -> { + SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); + if (t != null) { + errHandlingCallback.onResult(null, t); + } else { + executeCommandAsync(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformerAsync(operationContextWithMinRtt.getTimeoutContext()), + releasingCallback(errHandlingCallback, connection)); + } + }); + } + + private BsonDocument getCommand() { + return new BsonDocument(COMMAND_NAME, new BsonString(processorName)); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/GetMoreSampleStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/GetMoreSampleStreamProcessorOperation.java new file mode 100644 index 0000000000..308d6fc956 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/GetMoreSampleStreamProcessorOperation.java @@ -0,0 +1,120 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.GetStreamProcessorSamplesResult; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.Nullable; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.codecs.BsonDocumentCodec; + +import java.util.ArrayList; +import java.util.List; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; + +/** + * Fetches the next batch from an open sample cursor. + * A {@code cursorId} of {@code 0} in the response indicates the cursor is exhausted. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class GetMoreSampleStreamProcessorOperation implements WriteOperation { + private static final String COMMAND_NAME = "getMoreSampleStreamProcessor"; + private static final String DATABASE = "admin"; + private static final BsonDocumentCodec DECODER = new BsonDocumentCodec(); + + private final String processorName; + private final long cursorId; + @Nullable + private final Integer batchSize; + + public GetMoreSampleStreamProcessorOperation(final String processorName, final long cursorId, + @Nullable final Integer batchSize) { + this.processorName = notNull("processorName", processorName); + this.cursorId = cursorId; + this.batchSize = batchSize; + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public GetStreamProcessorSamplesResult execute(final WriteBinding binding, final OperationContext operationContext) { + return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> + executeCommand(binding, operationContextWithMinRtt, DATABASE, getCommand(), DECODER, + (result, connection1) -> toResult(result))); + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + withAsyncConnection(binding, operationContext, (connection, operationContextWithMinRtt, t) -> { + SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); + if (t != null) { + errHandlingCallback.onResult(null, t); + } else { + executeCommandAsync(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + (result, connection1) -> toResult(result), + releasingCallback(errHandlingCallback, connection)); + } + }); + } + + private BsonDocument getCommand() { + BsonDocument command = new BsonDocument(COMMAND_NAME, new BsonString(processorName)) + .append("cursorId", new BsonInt64(cursorId)); + if (batchSize != null) { + command.append("batchSize", new BsonInt32(batchSize)); + } + return command; + } + + private static GetStreamProcessorSamplesResult toResult(final BsonDocument result) { + long responseCursorId = result.getInt64("cursorId").getValue(); + BsonArray messages = result.getArray("messages", new BsonArray()); + List documents = new ArrayList<>(messages.size()); + for (BsonValue value : messages) { + documents.add(value.asDocument()); + } + return new GetStreamProcessorSamplesResult(responseCursorId, documents); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/GetStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/GetStreamProcessorOperation.java new file mode 100644 index 0000000000..47c25c5022 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/GetStreamProcessorOperation.java @@ -0,0 +1,164 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.StreamProcessorInfo; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadBinding; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.connection.OperationContext; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.codecs.BsonDocumentCodec; +import org.bson.codecs.Decoder; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; + +/** + * Returns information about a single stream processor. This is a retryable read. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class GetStreamProcessorOperation implements ReadOperationSimple { + private static final String COMMAND_NAME = "getStreamProcessor"; + private static final String DATABASE = "admin"; + private static final Decoder DECODER = new BsonDocumentCodec(); + + private final String processorName; + private final boolean retryReads; + + public GetStreamProcessorOperation(final String processorName, final boolean retryReads) { + this.processorName = notNull("processorName", processorName); + this.retryReads = retryReads; + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public StreamProcessorInfo execute(final ReadBinding binding, final OperationContext operationContext) { + return executeRetryableRead(binding, operationContext, DATABASE, getCommandCreator(), DECODER, transformer(), retryReads); + } + + @Override + public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + executeRetryableReadAsync(binding, operationContext, DATABASE, getCommandCreator(), DECODER, asyncTransformer(), retryReads, callback); + } + + private SyncOperationHelper.CommandReadTransformer transformer() { + return (result, source, connection, operationContext) -> toStreamProcessorInfo(result); + } + + private AsyncOperationHelper.CommandReadTransformerAsync asyncTransformer() { + return (result, source, connection, operationContext) -> toStreamProcessorInfo(result); + } + + private CommandCreator getCommandCreator() { + return (operationContext, serverDescription, connectionDescription) -> + new BsonDocument(COMMAND_NAME, new BsonString(processorName)); + } + + private static StreamProcessorInfo toStreamProcessorInfo(final BsonDocument response) { + try { + // The server wraps the processor data in a "result" subdocument + BsonDocument doc = response.containsKey("result") ? response.getDocument("result") : response; + + String id; + if (doc.containsKey("_id")) { + BsonValue rawId = doc.get("_id"); + id = rawId.isString() ? rawId.asString().getValue() : rawId.asObjectId().getValue().toHexString(); + } else if (doc.containsKey("id")) { + id = doc.getString("id").getValue(); + } else { + id = ""; + } + + StreamProcessorInfo.Builder builder = StreamProcessorInfo.builder() + .id(id) + .name(doc.getString("name").getValue()) + .state(doc.getString("state").getValue()) + .pipeline(toBsonDocumentList(doc.getArray("pipeline", new BsonArray()))) + .pipelineVersion(doc.containsKey("pipelineVersion") ? doc.getInt32("pipelineVersion").getValue() : 0) + .enableAutoScaling(doc.containsKey("enableAutoScaling") && doc.getBoolean("enableAutoScaling").getValue()) + .failoverEnabled(doc.containsKey("failoverEnabled") && doc.getBoolean("failoverEnabled").getValue()) + .hasStarted(doc.containsKey("hasStarted") && doc.getBoolean("hasStarted").getValue()) + .errorRetryable(doc.containsKey("errorRetryable") && doc.getBoolean("errorRetryable").getValue()); + + if (doc.containsKey("tenantID") && !doc.isNull("tenantID")) { + builder.tenantId(doc.getString("tenantID").getValue()); + } + builder.activeRegion(doc.containsKey("activeRegion") && !doc.isNull("activeRegion") + ? doc.getString("activeRegion").getValue() : ""); + builder.workspaceDefaultRegion(doc.containsKey("workspaceDefaultRegion") && !doc.isNull("workspaceDefaultRegion") + ? doc.getString("workspaceDefaultRegion").getValue() : ""); + builder.modifiedBy(doc.containsKey("modifiedBy") && !doc.isNull("modifiedBy") + ? doc.getString("modifiedBy").getValue() : ""); + builder.errorMsg(doc.containsKey("errorMsg") && !doc.isNull("errorMsg") + ? doc.getString("errorMsg").getValue() : ""); + if (doc.containsKey("tier") && !doc.isNull("tier")) { + builder.tier(doc.getString("tier").getValue()); + } + if (doc.containsKey("dlq") && doc.isDocument("dlq")) { + builder.dlq(doc.getDocument("dlq")); + } + if (doc.containsKey("streamMetaFieldName") && !doc.isNull("streamMetaFieldName")) { + builder.streamMetaFieldName(doc.getString("streamMetaFieldName").getValue()); + } + if (doc.containsKey("lastStateChange") && doc.isDateTime("lastStateChange")) { + builder.lastStateChange(new Date(doc.getDateTime("lastStateChange").getValue())); + } + if (doc.containsKey("lastModifiedAt") && doc.isDateTime("lastModifiedAt")) { + builder.lastModifiedAt(new Date(doc.getDateTime("lastModifiedAt").getValue())); + } + if (doc.containsKey("errorCode") && doc.isInt32("errorCode")) { + builder.errorCode(doc.getInt32("errorCode").getValue()); + } + + return builder.build(); + } catch (Exception e) { + throw new IllegalStateException("Failed to parse getStreamProcessor response. Raw document: " + response.toJson(), e); + } + } + + private static List toBsonDocumentList(final BsonArray array) { + List list = new ArrayList<>(array.size()); + for (BsonValue value : array) { + list.add(value.asDocument()); + } + return list; + } + +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/GetStreamProcessorStatsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/GetStreamProcessorStatsOperation.java new file mode 100644 index 0000000000..06b20d970e --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/GetStreamProcessorStatsOperation.java @@ -0,0 +1,92 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadBinding; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.Nullable; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.Document; +import org.bson.codecs.DocumentCodec; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; + +/** + * Returns runtime statistics for a stream processor. This is a retryable read. + * Returns an error from the server if the processor is not in the {@code STARTED} state. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class GetStreamProcessorStatsOperation implements ReadOperationSimple { + private static final String COMMAND_NAME = "getStreamProcessorStats"; + private static final String DATABASE = "admin"; + private static final DocumentCodec DECODER = new DocumentCodec(); + + private final String processorName; + private final boolean retryReads; + @Nullable + private final Boolean verbose; + + public GetStreamProcessorStatsOperation(final String processorName, final boolean retryReads, + @Nullable final Boolean verbose) { + this.processorName = notNull("processorName", processorName); + this.retryReads = retryReads; + this.verbose = verbose; + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public Document execute(final ReadBinding binding, final OperationContext operationContext) { + return executeRetryableRead(binding, operationContext, DATABASE, getCommandCreator(), DECODER, + (result, source, connection, ctx) -> result, retryReads); + } + + @Override + public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + executeRetryableReadAsync(binding, operationContext, DATABASE, getCommandCreator(), DECODER, + (result, source, connection, ctx) -> result, retryReads, callback); + } + + private CommandCreator getCommandCreator() { + return (operationContext, serverDescription, connectionDescription) -> { + BsonDocument command = new BsonDocument(COMMAND_NAME, new BsonString(processorName)); + if (verbose != null) { + command.append("options", new BsonDocument("verbose", BsonBoolean.valueOf(verbose))); + } + return command; + }; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/StartSampleStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/StartSampleStreamProcessorOperation.java new file mode 100644 index 0000000000..63a787d72c --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/StartSampleStreamProcessorOperation.java @@ -0,0 +1,99 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.Nullable; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonString; +import org.bson.codecs.BsonDocumentCodec; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; + +/** + * Opens a sample cursor on a running stream processor and returns the cursor ID. + * The first batch of documents is fetched via {@link GetMoreSampleStreamProcessorOperation}. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class StartSampleStreamProcessorOperation implements WriteOperation { + private static final String COMMAND_NAME = "startSampleStreamProcessor"; + private static final String DATABASE = "admin"; + private static final BsonDocumentCodec DECODER = new BsonDocumentCodec(); + + private final String processorName; + @Nullable + private final Integer limit; + + public StartSampleStreamProcessorOperation(final String processorName, @Nullable final Integer limit) { + this.processorName = notNull("processorName", processorName); + this.limit = limit; + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public Long execute(final WriteBinding binding, final OperationContext operationContext) { + return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> + executeCommand(binding, operationContextWithMinRtt, DATABASE, getCommand(), DECODER, + (result, connection1) -> result.getInt64("cursorId").getValue())); + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + withAsyncConnection(binding, operationContext, (connection, operationContextWithMinRtt, t) -> { + SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); + if (t != null) { + errHandlingCallback.onResult(null, t); + } else { + executeCommandAsync(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + (result, connection1) -> result.getInt64("cursorId").getValue(), + releasingCallback(errHandlingCallback, connection)); + } + }); + } + + private BsonDocument getCommand() { + BsonDocument command = new BsonDocument(COMMAND_NAME, new BsonString(processorName)); + if (limit != null) { + command.append("limit", new BsonInt32(limit)); + } + return command; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/StartStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/StartStreamProcessorOperation.java new file mode 100644 index 0000000000..498af9eb36 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/StartStreamProcessorOperation.java @@ -0,0 +1,164 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.Nullable; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonString; +import org.bson.BsonTimestamp; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; +import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; + +/** + * Starts a stream processor. The processor must be in the {@code STOPPED} or {@code FAILED} state. + * + *

NOTE: {@code startAfter} is reserved for future use and is intentionally never serialized to the wire.

+ * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class StartStreamProcessorOperation implements WriteOperation { + private static final String COMMAND_NAME = "startStreamProcessor"; + private static final String DATABASE = "admin"; + + private final String processorName; + @Nullable + private final Integer workers; + @Nullable + private final Boolean clearCheckpoints; + @Nullable + private final BsonTimestamp startAtOperationTime; + @Nullable + private final String tier; + @Nullable + private final Boolean enableAutoScaling; + @Nullable + private final String failoverRegion; + @Nullable + private final String failoverMode; + @Nullable + private final Boolean failoverDryRun; + + public StartStreamProcessorOperation(final String processorName, + @Nullable final Integer workers, + @Nullable final Boolean clearCheckpoints, + @Nullable final BsonTimestamp startAtOperationTime, + @Nullable final String tier, + @Nullable final Boolean enableAutoScaling, + @Nullable final String failoverRegion, + @Nullable final String failoverMode, + @Nullable final Boolean failoverDryRun) { + this.processorName = notNull("processorName", processorName); + this.workers = workers; + this.clearCheckpoints = clearCheckpoints; + this.startAtOperationTime = startAtOperationTime; + this.tier = tier; + this.enableAutoScaling = enableAutoScaling; + this.failoverRegion = failoverRegion; + this.failoverMode = failoverMode; + this.failoverDryRun = failoverDryRun; + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public Void execute(final WriteBinding binding, final OperationContext operationContext) { + return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> { + executeCommand(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformer(operationContextWithMinRtt.getTimeoutContext())); + return null; + }); + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + withAsyncConnection(binding, operationContext, (connection, operationContextWithMinRtt, t) -> { + SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); + if (t != null) { + errHandlingCallback.onResult(null, t); + } else { + executeCommandAsync(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformerAsync(operationContextWithMinRtt.getTimeoutContext()), + releasingCallback(errHandlingCallback, connection)); + } + }); + } + + private BsonDocument getCommand() { + BsonDocument command = new BsonDocument(COMMAND_NAME, new BsonString(processorName)); + + if (workers != null) { + command.append("workers", new BsonInt32(workers)); + } + + BsonDocument options = new BsonDocument(); + if (clearCheckpoints != null) { + options.append("clearCheckpoints", BsonBoolean.valueOf(clearCheckpoints)); + } + if (startAtOperationTime != null) { + options.append("startAtOperationTime", startAtOperationTime); + } + // NOTE: startAfter is RESERVED and MUST NOT be sent to the server. + if (tier != null) { + options.append("tier", new BsonString(tier)); + } + if (enableAutoScaling != null) { + options.append("enableAutoScaling", BsonBoolean.valueOf(enableAutoScaling)); + } + if (!options.isEmpty()) { + command.append("options", options); + } + + if (failoverRegion != null) { + BsonDocument failover = new BsonDocument("region", new BsonString(failoverRegion)); + if (failoverMode != null) { + failover.append("mode", new BsonString(failoverMode)); + } + if (failoverDryRun != null) { + failover.append("dryRun", BsonBoolean.valueOf(failoverDryRun)); + } + command.append("failover", failover); + } + + return command; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/StopStreamProcessorOperation.java b/driver-core/src/main/com/mongodb/internal/operation/StopStreamProcessorOperation.java new file mode 100644 index 0000000000..7299c6d0b6 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/StopStreamProcessorOperation.java @@ -0,0 +1,91 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.OperationContext; +import org.bson.BsonDocument; +import org.bson.BsonString; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; +import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; + +/** + * Stops a running stream processor. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class StopStreamProcessorOperation implements WriteOperation { + private static final String COMMAND_NAME = "stopStreamProcessor"; + private static final String DATABASE = "admin"; + + private final String processorName; + + public StopStreamProcessorOperation(final String processorName) { + this.processorName = notNull("processorName", processorName); + } + + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override + public MongoNamespace getNamespace() { + return MongoNamespaceHelper.ADMIN_DB_COMMAND_NAMESPACE; + } + + @Override + public Void execute(final WriteBinding binding, final OperationContext operationContext) { + return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> { + executeCommand(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformer(operationContextWithMinRtt.getTimeoutContext())); + return null; + }); + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback callback) { + withAsyncConnection(binding, operationContext, (connection, operationContextWithMinRtt, t) -> { + SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); + if (t != null) { + errHandlingCallback.onResult(null, t); + } else { + executeCommandAsync(binding, operationContextWithMinRtt, DATABASE, getCommand(), connection, + writeConcernErrorTransformerAsync(operationContextWithMinRtt.getTimeoutContext()), + releasingCallback(errHandlingCallback, connection)); + } + }); + } + + private BsonDocument getCommand() { + return new BsonDocument(COMMAND_NAME, new BsonString(processorName)); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java b/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java index 0b3dd1a081..47918cdc30 100644 --- a/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java +++ b/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; final class ConnectionStringUnitTest { private static final String DEFAULT_OPTIONS = "mongodb://localhost/?"; @@ -95,6 +96,77 @@ void serverMonitoringMode() { } + // ---- Atlas Stream Processing workspace detection ---- + + private static final String WORKSPACE_HOST = + "atlas-stream-68f93575a1b17c4d20fb60cb-y7ufzk.virginia-usa.a.query.mongodb-qa.net"; + private static final String WORKSPACE_URI = "mongodb://user:pass@" + WORKSPACE_HOST + "/"; + + @ParameterizedTest + @ValueSource(strings = { + "mongodb://user:pass@atlas-stream-68f93575a1b17c4d20fb60cb-y7ufzk.virginia-usa.a.query.mongodb-qa.net/", + "mongodb://user:pass@atlas-stream-aabbcc-xyz.us-east-1.a.query.mongodb.net/", + "mongodb://user:pass@atlas-stream-aabbcc-xyz.eu-west-1.a.query.mongodb-dev.net/", + "mongodb://user:pass@ATLAS-STREAM-AABBCC-XYZ.us-east-1.a.query.mongodb.net/" + }) + void workspaceHostIsDetected(final String uri) { + ConnectionString cs = new ConnectionString(uri); + assertTrue(cs.isAtlasStreamProcessingWorkspace(), "Expected workspace detection for: " + uri); + } + + @ParameterizedTest + @ValueSource(strings = { + "mongodb://localhost/", + "mongodb://cluster0.example.mongodb.net/", + "mongodb://user:pass@cluster0.abcde.mongodb.net/", + "mongodb+srv://user:pass@atlas-stream-aabbcc-xyz.us-east-1.a.query.mongodb.net/" + }) + void nonWorkspaceHostIsNotDetected(final String uri) { + ConnectionString cs = new ConnectionString(uri); + assertFalse(cs.isAtlasStreamProcessingWorkspace(), "Did not expect workspace detection for: " + uri); + } + + @Test + void workspaceAutoEnablesTls() { + ConnectionString cs = new ConnectionString(WORKSPACE_URI); + assertEquals(Boolean.TRUE, cs.getSslEnabled()); + } + + @Test + void workspaceExplicitTlsFalseIsPreserved() { + ConnectionString cs = new ConnectionString(WORKSPACE_URI + "?tls=false"); + assertEquals(Boolean.FALSE, cs.getSslEnabled()); + } + + @Test + void workspaceAutoEnablesDirectConnection() { + ConnectionString cs = new ConnectionString(WORKSPACE_URI); + assertEquals(Boolean.TRUE, cs.isDirectConnection()); + } + + @Test + void workspaceDefaultsAuthSourceToAdmin() { + ConnectionString cs = new ConnectionString(WORKSPACE_URI); + MongoCredential credential = Assertions.assertNotNull(cs.getCredential()); + assertEquals("admin", credential.getSource()); + } + + @Test + void workspaceExplicitAuthSourceIsPreserved() { + ConnectionString cs = new ConnectionString(WORKSPACE_URI + "?authSource=admin"); + MongoCredential credential = Assertions.assertNotNull(cs.getCredential()); + assertEquals("admin", credential.getSource()); + } + + @Test + void workspaceIsNotDetectedForSrvProtocol() { + // mongodb+srv:// with a workspace-like host must not be treated as a workspace + ConnectionString cs = new ConnectionString( + "mongodb+srv://user:pass@atlas-stream-aabbcc-xyz.us-east-1.a.query.mongodb.net/"); + assertFalse(cs.isAtlasStreamProcessingWorkspace()); + assertTrue(cs.isSrvProtocol()); + } + @ParameterizedTest @ValueSource(strings = {"mongodb://foo:bar/@hostname/java?", "mongodb://foo:bar?@hostname/java/", "mongodb+srv://foo:bar/@hostname/java?", "mongodb+srv://foo:bar?@hostname/java/", diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/StreamProcessorOperationsTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/StreamProcessorOperationsTest.java new file mode 100644 index 0000000000..e4e7547d8d --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/StreamProcessorOperationsTest.java @@ -0,0 +1,304 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; +import com.mongodb.connection.ClusterId; +import com.mongodb.connection.ConnectionDescription; +import com.mongodb.connection.ServerConnectionState; +import com.mongodb.connection.ServerDescription; +import com.mongodb.connection.ServerId; +import com.mongodb.connection.ServerType; +import com.mongodb.internal.binding.ConnectionSource; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.Connection; +import com.mongodb.internal.connection.OperationContext; +import org.bson.BsonArray; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonString; +import org.bson.BsonTimestamp; +import org.bson.Document; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.util.Collections; + +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentCaptor.forClass; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class StreamProcessorOperationsTest { + private static final String PROCESSOR_NAME = "myProcessor"; + private static final BsonDocument OK_RESPONSE = new BsonDocument("ok", new BsonInt32(1)); + + private static final ServerAddress SERVER_ADDRESS = new ServerAddress(); + private static final ConnectionDescription CONNECTION_DESCRIPTION = + new ConnectionDescription(new ServerId(new ClusterId(), SERVER_ADDRESS)); + private static final ServerDescription SERVER_DESCRIPTION = ServerDescription.builder() + .address(SERVER_ADDRESS) + .type(ServerType.STANDALONE) + .state(ServerConnectionState.CONNECTED) + .build(); + + private Connection writeConnection; + private WriteBinding writeBinding; + private Connection readConnection; + private ReadBinding readBinding; + + @BeforeEach + void setUp() { + writeConnection = mock(Connection.class); + when(writeConnection.release()).thenReturn(1); + when(writeConnection.getDescription()).thenReturn(CONNECTION_DESCRIPTION); + doReturn(OK_RESPONSE).when(writeConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + + ConnectionSource writeSource = mock(ConnectionSource.class); + when(writeSource.release()).thenReturn(1); + when(writeSource.getServerDescription()).thenReturn(SERVER_DESCRIPTION); + when(writeSource.getConnection(any(OperationContext.class))).thenReturn(writeConnection); + + writeBinding = mock(WriteBinding.class); + when(writeBinding.getWriteConnectionSource(any(OperationContext.class))).thenReturn(writeSource); + + readConnection = mock(Connection.class); + when(readConnection.release()).thenReturn(1); + when(readConnection.getDescription()).thenReturn(CONNECTION_DESCRIPTION); + + ConnectionSource readSource = mock(ConnectionSource.class); + when(readSource.release()).thenReturn(1); + when(readSource.getServerDescription()).thenReturn(SERVER_DESCRIPTION); + when(readSource.getReadPreference()).thenReturn(ReadPreference.primary()); + when(readSource.getConnection(any(OperationContext.class))).thenReturn(readConnection); + + readBinding = mock(ReadBinding.class); + when(readBinding.getReadConnectionSource(any(OperationContext.class))).thenReturn(readSource); + } + + // -- StopStreamProcessorOperation -- + + @Test + void stopCommandShape() { + new StopStreamProcessorOperation(PROCESSOR_NAME).execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertEquals(new BsonDocument("stopStreamProcessor", new BsonString(PROCESSOR_NAME)), command); + } + + // -- DropStreamProcessorOperation -- + + @Test + void dropCommandShape() { + new DropStreamProcessorOperation(PROCESSOR_NAME).execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertEquals(new BsonDocument("dropStreamProcessor", new BsonString(PROCESSOR_NAME)), command); + } + + // -- CreateStreamProcessorOperation -- + + @Test + void createMinimalCommand() { + BsonDocument stage = new BsonDocument("$source", new BsonString("kafka")); + new CreateStreamProcessorOperation(PROCESSOR_NAME, Collections.singletonList(stage), null, null, null, null) + .execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertEquals(PROCESSOR_NAME, command.getString("createStreamProcessor").getValue()); + assertEquals(new BsonArray(Collections.singletonList(stage)), command.getArray("pipeline")); + assertFalse(command.containsKey("options")); + } + + @Test + void createCommandWithOptions() { + BsonDocument dlq = new BsonDocument("connectionName", new BsonString("kafka")) + .append("db", new BsonString("errors")).append("coll", new BsonString("dlq")); + new CreateStreamProcessorOperation(PROCESSOR_NAME, Collections.emptyList(), dlq, "ts", "SP10", true) + .execute(writeBinding, OPERATION_CONTEXT); + BsonDocument options = capturedWriteCommand().getDocument("options"); + assertEquals(dlq, options.getDocument("dlq")); + assertEquals("ts", options.getString("streamMetaFieldName").getValue()); + assertEquals("SP10", options.getString("tier").getValue()); + assertEquals(BsonBoolean.TRUE, options.getBoolean("failover")); + } + + @Test + void createOptionsAbsentWhenAllNull() { + new CreateStreamProcessorOperation(PROCESSOR_NAME, Collections.emptyList(), null, null, null, null) + .execute(writeBinding, OPERATION_CONTEXT); + assertFalse(capturedWriteCommand().containsKey("options")); + } + + // -- StartStreamProcessorOperation -- + + @Test + void startMinimalCommand() { + new StartStreamProcessorOperation(PROCESSOR_NAME, null, null, null, null, null, null, null, null) + .execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertEquals(PROCESSOR_NAME, command.getString("startStreamProcessor").getValue()); + assertFalse(command.containsKey("workers")); + assertFalse(command.containsKey("options")); + assertFalse(command.containsKey("failover")); + } + + @Test + void startCommandWithWorkers() { + new StartStreamProcessorOperation(PROCESSOR_NAME, 4, null, null, null, null, null, null, null) + .execute(writeBinding, OPERATION_CONTEXT); + assertEquals(4, capturedWriteCommand().getInt32("workers").getValue()); + } + + @Test + void startCommandWithOptions() { + BsonTimestamp ts = new BsonTimestamp(1000, 1); + new StartStreamProcessorOperation(PROCESSOR_NAME, null, true, ts, "SP10", true, null, null, null) + .execute(writeBinding, OPERATION_CONTEXT); + BsonDocument options = capturedWriteCommand().getDocument("options"); + assertEquals(BsonBoolean.TRUE, options.getBoolean("clearCheckpoints")); + assertEquals(ts, options.getTimestamp("startAtOperationTime")); + assertEquals("SP10", options.getString("tier").getValue()); + assertEquals(BsonBoolean.TRUE, options.getBoolean("enableAutoScaling")); + } + + @Test + void startCommandWithFailover() { + new StartStreamProcessorOperation(PROCESSOR_NAME, null, null, null, null, null, "US_EAST_1", "auto", true) + .execute(writeBinding, OPERATION_CONTEXT); + BsonDocument failover = capturedWriteCommand().getDocument("failover"); + assertEquals("US_EAST_1", failover.getString("region").getValue()); + assertEquals("auto", failover.getString("mode").getValue()); + assertEquals(BsonBoolean.TRUE, failover.getBoolean("dryRun")); + } + + @Test + void startNeverSendsStartAfter() { + new StartStreamProcessorOperation(PROCESSOR_NAME, null, null, null, null, null, null, null, null) + .execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertFalse(command.containsKey("startAfter")); + if (command.containsKey("options")) { + assertFalse(command.getDocument("options").containsKey("startAfter")); + } + } + + // -- StartSampleStreamProcessorOperation -- + + @Test + void startSampleMinimalCommand() { + doReturn(new BsonDocument("cursorId", new BsonInt64(42L))) + .when(writeConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new StartSampleStreamProcessorOperation(PROCESSOR_NAME, null).execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertEquals(PROCESSOR_NAME, command.getString("startSampleStreamProcessor").getValue()); + assertFalse(command.containsKey("limit")); + } + + @Test + void startSampleCommandWithLimit() { + doReturn(new BsonDocument("cursorId", new BsonInt64(42L))) + .when(writeConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new StartSampleStreamProcessorOperation(PROCESSOR_NAME, 100).execute(writeBinding, OPERATION_CONTEXT); + assertEquals(100, capturedWriteCommand().getInt32("limit").getValue()); + } + + // -- GetMoreSampleStreamProcessorOperation -- + + @Test + void getMoreSampleCommandShape() { + doReturn(new BsonDocument("cursorId", new BsonInt64(0L)).append("messages", new BsonArray())) + .when(writeConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new GetMoreSampleStreamProcessorOperation(PROCESSOR_NAME, 42L, null).execute(writeBinding, OPERATION_CONTEXT); + BsonDocument command = capturedWriteCommand(); + assertEquals(PROCESSOR_NAME, command.getString("getMoreSampleStreamProcessor").getValue()); + assertEquals(42L, command.getInt64("cursorId").getValue()); + assertFalse(command.containsKey("batchSize")); + } + + @Test + void getMoreSampleCommandWithBatchSize() { + doReturn(new BsonDocument("cursorId", new BsonInt64(0L)).append("messages", new BsonArray())) + .when(writeConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new GetMoreSampleStreamProcessorOperation(PROCESSOR_NAME, 42L, 50).execute(writeBinding, OPERATION_CONTEXT); + assertEquals(50, capturedWriteCommand().getInt32("batchSize").getValue()); + } + + // -- GetStreamProcessorOperation -- + + @Test + void getProcessorCommandShape() { + doReturn(minimalProcessorInfoDocument()) + .when(readConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new GetStreamProcessorOperation(PROCESSOR_NAME, true).execute(readBinding, OPERATION_CONTEXT); + BsonDocument command = capturedReadCommand(); + assertEquals(new BsonDocument("getStreamProcessor", new BsonString(PROCESSOR_NAME)), command); + } + + // -- GetStreamProcessorStatsOperation -- + + @Test + void getProcessorStatsMinimalCommand() { + doReturn(new Document("ok", 1)) + .when(readConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new GetStreamProcessorStatsOperation(PROCESSOR_NAME, true, null).execute(readBinding, OPERATION_CONTEXT); + BsonDocument command = capturedReadCommand(); + assertEquals(PROCESSOR_NAME, command.getString("getStreamProcessorStats").getValue()); + assertFalse(command.containsKey("options")); + } + + @Test + void getProcessorStatsCommandWithVerbose() { + doReturn(new Document("ok", 1)) + .when(readConnection).command(anyString(), any(BsonDocument.class), any(), any(), any(), any()); + new GetStreamProcessorStatsOperation(PROCESSOR_NAME, true, true).execute(readBinding, OPERATION_CONTEXT); + BsonDocument command = capturedReadCommand(); + assertEquals(BsonBoolean.TRUE, command.getDocument("options").getBoolean("verbose")); + } + + // -- helpers -- + + private BsonDocument capturedWriteCommand() { + ArgumentCaptor captor = forClass(BsonDocument.class); + verify(writeConnection).command(anyString(), captor.capture(), any(), any(), any(), any()); + return captor.getValue(); + } + + private BsonDocument capturedReadCommand() { + ArgumentCaptor captor = forClass(BsonDocument.class); + verify(readConnection).command(anyString(), captor.capture(), any(), any(), any(), any()); + return captor.getValue(); + } + + private static BsonDocument minimalProcessorInfoDocument() { + BsonDocument inner = new BsonDocument("tenantID", new BsonString("proc-1")) + .append("name", new BsonString(PROCESSOR_NAME)) + .append("state", new BsonString("CREATED")) + .append("pipeline", new BsonArray()) + .append("errorMsg", new BsonString("")); + return new BsonDocument("result", inner) + .append("ok", new BsonInt32(1)); + } +} diff --git a/driver-scala/src/main/scala/org/mongodb/scala/model/package.scala b/driver-scala/src/main/scala/org/mongodb/scala/model/package.scala index 0d23a38c2e..14095e6c80 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/model/package.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/model/package.scala @@ -992,6 +992,27 @@ package object model { */ @Sealed type ApproximateQuantileMethod = com.mongodb.client.model.ApproximateQuantileMethod + + /** @see [[com.mongodb.client.model.CreateStreamProcessorOptions]] */ + type CreateStreamProcessorOptions = com.mongodb.client.model.CreateStreamProcessorOptions + + /** @see [[com.mongodb.client.model.FailoverOptions]] */ + type FailoverOptions = com.mongodb.client.model.FailoverOptions + + /** @see [[com.mongodb.client.model.GetStreamProcessorSamplesOptions]] */ + type GetStreamProcessorSamplesOptions = com.mongodb.client.model.GetStreamProcessorSamplesOptions + + /** @see [[com.mongodb.client.model.GetStreamProcessorSamplesResult]] */ + type GetStreamProcessorSamplesResult = com.mongodb.client.model.GetStreamProcessorSamplesResult + + /** @see [[com.mongodb.client.model.GetStreamProcessorStatsOptions]] */ + type GetStreamProcessorStatsOptions = com.mongodb.client.model.GetStreamProcessorStatsOptions + + /** @see [[com.mongodb.client.model.StartStreamProcessorOptions]] */ + type StartStreamProcessorOptions = com.mongodb.client.model.StartStreamProcessorOptions + + /** @see [[com.mongodb.client.model.StreamProcessorInfo]] */ + type StreamProcessorInfo = com.mongodb.client.model.StreamProcessorInfo } // scalastyle:on number.of.methods number.of.types diff --git a/driver-scala/src/test/scala-2/org/mongodb/scala/ApiAliasAndCompanionSpec.scala b/driver-scala/src/test/scala-2/org/mongodb/scala/ApiAliasAndCompanionSpec.scala index 99fb5a5e78..be8d6e3f8d 100644 --- a/driver-scala/src/test/scala-2/org/mongodb/scala/ApiAliasAndCompanionSpec.scala +++ b/driver-scala/src/test/scala-2/org/mongodb/scala/ApiAliasAndCompanionSpec.scala @@ -248,6 +248,10 @@ class ApiAliasAndCompanionSpec extends BaseSpec { "MongoObservable", "Name", "NameCodecProvider", + "StreamProcessor", + "StreamProcessingClient", + "StreamProcessingClients", + "StreamProcessors", "SynchronousContextProvider", "TransactionBody", "FailPoint", diff --git a/driver-sync/src/main/com/mongodb/client/StreamProcessingClient.java b/driver-sync/src/main/com/mongodb/client/StreamProcessingClient.java new file mode 100644 index 0000000000..340a9c5dcc --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/StreamProcessingClient.java @@ -0,0 +1,54 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.annotations.Alpha; +import com.mongodb.annotations.Reason; +import com.mongodb.annotations.Sealed; +import com.mongodb.annotations.ThreadSafe; + +import java.io.Closeable; + +/** + * A client for connecting to an Atlas Stream Processing workspace. + * + *

Workspace connection strings match the pattern + * {@code mongodb://[username:password@]atlas-stream-*.*a.query.mongodb*.net/}. + * TLS, {@code loadBalanced=true}, and {@code authSource=admin} are applied automatically.

+ * + *

Use {@link StreamProcessingClients} to create an instance.

+ * + * @since 5.5 + */ +@Alpha(Reason.CLIENT) +@Sealed +@ThreadSafe +public interface StreamProcessingClient extends Closeable { + + /** + * Returns a handle for managing stream processors in this workspace. + * + * @return a {@link StreamProcessors} handle for this workspace + */ + StreamProcessors streamProcessors(); + + /** + * Closes this client, releasing all underlying resources. + */ + @Override + void close(); +} diff --git a/driver-sync/src/main/com/mongodb/client/StreamProcessingClients.java b/driver-sync/src/main/com/mongodb/client/StreamProcessingClients.java new file mode 100644 index 0000000000..e06aa961fb --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/StreamProcessingClients.java @@ -0,0 +1,128 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoDriverInformation; +import com.mongodb.client.internal.Clusters; +import com.mongodb.client.internal.StreamProcessingClientImpl; +import com.mongodb.internal.connection.Cluster; +import com.mongodb.internal.connection.StreamFactoryFactory; +import com.mongodb.lang.Nullable; + +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver; +import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactoryFactory; + +/** + * A factory for {@link StreamProcessingClient} instances. + * + *

The connection string must target an Atlas Stream Processing workspace — a host matching the pattern + * {@code atlas-stream-*.*a.query.mongodb*.net}. TLS, {@code directConnection=true}, and + * {@code authSource=admin} are applied automatically when the workspace host is detected.

+ * + * @see StreamProcessingClient + * @since 5.5 + */ +public final class StreamProcessingClients { + + /** + * Creates a new client with the given connection string. + * + * @param connectionString the connection string targeting a stream processing workspace + * @return the client + * @throws IllegalArgumentException if the connection string does not target a stream processing workspace + */ + public static StreamProcessingClient create(final String connectionString) { + return create(new ConnectionString(connectionString)); + } + + /** + * Creates a new client with the given connection string. + * + * @param connectionString the connection string targeting a stream processing workspace + * @return the client + * @throws IllegalArgumentException if the connection string does not target a stream processing workspace + */ + public static StreamProcessingClient create(final ConnectionString connectionString) { + return create(connectionString, null); + } + + /** + * Creates a new client with the given connection string. + * + *

Note: Intended for driver and library authors to associate extra driver metadata with the connections.

+ * + * @param connectionString the connection string targeting a stream processing workspace + * @param mongoDriverInformation any driver information to associate with the client + * @return the client + * @throws IllegalArgumentException if the connection string does not target a stream processing workspace + */ + public static StreamProcessingClient create(final ConnectionString connectionString, + @Nullable final MongoDriverInformation mongoDriverInformation) { + notNull("connectionString", connectionString); + if (!connectionString.isAtlasStreamProcessingWorkspace()) { + throw new IllegalArgumentException( + "The connection string does not target an Atlas Stream Processing workspace. " + + "Workspace hosts must match the pattern: atlas-stream-*.*a.query.mongodb*.net"); + } + MongoClientSettings settings = MongoClientSettings.builder() + .applyConnectionString(connectionString) + .build(); + return create(settings, mongoDriverInformation); + } + + /** + * Creates a new client with the given settings. + * + * @param settings the settings + * @return the client + */ + public static StreamProcessingClient create(final MongoClientSettings settings) { + return create(settings, null); + } + + /** + * Creates a new client with the given settings. + * + *

Note: Intended for driver and library authors to associate extra driver metadata with the connections.

+ * + * @param settings the settings + * @param mongoDriverInformation any driver information to associate with the client + * @return the client + */ + public static StreamProcessingClient create(final MongoClientSettings settings, + @Nullable final MongoDriverInformation mongoDriverInformation) { + notNull("settings", settings); + + MongoDriverInformation.Builder builder = mongoDriverInformation == null + ? MongoDriverInformation.builder() + : MongoDriverInformation.builder(mongoDriverInformation); + MongoDriverInformation driverInfo = builder.driverName("stream-processing").build(); + + StreamFactoryFactory streamFactoryFactory = getSyncStreamFactoryFactory( + settings.getTransportSettings(), + getInetAddressResolver(settings)); + + Cluster cluster = Clusters.createCluster(settings, driverInfo, streamFactoryFactory); + return new StreamProcessingClientImpl(cluster, settings, driverInfo, streamFactoryFactory); + } + + private StreamProcessingClients() { + } +} diff --git a/driver-sync/src/main/com/mongodb/client/StreamProcessor.java b/driver-sync/src/main/com/mongodb/client/StreamProcessor.java new file mode 100644 index 0000000000..e99a6b48ec --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/StreamProcessor.java @@ -0,0 +1,130 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.annotations.Alpha; +import com.mongodb.annotations.Reason; +import com.mongodb.annotations.Sealed; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.model.GetStreamProcessorSamplesOptions; +import com.mongodb.client.model.GetStreamProcessorSamplesResult; +import com.mongodb.client.model.GetStreamProcessorStatsOptions; +import com.mongodb.client.model.StartStreamProcessorOptions; +import org.bson.Document; + +/** + * A handle for a named stream processor in an Atlas Stream Processing workspace. + * + *

Obtaining this handle does not imply the processor currently exists on the server. + * Obtain an instance via {@link StreamProcessors#get(String)}.

+ * + * @since 5.5 + */ +@Alpha(Reason.CLIENT) +@Sealed +@ThreadSafe +public interface StreamProcessor { + + /** + * Gets the name of this stream processor. + * + * @return the processor name + */ + String getName(); + + /** + * Starts the stream processor. + * + *

Sends the {@code startStreamProcessor} command. The processor must be in the + * {@code STOPPED} or {@code FAILED} state.

+ */ + void start(); + + /** + * Starts the stream processor with the given options. + * + *

Sends the {@code startStreamProcessor} command. The processor must be in the + * {@code STOPPED} or {@code FAILED} state.

+ * + * @param options the options for starting the processor + */ + void start(StartStreamProcessorOptions options); + + /** + * Stops the stream processor. + * + *

Sends the {@code stopStreamProcessor} command. The processor transitions to the + * {@code STOPPED} state and can be restarted.

+ */ + void stop(); + + /** + * Permanently drops the stream processor. + * + *

Sends the {@code dropStreamProcessor} command. A dropped processor cannot be recovered.

+ */ + void drop(); + + /** + * Returns runtime statistics for the stream processor. + * + *

Sends the {@code getStreamProcessorStats} command. Returns an error if the processor + * is not in the {@code STARTED} state.

+ * + * @return a document containing the processor's runtime statistics + */ + Document stats(); + + /** + * Returns runtime statistics for the stream processor with the given options. + * + *

Sends the {@code getStreamProcessorStats} command. Returns an error if the processor + * is not in the {@code STARTED} state.

+ * + * @param options the options for retrieving statistics + * @return a document containing the processor's runtime statistics + */ + Document stats(GetStreamProcessorStatsOptions options); + + /** + * Retrieves a batch of sampled documents from a running stream processor. + * + *

On the first call, sends {@code startSampleStreamProcessor} to open a new sample cursor. + * On subsequent calls, sends {@code getMoreSampleStreamProcessor} to fetch the next batch.

+ * + *

Callers MUST check {@link GetStreamProcessorSamplesResult#getCursorId()}: a value of + * {@code 0} means the cursor is exhausted and no further calls should be made.

+ * + * @return the result containing the sampled documents and the cursor ID for the next call + */ + GetStreamProcessorSamplesResult getStreamProcessorSamples(); + + /** + * Retrieves a batch of sampled documents from a running stream processor with the given options. + * + *

If {@link GetStreamProcessorSamplesOptions#getCursorId()} is absent or zero, sends + * {@code startSampleStreamProcessor} to open a new sample cursor. Otherwise, sends + * {@code getMoreSampleStreamProcessor} using the provided cursor ID.

+ * + *

Callers MUST check {@link GetStreamProcessorSamplesResult#getCursorId()}: a value of + * {@code 0} means the cursor is exhausted and no further calls should be made.

+ * + * @param options the options controlling cursor ID, limit, and batch size + * @return the result containing the sampled documents and the cursor ID for the next call + */ + GetStreamProcessorSamplesResult getStreamProcessorSamples(GetStreamProcessorSamplesOptions options); +} diff --git a/driver-sync/src/main/com/mongodb/client/StreamProcessors.java b/driver-sync/src/main/com/mongodb/client/StreamProcessors.java new file mode 100644 index 0000000000..4b946e574b --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/StreamProcessors.java @@ -0,0 +1,83 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.annotations.Alpha; +import com.mongodb.annotations.Reason; +import com.mongodb.annotations.Sealed; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.model.CreateStreamProcessorOptions; +import com.mongodb.client.model.StreamProcessorInfo; +import org.bson.conversions.Bson; + +import java.util.List; + +/** + * A handle for managing stream processors in an Atlas Stream Processing workspace. + * + *

Obtain an instance via {@link StreamProcessingClient#streamProcessors()}.

+ * + * @since 5.5 + */ +@Alpha(Reason.CLIENT) +@Sealed +@ThreadSafe +public interface StreamProcessors { + + /** + * Creates a new stream processor with the given name and pipeline. + * + *

Sends the {@code createStreamProcessor} command.

+ * + * @param name the name of the stream processor + * @param pipeline the aggregation pipeline defining the processor's logic + */ + void create(String name, List pipeline); + + /** + * Creates a new stream processor with the given name, pipeline, and options. + * + *

Sends the {@code createStreamProcessor} command.

+ * + * @param name the name of the stream processor + * @param pipeline the aggregation pipeline defining the processor's logic + * @param options the options for creating the processor + */ + void create(String name, List pipeline, CreateStreamProcessorOptions options); + + /** + * Returns a handle for an existing stream processor by name. + * + *

This method does not send any commands to the server; the returned handle is + * a local reference. Use it to call {@link StreamProcessor#start()}, + * {@link StreamProcessor#stop()}, {@link StreamProcessor#drop()}, etc.

+ * + * @param name the name of the stream processor + * @return a {@link StreamProcessor} handle for the named processor + */ + StreamProcessor get(String name); + + /** + * Returns information about a single stream processor. + * + *

Sends the {@code getStreamProcessor} command.

+ * + * @param name the name of the stream processor + * @return information about the named processor + */ + StreamProcessorInfo getInfo(String name); +} diff --git a/driver-sync/src/main/com/mongodb/client/internal/StreamProcessingClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/StreamProcessingClientImpl.java new file mode 100644 index 0000000000..0d188b550f --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/internal/StreamProcessingClientImpl.java @@ -0,0 +1,144 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.internal; + +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoDriverInformation; +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.StreamProcessingClient; +import com.mongodb.client.StreamProcessors; +import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.binding.ClusterBinding; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.Cluster; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.connection.StreamFactoryFactory; +import com.mongodb.internal.diagnostics.logging.Logger; +import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.internal.operation.ReadOperation; +import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.lang.Nullable; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.mongodb.ReadPreference.primary; +import static com.mongodb.assertions.Assertions.notNull; + +/** + *

This class is not part of the public API and may be removed or changed at any time

+ */ +@ThreadSafe +public final class StreamProcessingClientImpl implements StreamProcessingClient { + private static final Logger LOGGER = Loggers.getLogger("client"); + + private final Cluster cluster; + private final StreamFactoryFactory streamFactoryFactory; + private final OperationExecutor executor; + private final boolean retryReads; + private final AtomicBoolean closed = new AtomicBoolean(); + + public StreamProcessingClientImpl(final Cluster cluster, + final MongoClientSettings settings, + final MongoDriverInformation mongoDriverInformation, + final StreamFactoryFactory streamFactoryFactory) { + this.cluster = notNull("cluster", cluster); + this.streamFactoryFactory = notNull("streamFactoryFactory", streamFactoryFactory); + notNull("settings", settings); + this.retryReads = settings.getRetryReads(); + this.executor = new SimpleOperationExecutor(cluster, TimeoutSettings.create(settings), settings.getServerApi()); + LOGGER.info("StreamProcessingClient created with settings " + settings); + } + + @Override + public StreamProcessors streamProcessors() { + return new StreamProcessorsImpl(executor, retryReads); + } + + @Override + public void close() { + if (!closed.getAndSet(true)) { + cluster.close(); + streamFactoryFactory.close(); + } + } + + /** + * A minimal OperationExecutor for stream processing workspaces. + * No sessions, CSFLE, or transactions are supported. + */ + private static final class SimpleOperationExecutor implements OperationExecutor { + private final Cluster cluster; + private final TimeoutSettings timeoutSettings; + @Nullable + private final com.mongodb.ServerApi serverApi; + + SimpleOperationExecutor(final Cluster cluster, final TimeoutSettings timeoutSettings, + @Nullable final com.mongodb.ServerApi serverApi) { + this.cluster = cluster; + this.timeoutSettings = timeoutSettings; + this.serverApi = serverApi; + } + + @Override + public T execute(final ReadOperation operation, final ReadPreference readPreference, + final ReadConcern readConcern) { + return execute(operation, readPreference, readConcern, null); + } + + @Override + public T execute(final WriteOperation operation, final ReadConcern readConcern) { + return execute(operation, readConcern, null); + } + + @Override + public T execute(final ReadOperation operation, final ReadPreference readPreference, + final ReadConcern readConcern, @Nullable final com.mongodb.client.ClientSession session) { + OperationContext operationContext = OperationContext.simpleOperationContext(timeoutSettings, serverApi); + ReadBinding binding = new ClusterBinding(cluster, readPreference); + try { + return operation.execute(binding, operationContext); + } finally { + binding.release(); + } + } + + @Override + public T execute(final WriteOperation operation, final ReadConcern readConcern, + @Nullable final com.mongodb.client.ClientSession session) { + OperationContext operationContext = OperationContext.simpleOperationContext(timeoutSettings, serverApi); + WriteBinding binding = new ClusterBinding(cluster, primary()); + try { + return operation.execute(binding, operationContext); + } finally { + binding.release(); + } + } + + @Override + public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { + return new SimpleOperationExecutor(cluster, newTimeoutSettings, serverApi); + } + + @Override + public TimeoutSettings getTimeoutSettings() { + return timeoutSettings; + } + } +} diff --git a/driver-sync/src/main/com/mongodb/client/internal/StreamProcessorImpl.java b/driver-sync/src/main/com/mongodb/client/internal/StreamProcessorImpl.java new file mode 100644 index 0000000000..9b5e00acbc --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/internal/StreamProcessorImpl.java @@ -0,0 +1,126 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.internal; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.StreamProcessor; +import com.mongodb.client.model.FailoverOptions; +import com.mongodb.client.model.GetStreamProcessorSamplesOptions; +import com.mongodb.client.model.GetStreamProcessorSamplesResult; +import com.mongodb.client.model.GetStreamProcessorStatsOptions; +import com.mongodb.client.model.StartStreamProcessorOptions; +import com.mongodb.internal.operation.DropStreamProcessorOperation; +import com.mongodb.internal.operation.GetMoreSampleStreamProcessorOperation; +import com.mongodb.internal.operation.GetStreamProcessorStatsOperation; +import com.mongodb.internal.operation.StartSampleStreamProcessorOperation; +import com.mongodb.internal.operation.StartStreamProcessorOperation; +import com.mongodb.internal.operation.StopStreamProcessorOperation; +import org.bson.Document; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + *

This class is not part of the public API and may be removed or changed at any time

+ */ +@ThreadSafe +public final class StreamProcessorImpl implements StreamProcessor { + private final String name; + private final OperationExecutor executor; + private final boolean retryReads; + + StreamProcessorImpl(final String name, final OperationExecutor executor, final boolean retryReads) { + this.name = notNull("name", name); + this.executor = notNull("executor", executor); + this.retryReads = retryReads; + } + + @Override + public String getName() { + return name; + } + + @Override + public void start() { + executor.execute( + new StartStreamProcessorOperation(name, null, null, null, null, null, null, null, null), + ReadConcern.DEFAULT); + } + + @Override + public void start(final StartStreamProcessorOptions options) { + notNull("options", options); + FailoverOptions failover = options.getFailover(); + String failoverRegion = failover != null ? failover.getRegion() : null; + String failoverMode = failover != null ? failover.getMode() : null; + Boolean failoverDryRun = failover != null ? failover.getDryRun() : null; + executor.execute( + new StartStreamProcessorOperation(name, options.getWorkers(), options.getClearCheckpoints(), + options.getStartAtOperationTime(), options.getTier(), options.getEnableAutoScaling(), + failoverRegion, failoverMode, failoverDryRun), + ReadConcern.DEFAULT); + } + + @Override + public void stop() { + executor.execute(new StopStreamProcessorOperation(name), ReadConcern.DEFAULT); + } + + @Override + public void drop() { + executor.execute(new DropStreamProcessorOperation(name), ReadConcern.DEFAULT); + } + + @Override + public Document stats() { + return executor.execute( + new GetStreamProcessorStatsOperation(name, retryReads, null), + ReadPreference.primary(), ReadConcern.DEFAULT); + } + + @Override + public Document stats(final GetStreamProcessorStatsOptions options) { + notNull("options", options); + return executor.execute( + new GetStreamProcessorStatsOperation(name, retryReads, options.getVerbose()), + ReadPreference.primary(), ReadConcern.DEFAULT); + } + + @Override + public GetStreamProcessorSamplesResult getStreamProcessorSamples() { + return getStreamProcessorSamples(new GetStreamProcessorSamplesOptions()); + } + + @Override + public GetStreamProcessorSamplesResult getStreamProcessorSamples(final GetStreamProcessorSamplesOptions options) { + notNull("options", options); + Long cursorId = options.getCursorId(); + if (cursorId == null || cursorId == 0L) { + long openedCursorId = executor.execute( + new StartSampleStreamProcessorOperation(name, options.getLimit()), + ReadConcern.DEFAULT); + return executor.execute( + new GetMoreSampleStreamProcessorOperation(name, openedCursorId, null), + ReadConcern.DEFAULT); + } else { + return executor.execute( + new GetMoreSampleStreamProcessorOperation(name, cursorId, options.getBatchSize()), + ReadConcern.DEFAULT); + } + } +} diff --git a/driver-sync/src/main/com/mongodb/client/internal/StreamProcessorsImpl.java b/driver-sync/src/main/com/mongodb/client/internal/StreamProcessorsImpl.java new file mode 100644 index 0000000000..44327f3061 --- /dev/null +++ b/driver-sync/src/main/com/mongodb/client/internal/StreamProcessorsImpl.java @@ -0,0 +1,92 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.internal; + +import com.mongodb.MongoClientSettings; +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.StreamProcessor; +import com.mongodb.client.StreamProcessors; +import com.mongodb.client.model.CreateStreamProcessorOptions; +import com.mongodb.client.model.StreamProcessorInfo; +import com.mongodb.internal.operation.CreateStreamProcessorOperation; +import com.mongodb.internal.operation.GetStreamProcessorOperation; +import org.bson.BsonDocument; +import org.bson.conversions.Bson; + +import java.util.ArrayList; +import java.util.List; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + *

This class is not part of the public API and may be removed or changed at any time

+ */ +@ThreadSafe +public final class StreamProcessorsImpl implements StreamProcessors { + private final OperationExecutor executor; + private final boolean retryReads; + + StreamProcessorsImpl(final OperationExecutor executor, final boolean retryReads) { + this.executor = notNull("executor", executor); + this.retryReads = retryReads; + } + + @Override + public void create(final String name, final List pipeline) { + create(name, pipeline, new CreateStreamProcessorOptions()); + } + + @Override + public void create(final String name, final List pipeline, final CreateStreamProcessorOptions options) { + notNull("name", name); + notNull("pipeline", pipeline); + notNull("options", options); + Bson dlq = options.getDlq(); + BsonDocument dlqDoc = dlq != null + ? dlq.toBsonDocument(BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry()) + : null; + executor.execute( + new CreateStreamProcessorOperation(name, toBsonDocumentList(pipeline), dlqDoc, + options.getStreamMetaFieldName(), options.getTier(), options.getFailover()), + ReadConcern.DEFAULT); + } + + @Override + public StreamProcessor get(final String name) { + notNull("name", name); + return new StreamProcessorImpl(name, executor, retryReads); + } + + @Override + public StreamProcessorInfo getInfo(final String name) { + notNull("name", name); + return executor.execute( + new GetStreamProcessorOperation(name, retryReads), + ReadPreference.primary(), ReadConcern.DEFAULT); + } + + private static List toBsonDocumentList(final List pipeline) { + List result = new ArrayList<>(pipeline.size()); + for (Bson stage : pipeline) { + notNull("pipeline stage", stage); + result.add(stage.toBsonDocument(BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry())); + } + return result; + } +} diff --git a/driver-sync/src/main/com/mongodb/client/package-info.java b/driver-sync/src/main/com/mongodb/client/package-info.java index f98e983c21..43fea9ccf1 100644 --- a/driver-sync/src/main/com/mongodb/client/package-info.java +++ b/driver-sync/src/main/com/mongodb/client/package-info.java @@ -16,6 +16,10 @@ /** * This package contains the synchronous CRUD API. + * + *

Use {@link com.mongodb.client.MongoClients} to create a {@link com.mongodb.client.MongoClient} for standard MongoDB connections. + * Use {@link com.mongodb.client.StreamProcessingClients} to create a {@link com.mongodb.client.StreamProcessingClient} + * for Atlas Stream Processing workspace connections.

*/ @NonNullApi package com.mongodb.client; diff --git a/driver-sync/src/test/unit/com/mongodb/client/StreamProcessingClientsTest.java b/driver-sync/src/test/unit/com/mongodb/client/StreamProcessingClientsTest.java new file mode 100644 index 0000000000..744deda088 --- /dev/null +++ b/driver-sync/src/test/unit/com/mongodb/client/StreamProcessingClientsTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class StreamProcessingClientsTest { + + private static final String WORKSPACE_URI = + "mongodb://user:pass@atlas-stream-68f93575a1b17c4d20fb60cb-y7ufzk.virginia-usa.a.query.mongodb-qa.net/"; + + @ParameterizedTest + @ValueSource(strings = { + "mongodb://localhost/", + "mongodb://cluster0.example.mongodb.net/", + "mongodb://user:pass@hostname.mongodb.net/" + }) + @DisplayName("Non-workspace connection string throws IllegalArgumentException") + void nonWorkspaceUriThrows(final String uri) { + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> StreamProcessingClients.create(uri)); + assertTrue(ex.getMessage().contains("Atlas Stream Processing workspace"), + "Exception message should mention Atlas Stream Processing workspace"); + } + + @Test + @DisplayName("Non-workspace ConnectionString object throws IllegalArgumentException") + void nonWorkspaceConnectionStringObjectThrows() { + ConnectionString cs = new ConnectionString("mongodb://localhost/"); + assertThrows(IllegalArgumentException.class, () -> StreamProcessingClients.create(cs)); + } + + @Test + @DisplayName("Workspace URI returns a StreamProcessingClient instance") + void workspaceUriReturnsClient() { + try (StreamProcessingClient client = StreamProcessingClients.create(WORKSPACE_URI)) { + assertInstanceOf(StreamProcessingClient.class, client); + } + } + + @Test + @DisplayName("Workspace ConnectionString returns a StreamProcessingClient instance") + void workspaceConnectionStringReturnsClient() { + ConnectionString cs = new ConnectionString(WORKSPACE_URI); + assertTrue(cs.isAtlasStreamProcessingWorkspace()); + try (StreamProcessingClient client = StreamProcessingClients.create(cs)) { + assertInstanceOf(StreamProcessingClient.class, client); + } + } + + @Test + @DisplayName("create(MongoClientSettings) with SINGLE (directConnection) settings succeeds") + void settingsBasedCreateSucceeds() { + MongoClientSettings settings = MongoClientSettings.builder() + .applyConnectionString(new ConnectionString(WORKSPACE_URI)) + .build(); + assertDoesNotThrow(() -> { + try (StreamProcessingClient client = StreamProcessingClients.create(settings)) { + assertInstanceOf(StreamProcessingClient.class, client); + } + }); + } + + @Test + @DisplayName("close() is idempotent — double-close does not throw") + void closeIsIdempotent() { + StreamProcessingClient client = StreamProcessingClients.create(WORKSPACE_URI); + assertDoesNotThrow(() -> { + client.close(); + client.close(); + }); + } +} diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/StreamProcessorImplTest.java b/driver-sync/src/test/unit/com/mongodb/client/internal/StreamProcessorImplTest.java new file mode 100644 index 0000000000..132ee1b30b --- /dev/null +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/StreamProcessorImplTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.internal; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.client.model.GetStreamProcessorSamplesOptions; +import com.mongodb.client.model.GetStreamProcessorSamplesResult; +import com.mongodb.client.model.GetStreamProcessorStatsOptions; +import com.mongodb.client.model.StartStreamProcessorOptions; +import com.mongodb.internal.operation.DropStreamProcessorOperation; +import com.mongodb.internal.operation.GetMoreSampleStreamProcessorOperation; +import com.mongodb.internal.operation.GetStreamProcessorStatsOperation; +import com.mongodb.internal.operation.StartSampleStreamProcessorOperation; +import com.mongodb.internal.operation.StartStreamProcessorOperation; +import com.mongodb.internal.operation.StopStreamProcessorOperation; +import org.bson.Document; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; + +class StreamProcessorImplTest { + private static final String PROCESSOR_NAME = "myProcessor"; + + @Test + void getName() { + StreamProcessorImpl proc = new StreamProcessorImpl(PROCESSOR_NAME, + new TestOperationExecutor(new ArrayList<>()), true); + assertEquals(PROCESSOR_NAME, proc.getName()); + } + + @Test + void startWithNoOptionsDispatchesStartOperation() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(null))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true).start(); + assertInstanceOf(StartStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void startWithOptionsDispatchesStartOperation() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(null))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true) + .start(new StartStreamProcessorOptions()); + assertInstanceOf(StartStreamProcessorOperation.class, executor.getWriteOperation()); + assertEquals(ReadConcern.DEFAULT, executor.getReadConcern()); + } + + @Test + void stopDispatchesStopOperation() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(null))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true).stop(); + assertInstanceOf(StopStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void dropDispatchesDropOperation() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(null))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true).drop(); + assertInstanceOf(DropStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void statsDispatchesGetStatsOperation() { + Document statsDoc = new Document("ok", 1); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(statsDoc))); + Document result = new StreamProcessorImpl(PROCESSOR_NAME, executor, true).stats(); + assertEquals(statsDoc, result); + assertInstanceOf(GetStreamProcessorStatsOperation.class, executor.getReadOperation()); + assertEquals(ReadPreference.primary(), executor.getReadPreference()); + assertEquals(ReadConcern.DEFAULT, executor.getReadConcern()); + } + + @Test + void statsWithOptionsDispatchesGetStatsOperation() { + Document statsDoc = new Document("ok", 1); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(statsDoc))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true) + .stats(new GetStreamProcessorStatsOptions().verbose(true)); + assertInstanceOf(GetStreamProcessorStatsOperation.class, executor.getReadOperation()); + } + + @Test + void getStreamProcessorSamplesInitialCallExecutesStartThenGetMore() { + GetStreamProcessorSamplesResult samplesResult = new GetStreamProcessorSamplesResult(0L, emptyList()); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Arrays.asList(42L, samplesResult))); + GetStreamProcessorSamplesResult result = + new StreamProcessorImpl(PROCESSOR_NAME, executor, true).getStreamProcessorSamples(); + assertEquals(samplesResult, result); + assertInstanceOf(StartSampleStreamProcessorOperation.class, executor.getWriteOperation()); + assertInstanceOf(GetMoreSampleStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void getStreamProcessorSamplesWithNullCursorIdExecutesStartThenGetMore() { + GetStreamProcessorSamplesResult samplesResult = new GetStreamProcessorSamplesResult(0L, emptyList()); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Arrays.asList(42L, samplesResult))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true) + .getStreamProcessorSamples(new GetStreamProcessorSamplesOptions()); + assertInstanceOf(StartSampleStreamProcessorOperation.class, executor.getWriteOperation()); + assertInstanceOf(GetMoreSampleStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void getStreamProcessorSamplesWithZeroCursorIdExecutesStartThenGetMore() { + GetStreamProcessorSamplesResult samplesResult = new GetStreamProcessorSamplesResult(0L, emptyList()); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Arrays.asList(42L, samplesResult))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true) + .getStreamProcessorSamples(new GetStreamProcessorSamplesOptions().cursorId(0L)); + assertInstanceOf(StartSampleStreamProcessorOperation.class, executor.getWriteOperation()); + assertInstanceOf(GetMoreSampleStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void getStreamProcessorSamplesWithExistingCursorIdExecutesGetMoreOnly() { + GetStreamProcessorSamplesResult samplesResult = new GetStreamProcessorSamplesResult(0L, emptyList()); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(samplesResult))); + new StreamProcessorImpl(PROCESSOR_NAME, executor, true) + .getStreamProcessorSamples(new GetStreamProcessorSamplesOptions().cursorId(42L)); + assertInstanceOf(GetMoreSampleStreamProcessorOperation.class, executor.getWriteOperation()); + assertNull(executor.getWriteOperation()); + } +} diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/StreamProcessorsImplTest.java b/driver-sync/src/test/unit/com/mongodb/client/internal/StreamProcessorsImplTest.java new file mode 100644 index 0000000000..1ff023abde --- /dev/null +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/StreamProcessorsImplTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.internal; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.client.model.CreateStreamProcessorOptions; +import com.mongodb.client.model.StreamProcessorInfo; +import com.mongodb.internal.operation.CreateStreamProcessorOperation; +import com.mongodb.internal.operation.GetStreamProcessorOperation; +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.conversions.Bson; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +class StreamProcessorsImplTest { + private static final String PROCESSOR_NAME = "myProcessor"; + private static final List PIPELINE = singletonList( + new BsonDocument("$source", new BsonString("test"))); + + @Test + void createDispatchesCreateOperation() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(null))); + new StreamProcessorsImpl(executor, true).create(PROCESSOR_NAME, PIPELINE); + assertInstanceOf(CreateStreamProcessorOperation.class, executor.getWriteOperation()); + assertEquals(ReadConcern.DEFAULT, executor.getReadConcern()); + } + + @Test + void createWithOptionsDispatchesCreateOperation() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(null))); + new StreamProcessorsImpl(executor, true).create(PROCESSOR_NAME, PIPELINE, + new CreateStreamProcessorOptions().tier("SP10")); + assertInstanceOf(CreateStreamProcessorOperation.class, executor.getWriteOperation()); + } + + @Test + void getReturnsStreamProcessorImpl() { + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>()); + assertInstanceOf(StreamProcessorImpl.class, + new StreamProcessorsImpl(executor, true).get(PROCESSOR_NAME)); + } + + @Test + void getInfoDispatchesGetStreamProcessorOperation() { + StreamProcessorInfo info = minimalStreamProcessorInfo(); + TestOperationExecutor executor = new TestOperationExecutor(new ArrayList<>(Collections.singletonList(info))); + StreamProcessorInfo result = new StreamProcessorsImpl(executor, true).getInfo(PROCESSOR_NAME); + assertEquals(info, result); + assertInstanceOf(GetStreamProcessorOperation.class, executor.getReadOperation()); + assertEquals(ReadPreference.primary(), executor.getReadPreference()); + assertEquals(ReadConcern.DEFAULT, executor.getReadConcern()); + } + + private static StreamProcessorInfo minimalStreamProcessorInfo() { + return StreamProcessorInfo.builder() + .id("id-1") + .name(PROCESSOR_NAME) + .state("CREATED") + .pipeline(emptyList()) + .pipelineVersion(1) + .enableAutoScaling(false) + .failoverEnabled(false) + .activeRegion("US_EAST_1") + .workspaceDefaultRegion("US_EAST_1") + .modifiedBy("user@example.com") + .hasStarted(false) + .errorMsg("") + .errorRetryable(false) + .build(); + } +}