();
collection.drop(callback);
@@ -224,7 +224,7 @@ public void setUp() {
}
}
- String keyVaultNamespace = "admin.datakeys";
+ String keyVaultNamespace = "keyvault.datakeys";
if (cryptOptions.containsKey("keyVaultNamespace")) {
keyVaultNamespace = cryptOptions.getString("keyVaultNamespace").getValue();
}
diff --git a/driver-async/src/test/functional/com/mongodb/async/client/ClientSideEncryptionViewAreProhibitedTest.java b/driver-async/src/test/functional/com/mongodb/async/client/ClientSideEncryptionViewAreProhibitedTest.java
index d22b835add3..bad9e58540c 100644
--- a/driver-async/src/test/functional/com/mongodb/async/client/ClientSideEncryptionViewAreProhibitedTest.java
+++ b/driver-async/src/test/functional/com/mongodb/async/client/ClientSideEncryptionViewAreProhibitedTest.java
@@ -71,7 +71,7 @@ public void setUp() {
kmsProviders.put("local", localMasterkey);
AutoEncryptionSettings.Builder autoEncryptionSettingsBuilder = AutoEncryptionSettings.builder()
- .keyVaultNamespace("admin.datakeys")
+ .keyVaultNamespace("keyvault.datakeys")
.kmsProviders(kmsProviders);
AutoEncryptionSettings autoEncryptionSettings = autoEncryptionSettingsBuilder.build();
diff --git a/driver-async/src/test/functional/com/mongodb/async/client/MongoClientSessionSpecification.groovy b/driver-async/src/test/functional/com/mongodb/async/client/MongoClientSessionSpecification.groovy
index 693f158b713..37f45cb9370 100644
--- a/driver-async/src/test/functional/com/mongodb/async/client/MongoClientSessionSpecification.groovy
+++ b/driver-async/src/test/functional/com/mongodb/async/client/MongoClientSessionSpecification.groovy
@@ -39,7 +39,6 @@ import spock.lang.IgnoreIf
import java.util.concurrent.TimeUnit
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
-import static com.mongodb.ClusterFixture.isStandalone
import static com.mongodb.ClusterFixture.serverVersionAtLeast
import static com.mongodb.async.client.Fixture.getMongoClient
import static com.mongodb.async.client.TestHelper.run
@@ -48,22 +47,13 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
def 'should throw IllegalArgumentException if options are null'() {
when:
- Fixture.getMongoClient().startSession(null, Stub(SingleResultCallback))
+ getMongoClient().startSession(null, Stub(SingleResultCallback))
then:
thrown(IllegalArgumentException)
}
- @IgnoreIf({ serverVersionAtLeast(3, 6) && !isStandalone() })
- def 'should throw MongoClientException starting a session when sessions are not supported'() {
- when:
- startSession(ClientSessionOptions.builder().build())
-
- then:
- thrown(MongoClientException)
- }
-
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should create session with correct defaults'() {
when:
def options = ClientSessionOptions.builder().build()
@@ -71,7 +61,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
then:
clientSession != null
- clientSession.getOriginator() == Fixture.getMongoClient()
+ clientSession.getOriginator() == getMongoClient()
clientSession.isCausallyConsistent()
clientSession.getOptions() == ClientSessionOptions.builder()
.defaultTransactionOptions(TransactionOptions.builder()
@@ -88,7 +78,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'cluster time should advance'() {
given:
def firstOperationTime = new BsonTimestamp(42, 1)
@@ -132,7 +122,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'operation time should advance'() {
given:
def firstOperationTime = new BsonTimestamp(42, 1)
@@ -173,7 +163,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'methods that use the session should throw if the session is closed'() {
given:
def options = ClientSessionOptions.builder().build()
@@ -202,7 +192,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'informational methods should not throw if the session is closed'() {
given:
def options = ClientSessionOptions.builder().build()
@@ -219,7 +209,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
true
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should apply causally consistent session option to client session'() {
when:
def clientSession = startSession(ClientSessionOptions.builder().causallyConsistent(causallyConsistent).build())
@@ -235,7 +225,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
causallyConsistent << [true, false]
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'client session should have server session with valid identifier'() {
given:
def clientSession = startSession(ClientSessionOptions.builder().build())
@@ -254,7 +244,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should use a default session'() {
given:
def commandListener = new TestCommandListener()
@@ -273,25 +263,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
client?.close()
}
- @IgnoreIf({ serverVersionAtLeast(3, 6) && !isStandalone() })
- def 'should not use a default session when sessions are not supported'() {
- given:
- def commandListener = new TestCommandListener()
- def options = Fixture.getMongoClientBuilderFromConnectionString().addCommandListener(commandListener).build()
- def client = MongoClients.create(options)
-
- when:
- run(client.getDatabase('admin').&runCommand, new BsonDocument('ping', new BsonInt32(1)))
-
- then:
- def pingCommandStartedEvent = commandListener.events.get(0)
- !(pingCommandStartedEvent as CommandStartedEvent).command.containsKey('lsid')
-
- cleanup:
- client?.close()
- }
-
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should throw exception if unacknowledged write used with explicit session'() {
given:
def session = run(getMongoClient().&startSession)
@@ -334,7 +306,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
// This test is inherently racy as it's possible that the server _does_ replicate fast enough and therefore the test passes anyway
// even if causal consistency was not actually in effect. For that reason the test iterates a number of times in order to increase
// confidence that it's really causal consistency that is causing the test to succeed
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
@Category(Slow)
def 'should find inserted document on a secondary when causal consistency is enabled'() {
given:
diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java
index b8e5e8d0671..2eca8673730 100644
--- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java
+++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java
@@ -24,10 +24,12 @@
import com.mongodb.MongoSocketOpenException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.ServerAddress;
+import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.AsyncCompletionHandler;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.Stream;
+import com.mongodb.lang.Nullable;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
@@ -35,16 +37,16 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutException;
-import io.netty.util.concurrent.EventExecutor;
import org.bson.ByteBuf;
import javax.net.ssl.SSLContext;
@@ -58,6 +60,8 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification;
import static com.mongodb.internal.connection.SslHelper.enableSni;
@@ -65,9 +69,39 @@
/**
* A Stream implementation based on Netty 4.0.
+ * Just like it is for the {@link java.nio.channels.AsynchronousSocketChannel},
+ * concurrent pending1 readers
+ * (whether {@linkplain #read(int, int) synchronous} or {@linkplain #readAsync(int, AsyncCompletionHandler) asynchronous})
+ * are not supported by {@link NettyStream}.
+ * However, this class does not have a fail-fast mechanism checking for such situations.
+ *
+ * 1We cannot simply say that read methods are not allowed be run concurrently because strictly speaking they are allowed,
+ * as explained below.
+ * {@code
+ * NettyStream stream = ...;
+ * stream.readAsync(1, new AsyncCompletionHandler() {//inv1
+ * @Override
+ * public void completed(ByteBuf o) {
+ * stream.readAsync(//inv2
+ * 1, ...);//ret2
+ * }
+ *
+ * @Override
+ * public void failed(Throwable t) {
+ * }
+ * });//ret1
+ * }
+ * Arrows on the diagram below represent happens-before relations.
+ * {@code
+ * int1 -> inv2 -> ret2
+ * \--------> ret1
+ * }
+ * As shown on the diagram, the method {@link #readAsync(int, AsyncCompletionHandler)} runs concurrently with
+ * itself in the example above. However, there are no concurrent pending readers because the second operation
+ * is invoked after the first operation has completed reading despite the method has not returned yet.
*/
final class NettyStream implements Stream {
- private static final String READ_HANDLER_NAME = "ReadTimeoutHandler";
+ private static final byte NO_SCHEDULE_TIME = 0;
private final ServerAddress address;
private final SocketSettings settings;
private final SslSettings sslSettings;
@@ -79,8 +113,19 @@ final class NettyStream implements Stream {
private volatile Channel channel;
private final LinkedList pendingInboundBuffers = new LinkedList();
- private volatile PendingReader pendingReader;
- private volatile Throwable pendingException;
+ /* The fields pendingReader, pendingException are always written/read inside synchronized blocks
+ * that use the same NettyStream object, so they can be plain.*/
+ private PendingReader pendingReader;
+ private Throwable pendingException;
+ /* The fields readTimeoutTask, readTimeoutMillis are each written only in the ChannelInitializer.initChannel method
+ * (in addition to the write of the default value and the write by variable initializers),
+ * and read only when NettyStream users read data, or Netty event loop handles incoming data.
+ * Since actions done by the ChannelInitializer.initChannel method
+ * are ordered (in the happens-before order) before user read actions and before event loop actions that handle incoming data,
+ * these fields can be plain.*/
+ @Nullable
+ private ReadTimeoutTask readTimeoutTask;
+ private long readTimeoutMillis = NO_SCHEDULE_TIME;
NettyStream(final ServerAddress address, final SocketSettings settings, final SslSettings sslSettings, final EventLoopGroup workerGroup,
final Class extends SocketChannel> socketChannelClass, final ByteBufAllocator allocator) {
@@ -135,6 +180,7 @@ private void initializeChannel(final AsyncCompletionHandler handler, final
bootstrap.handler(new ChannelInitializer() {
@Override
public void initChannel(final SocketChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
if (sslSettings.isEnabled()) {
SSLEngine engine = getSslContext().createSSLEngine(address.getHost(), address.getPort());
engine.setUseClientMode(true);
@@ -144,13 +190,20 @@ public void initChannel(final SocketChannel ch) {
enableHostNameVerification(sslParameters);
}
engine.setSSLParameters(sslParameters);
- ch.pipeline().addFirst("ssl", new SslHandler(engine, false));
+ pipeline.addFirst("ssl", new SslHandler(engine, false));
}
+
int readTimeout = settings.getReadTimeout(MILLISECONDS);
- if (readTimeout > 0) {
- ch.pipeline().addLast(READ_HANDLER_NAME, new ReadTimeoutHandler(readTimeout));
+ if (readTimeout > NO_SCHEDULE_TIME) {
+ readTimeoutMillis = readTimeout;
+ /* We need at least one handler before (in the inbound evaluation order) the InboundBufferHandler,
+ * so that we can fire exception events (they are inbound events) using its context and the InboundBufferHandler
+ * receives them. SslHandler is not always present, so adding a NOOP handler.*/
+ pipeline.addLast(new ChannelInboundHandlerAdapter());
+ readTimeoutTask = new ReadTimeoutTask(pipeline.lastContext());
}
- ch.pipeline().addLast(new InboundBufferHandler());
+
+ pipeline.addLast(new InboundBufferHandler());
}
});
final ChannelFuture channelFuture = bootstrap.connect(nextAddress);
@@ -193,14 +246,27 @@ public void operationComplete(final ChannelFuture future) throws Exception {
@Override
public void readAsync(final int numBytes, final AsyncCompletionHandler handler) {
- scheduleReadTimeout();
+ readAsync(numBytes, handler, readTimeoutMillis);
+ }
+
+ /**
+ * @param numBytes Must be equal to {@link #pendingReader}{@code .numBytes} when called by a Netty channel handler.
+ * @param handler Must be equal to {@link #pendingReader}{@code .handler} when called by a Netty channel handler.
+ * @param readTimeoutMillis Must be equal to {@link #NO_SCHEDULE_TIME} when called by a Netty channel handler.
+ * Timeouts may be scheduled only by the public read methods. Taking into account that concurrent pending
+ * readers are not allowed, there must not be a situation when threads attempt to schedule a timeout
+ * before the previous one is either cancelled or completed.
+ */
+ private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final long readTimeoutMillis) {
ByteBuf buffer = null;
Throwable exceptionResult = null;
synchronized (this) {
exceptionResult = pendingException;
if (exceptionResult == null) {
if (!hasBytesAvailable(numBytes)) {
- pendingReader = new PendingReader(numBytes, handler);
+ if (pendingReader == null) {//called by a public read method
+ pendingReader = new PendingReader(numBytes, handler, scheduleReadTimeout(readTimeoutTask, readTimeoutMillis));
+ }
} else {
CompositeByteBuf composite = allocator.compositeBuffer(pendingInboundBuffers.size());
int bytesNeeded = numBytes;
@@ -223,13 +289,16 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler
buffer = new NettyByteBuf(composite).flip();
}
}
+ if (!(exceptionResult == null && buffer == null)//the read operation has completed
+ && pendingReader != null) {//we need to clear the pending reader
+ cancel(pendingReader.timeout);
+ this.pendingReader = null;
+ }
}
if (exceptionResult != null) {
- disableReadTimeout();
handler.failed(exceptionResult);
}
if (buffer != null) {
- disableReadTimeout();
handler.completed(buffer);
}
}
@@ -253,14 +322,12 @@ private void handleReadResponse(final io.netty.buffer.ByteBuf buffer, final Thro
} else {
pendingException = t;
}
- if (pendingReader != null) {
- localPendingReader = pendingReader;
- pendingReader = null;
- }
+ localPendingReader = pendingReader;
}
if (localPendingReader != null) {
- readAsync(localPendingReader.numBytes, localPendingReader.handler);
+ //timeouts may be scheduled only by the public read methods
+ readAsync(localPendingReader.numBytes, localPendingReader.handler, NO_SCHEDULE_TIME);
}
}
@@ -336,10 +403,14 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable t)
private static final class PendingReader {
private final int numBytes;
private final AsyncCompletionHandler handler;
+ @Nullable
+ private final ScheduledFuture> timeout;
- private PendingReader(final int numBytes, final AsyncCompletionHandler handler) {
+ private PendingReader(
+ final int numBytes, final AsyncCompletionHandler handler, @Nullable final ScheduledFuture> timeout) {
this.numBytes = numBytes;
this.handler = handler;
+ this.timeout = timeout;
}
}
@@ -423,44 +494,52 @@ public void operationComplete(final ChannelFuture future) {
}
}
- private void scheduleReadTimeout() {
- adjustTimeout(false);
+ private static void cancel(@Nullable final Future> f) {
+ if (f != null) {
+ f.cancel(false);
+ }
}
- private void disableReadTimeout() {
- adjustTimeout(true);
+ private static long combinedTimeout(final long timeout, final int additionalTimeout) {
+ if (timeout == NO_SCHEDULE_TIME) {
+ return NO_SCHEDULE_TIME;
+ } else {
+ return Math.addExact(timeout, additionalTimeout);
+ }
}
- private void adjustTimeout(final boolean disable) {
- ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME);
- if (timeoutHandler != null) {
- final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler;
- final ChannelHandlerContext handlerContext = channel.pipeline().context(timeoutHandler);
- EventExecutor executor = handlerContext.executor();
+ private static ScheduledFuture> scheduleReadTimeout(@Nullable final ReadTimeoutTask readTimeoutTask, final long timeoutMillis) {
+ if (timeoutMillis == NO_SCHEDULE_TIME) {
+ return null;
+ } else {
+ //assert readTimeoutTask != null : "readTimeoutTask must be initialized if read timeouts are enabled";
+ return readTimeoutTask.schedule(timeoutMillis);
+ }
+ }
- if (disable) {
- if (executor.inEventLoop()) {
- readTimeoutHandler.removeTimeout(handlerContext);
- } else {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- readTimeoutHandler.removeTimeout(handlerContext);
- }
- });
- }
- } else {
- if (executor.inEventLoop()) {
- readTimeoutHandler.scheduleTimeout(handlerContext);
- } else {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- readTimeoutHandler.scheduleTimeout(handlerContext);
- }
- });
- }
+ @ThreadSafe
+ private static final class ReadTimeoutTask implements Runnable {
+ private final ChannelHandlerContext ctx;
+
+ private ReadTimeoutTask(final ChannelHandlerContext timeoutChannelHandlerContext) {
+ ctx = timeoutChannelHandlerContext;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ctx.channel().isOpen()) {
+ ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
+ ctx.close();
}
+ } catch (final Throwable t) {
+ ctx.fireExceptionCaught(t);
}
+ }
+
+ private ScheduledFuture> schedule(final long timeoutMillis) {
+ //assert timeoutMillis > 0 : timeoutMillis;
+ return ctx.executor().schedule(this, timeoutMillis, MILLISECONDS);
+ }
}
}
diff --git a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java b/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java
deleted file mode 100644
index 4b4533f3cde..00000000000
--- a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2008-present MongoDB, Inc.
- * Copyright 2012 The Netty Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.mongodb.connection.netty;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.ReadTimeoutException;
-
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static com.mongodb.assertions.Assertions.isTrue;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
-
-/**
- * Passes a {@link ReadTimeoutException} if the time between a {@link #scheduleTimeout} and {@link #removeTimeout} is longer than the set
- * timeout.
- */
-final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
- private final long readTimeout;
- private volatile ScheduledFuture> timeout;
-
- ReadTimeoutHandler(final long readTimeout) {
- isTrueArgument("readTimeout must be greater than zero.", readTimeout > 0);
- this.readTimeout = readTimeout;
- }
-
- void scheduleTimeout(final ChannelHandlerContext ctx) {
- isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop());
- if (timeout == null) {
- timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout, TimeUnit.MILLISECONDS);
- }
- }
-
- void removeTimeout(final ChannelHandlerContext ctx) {
- isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop());
- if (timeout != null) {
- timeout.cancel(false);
- timeout = null;
- }
- }
-
- private static final class ReadTimeoutTask implements Runnable {
-
- private final ChannelHandlerContext ctx;
-
- ReadTimeoutTask(final ChannelHandlerContext ctx) {
- this.ctx = ctx;
- }
-
- @Override
- public void run() {
- if (ctx.channel().isOpen()) {
- try {
- ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
- ctx.close();
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- }
- }
- }
-}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java
index 63d6e05cc22..b1faa79fd6d 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java
@@ -145,37 +145,36 @@ public void openAsync(final SingleResultCallback callback) {
isTrue("Open already called", stream == null, callback);
try {
stream = streamFactory.create(serverId.getAddress());
- } catch (Throwable t) {
- callback.onResult(null, t);
- return;
- }
- stream.openAsync(new AsyncCompletionHandler() {
- @Override
- public void completed(final Void aVoid) {
- connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback() {
- @Override
- public void onResult(final ConnectionDescription result, final Throwable t) {
- if (t != null) {
- close();
- callback.onResult(null, t);
- } else {
- description = result;
- opened.set(true);
- sendCompressor = findSendCompressor(description);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
+ stream.openAsync(new AsyncCompletionHandler() {
+ @Override
+ public void completed(final Void aVoid) {
+ connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback() {
+ @Override
+ public void onResult(final ConnectionDescription result, final Throwable t) {
+ if (t != null) {
+ close();
+ callback.onResult(null, t);
+ } else {
+ description = result;
+ opened.set(true);
+ sendCompressor = findSendCompressor(description);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
+ }
+ callback.onResult(null, null);
}
- callback.onResult(null, null);
}
- }
- });
- }
+ });
+ }
- @Override
- public void failed(final Throwable t) {
- callback.onResult(null, t);
- }
- });
+ @Override
+ public void failed(final Throwable t) {
+ callback.onResult(null, t);
+ }
+ });
+ } catch (Throwable t) {
+ callback.onResult(null, t);
+ }
}
private Map createCompressorMap(final List compressorList) {
diff --git a/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java
index 3f553893b71..16c373af866 100644
--- a/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java
@@ -61,7 +61,11 @@ public boolean hasNext() {
return resumeableOperation(new Function, Boolean>() {
@Override
public Boolean apply(final AggregateResponseBatchCursor queryBatchCursor) {
- return queryBatchCursor.hasNext();
+ try {
+ return queryBatchCursor.hasNext();
+ } finally {
+ cachePostBatchResumeToken(queryBatchCursor);
+ }
}
});
}
@@ -71,9 +75,11 @@ public List next() {
return resumeableOperation(new Function, List>() {
@Override
public List apply(final AggregateResponseBatchCursor queryBatchCursor) {
- List results = convertResults(queryBatchCursor.next());
- cachePostBatchResumeToken(queryBatchCursor);
- return results;
+ try {
+ return convertResults(queryBatchCursor.next());
+ } finally {
+ cachePostBatchResumeToken(queryBatchCursor);
+ }
}
});
}
@@ -83,9 +89,11 @@ public List tryNext() {
return resumeableOperation(new Function, List>() {
@Override
public List apply(final AggregateResponseBatchCursor queryBatchCursor) {
- List results = convertResults(queryBatchCursor.tryNext());
- cachePostBatchResumeToken(queryBatchCursor);
- return results;
+ try {
+ return convertResults(queryBatchCursor.tryNext());
+ } finally {
+ cachePostBatchResumeToken(queryBatchCursor);
+ }
}
});
}
diff --git a/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java
index ba3efdf053e..895fe68110b 100644
--- a/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java
@@ -99,6 +99,7 @@ class QueryBatchCursor implements AggregateResponseBatchCursor {
this.decoder = notNull("decoder", decoder);
if (result != null) {
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
+ this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
}
if (firstQueryResult.getCursor() != null) {
notNull("connectionSource", connectionSource);
diff --git a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
index 3defca55427..1d968ceebd9 100644
--- a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
@@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
}
def next(cursor, boolean async, int minimumCount) {
+ next(cursor, async, false, minimumCount)
+ }
+
+ def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
List retVal = []
while (retVal.size() < minimumCount) {
- retVal.addAll(next(cursor, async))
+ retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
}
retVal
}
def next(cursor, boolean async) {
+ doNext(cursor, async, false)
+ }
+
+ def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
if (async) {
def futureResultCallback = new FutureResultCallback>()
cursor.next(futureResultCallback)
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
} else {
+ if (callHasNextBeforeNext) {
+ cursor.hasNext()
+ }
cursor.next()
}
}
diff --git a/driver-core/src/test/functional/com/mongodb/client/model/IndexesFunctionalSpecification.groovy b/driver-core/src/test/functional/com/mongodb/client/model/IndexesFunctionalSpecification.groovy
index cc1f05ce364..7a33ccf3413 100644
--- a/driver-core/src/test/functional/com/mongodb/client/model/IndexesFunctionalSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/client/model/IndexesFunctionalSpecification.groovy
@@ -17,7 +17,9 @@
package com.mongodb.client.model
import com.mongodb.OperationFunctionalSpecification
+import spock.lang.IgnoreIf
+import static com.mongodb.ClusterFixture.serverVersionGreaterThan
import static com.mongodb.client.model.Indexes.ascending
import static com.mongodb.client.model.Indexes.compoundIndex
import static com.mongodb.client.model.Indexes.descending
@@ -98,6 +100,7 @@ class IndexesFunctionalSpecification extends OperationFunctionalSpecification {
getCollectionHelper().listIndexes()*.get('key').contains(parse('{x : "2d"}'))
}
+ @IgnoreIf({ serverVersionGreaterThan('4.4') })
def 'geoHaystack'() {
when:
getCollectionHelper().createIndex(geoHaystack('x', descending('b')), 2.0)
diff --git a/driver-core/src/test/functional/com/mongodb/operation/CreateIndexesOperationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/operation/CreateIndexesOperationSpecification.groovy
index 9eeee686d65..5ff94ccceb8 100644
--- a/driver-core/src/test/functional/com/mongodb/operation/CreateIndexesOperationSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/operation/CreateIndexesOperationSpecification.groovy
@@ -39,6 +39,7 @@ import static com.mongodb.ClusterFixture.getBinding
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
import static com.mongodb.ClusterFixture.isSharded
import static com.mongodb.ClusterFixture.serverVersionAtLeast
+import static com.mongodb.ClusterFixture.serverVersionGreaterThan
import static java.util.concurrent.TimeUnit.SECONDS
class CreateIndexesOperationSpecification extends OperationFunctionalSpecification {
@@ -291,6 +292,7 @@ class CreateIndexesOperationSpecification extends OperationFunctionalSpecificati
async << [true, false]
}
+ @IgnoreIf({ serverVersionGreaterThan('4.4') })
def 'should be able to create a geoHaystack indexes'() {
given:
def operation = new CreateIndexesOperation(getNamespace(),
diff --git a/driver-core/src/test/functional/com/mongodb/operation/FindOperationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/operation/FindOperationSpecification.groovy
index 15a91b5eead..b4f2260cb11 100644
--- a/driver-core/src/test/functional/com/mongodb/operation/FindOperationSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/operation/FindOperationSpecification.groovy
@@ -61,6 +61,7 @@ import static com.mongodb.ClusterFixture.getBinding
import static com.mongodb.ClusterFixture.getCluster
import static com.mongodb.ClusterFixture.isSharded
import static com.mongodb.ClusterFixture.serverVersionAtLeast
+import static com.mongodb.ClusterFixture.serverVersionGreaterThan
import static com.mongodb.CursorType.NonTailable
import static com.mongodb.CursorType.Tailable
import static com.mongodb.CursorType.TailableAwait
@@ -697,7 +698,7 @@ class FindOperationSpecification extends OperationFunctionalSpecification {
].combinations()
}
-
+ @IgnoreIf({ serverVersionGreaterThan('4.4') && isSharded() })
def 'should explain with $explain modifier'() {
given:
def operation = new FindOperation(getNamespace(), new BsonDocumentCodec())
diff --git a/driver-core/src/test/resources/sessions/README.rst b/driver-core/src/test/resources/sessions/README.rst
new file mode 100644
index 00000000000..3ed7eea96a4
--- /dev/null
+++ b/driver-core/src/test/resources/sessions/README.rst
@@ -0,0 +1,84 @@
+====================
+Driver Session Tests
+====================
+
+.. contents::
+
+----
+
+Introduction
+============
+
+The YAML and JSON files in this directory are platform-independent tests that
+drivers can use to prove their conformance to the Driver Sessions Spec. They are
+designed with the intention of sharing most test-runner code with the
+Transactions spec tests.
+
+Several prose tests, which are not easily expressed in YAML, are also presented
+in the Driver Sessions Spec. Those tests will need to be manually implemented
+by each driver.
+
+Test Format
+===========
+
+The same as the `Transactions Spec Test format
+<../../transactions/tests/README.rst#test-format>`_.
+
+Special Test Operations
+```````````````````````
+
+Certain operations that appear in the "operations" array do not correspond to
+API methods but instead represent special test operations. Such operations are
+defined on the "testRunner" object and are documented in the
+`Transactions Spec Test
+<../../transactions/tests/README.rst#special-test-operations>`_.
+Additional, session test specific operations are documented here:
+
+assertDifferentLsidOnLastTwoCommands
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The "assertDifferentLsidOnLastTwoCommands" operation instructs the test runner
+to assert that the last two command started events from the test's MongoClient
+have different "lsid" fields. This assertion is used to ensure that dirty
+server sessions are discarded from the pool::
+
+ - name: assertDifferentLsidOnLastTwoCommands
+ object: testRunner
+
+assertSameLsidOnLastTwoCommands
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The "assertSameLsidOnLastTwoCommands" operation instructs the test runner
+to assert that the last two command started events from the test's MongoClient
+have the same "lsid" field. This assertion is used to ensure that non-dirty
+server sessions are not discarded from the pool::
+
+ - name: assertSameLsidOnLastTwoCommands
+ object: testRunner
+
+assertSessionDirty
+~~~~~~~~~~~~~~~~~~
+
+The "assertSessionDirty" operation instructs the test runner to assert that
+the given session is marked dirty::
+
+ - name: assertSessionDirty
+ object: testRunner
+ arguments:
+ session: session0
+
+assertSessionNotDirty
+~~~~~~~~~~~~~~~~~~~~~
+
+The "assertSessionNotDirty" operation instructs the test runner to assert that
+the given session is *not* marked dirty::
+
+ - name: assertSessionNotDirty
+ object: testRunner
+ arguments:
+ session: session0
+
+Changelog
+=========
+
+:2019-05-15: Initial version.
diff --git a/driver-core/src/test/resources/sessions/dirty-session-errors.json b/driver-core/src/test/resources/sessions/dirty-session-errors.json
index a964b208c51..77f71c7623e 100644
--- a/driver-core/src/test/resources/sessions/dirty-session-errors.json
+++ b/driver-core/src/test/resources/sessions/dirty-session-errors.json
@@ -21,171 +21,6 @@
}
],
"tests": [
- {
- "description": "Clean explicit session is not discarded",
- "operations": [
- {
- "name": "assertSessionNotDirty",
- "object": "testRunner",
- "arguments": {
- "session": "session0"
- }
- },
- {
- "name": "insertOne",
- "object": "collection",
- "arguments": {
- "session": "session0",
- "document": {
- "_id": 2
- }
- },
- "result": {
- "insertedId": 2
- }
- },
- {
- "name": "assertSessionNotDirty",
- "object": "testRunner",
- "arguments": {
- "session": "session0"
- }
- },
- {
- "name": "endSession",
- "object": "session0"
- },
- {
- "name": "find",
- "object": "collection",
- "arguments": {
- "filter": {
- "_id": -1
- }
- },
- "result": []
- },
- {
- "name": "assertSameLsidOnLastTwoCommands",
- "object": "testRunner"
- }
- ],
- "expectations": [
- {
- "command_started_event": {
- "command": {
- "insert": "test",
- "documents": [
- {
- "_id": 2
- }
- ],
- "ordered": true,
- "lsid": "session0"
- },
- "command_name": "insert",
- "database_name": "session-tests"
- }
- },
- {
- "command_started_event": {
- "command": {
- "find": "test",
- "filter": {
- "_id": -1
- },
- "lsid": "session0"
- },
- "command_name": "find",
- "database_name": "session-tests"
- }
- }
- ],
- "outcome": {
- "collection": {
- "data": [
- {
- "_id": 1
- },
- {
- "_id": 2
- }
- ]
- }
- }
- },
- {
- "description": "Clean implicit session is not discarded",
- "operations": [
- {
- "name": "insertOne",
- "object": "collection",
- "arguments": {
- "document": {
- "_id": 2
- }
- },
- "result": {
- "insertedId": 2
- }
- },
- {
- "name": "find",
- "object": "collection",
- "arguments": {
- "filter": {
- "_id": -1
- }
- },
- "result": []
- },
- {
- "name": "assertSameLsidOnLastTwoCommands",
- "object": "testRunner"
- }
- ],
- "expectations": [
- {
- "command_started_event": {
- "command": {
- "insert": "test",
- "documents": [
- {
- "_id": 2
- }
- ],
- "ordered": true
- },
- "command_name": "insert",
- "database_name": "session-tests"
- }
- },
- {
- "command_started_event": {
- "command": {
- "find": "test",
- "filter": {
- "_id": -1
- }
- },
- "command_name": "find",
- "database_name": "session-tests"
- }
- }
- ],
- "outcome": {
- "collection": {
- "data": [
- {
- "_id": 1
- },
- {
- "_id": 2
- }
- ]
- }
- }
- },
{
"description": "Dirty explicit session is discarded",
"clientOptions": {
@@ -833,4 +668,4 @@
}
}
]
-}
\ No newline at end of file
+}
diff --git a/driver-core/src/test/resources/sessions/server-support.json b/driver-core/src/test/resources/sessions/server-support.json
new file mode 100644
index 00000000000..967c9143fd9
--- /dev/null
+++ b/driver-core/src/test/resources/sessions/server-support.json
@@ -0,0 +1,181 @@
+{
+ "runOn": [
+ {
+ "minServerVersion": "3.6.0"
+ }
+ ],
+ "database_name": "session-tests",
+ "collection_name": "test",
+ "data": [
+ {
+ "_id": 1
+ }
+ ],
+ "tests": [
+ {
+ "description": "Server supports explicit sessions",
+ "operations": [
+ {
+ "name": "assertSessionNotDirty",
+ "object": "testRunner",
+ "arguments": {
+ "session": "session0"
+ }
+ },
+ {
+ "name": "insertOne",
+ "object": "collection",
+ "arguments": {
+ "session": "session0",
+ "document": {
+ "_id": 2
+ }
+ },
+ "result": {
+ "insertedId": 2
+ }
+ },
+ {
+ "name": "assertSessionNotDirty",
+ "object": "testRunner",
+ "arguments": {
+ "session": "session0"
+ }
+ },
+ {
+ "name": "endSession",
+ "object": "session0"
+ },
+ {
+ "name": "find",
+ "object": "collection",
+ "arguments": {
+ "filter": {
+ "_id": -1
+ }
+ },
+ "result": []
+ },
+ {
+ "name": "assertSameLsidOnLastTwoCommands",
+ "object": "testRunner"
+ }
+ ],
+ "expectations": [
+ {
+ "command_started_event": {
+ "command": {
+ "insert": "test",
+ "documents": [
+ {
+ "_id": 2
+ }
+ ],
+ "ordered": true,
+ "lsid": "session0"
+ },
+ "command_name": "insert",
+ "database_name": "session-tests"
+ }
+ },
+ {
+ "command_started_event": {
+ "command": {
+ "find": "test",
+ "filter": {
+ "_id": -1
+ },
+ "lsid": "session0"
+ },
+ "command_name": "find",
+ "database_name": "session-tests"
+ }
+ }
+ ],
+ "outcome": {
+ "collection": {
+ "data": [
+ {
+ "_id": 1
+ },
+ {
+ "_id": 2
+ }
+ ]
+ }
+ }
+ },
+ {
+ "description": "Server supports implicit sessions",
+ "operations": [
+ {
+ "name": "insertOne",
+ "object": "collection",
+ "arguments": {
+ "document": {
+ "_id": 2
+ }
+ },
+ "result": {
+ "insertedId": 2
+ }
+ },
+ {
+ "name": "find",
+ "object": "collection",
+ "arguments": {
+ "filter": {
+ "_id": -1
+ }
+ },
+ "result": []
+ },
+ {
+ "name": "assertSameLsidOnLastTwoCommands",
+ "object": "testRunner"
+ }
+ ],
+ "expectations": [
+ {
+ "command_started_event": {
+ "command": {
+ "insert": "test",
+ "documents": [
+ {
+ "_id": 2
+ }
+ ],
+ "ordered": true
+ },
+ "command_name": "insert",
+ "database_name": "session-tests"
+ }
+ },
+ {
+ "command_started_event": {
+ "command": {
+ "find": "test",
+ "filter": {
+ "_id": -1
+ }
+ },
+ "command_name": "find",
+ "database_name": "session-tests"
+ }
+ }
+ ],
+ "outcome": {
+ "collection": {
+ "data": [
+ {
+ "_id": 1
+ },
+ {
+ "_id": 2
+ }
+ ]
+ }
+ }
+ }
+ ]
+}
diff --git a/driver-legacy/src/test/functional/com/mongodb/MongoClientSessionSpecification.groovy b/driver-legacy/src/test/functional/com/mongodb/MongoClientSessionSpecification.groovy
index 544b951a320..856111aef00 100644
--- a/driver-legacy/src/test/functional/com/mongodb/MongoClientSessionSpecification.groovy
+++ b/driver-legacy/src/test/functional/com/mongodb/MongoClientSessionSpecification.groovy
@@ -34,7 +34,6 @@ import static Fixture.getDefaultDatabaseName
import static Fixture.getMongoClientURI
import static com.mongodb.ClusterFixture.isAuthenticated
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
-import static com.mongodb.ClusterFixture.isStandalone
import static com.mongodb.ClusterFixture.serverVersionAtLeast
import static com.mongodb.Fixture.getMongoClient
import static com.mongodb.MongoCredential.createCredential
@@ -49,22 +48,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
thrown(IllegalArgumentException)
}
- @IgnoreIf({ serverVersionAtLeast(3, 6) && !isStandalone() })
- def 'should throw MongoClientException starting a session when sessions are not supported'() {
- when:
- getMongoClient().startSession()
-
- then:
- thrown(MongoClientException)
-
- when:
- getMongoClient().startSession(ClientSessionOptions.builder().build())
-
- then:
- thrown(MongoClientException)
- }
-
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should create session with correct defaults'() {
given:
def clientSession = getMongoClient().startSession()
@@ -87,7 +71,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(4, 0) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(4, 0) })
def 'should use mutated client properties for default transaction options'() {
given:
def originalWriteConcern = getMongoClient().getWriteConcern()
@@ -111,7 +95,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'cluster time should advance'() {
given:
def firstOperationTime = new BsonTimestamp(42, 1)
@@ -155,7 +139,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'operation time should advance'() {
given:
def firstOperationTime = new BsonTimestamp(42, 1)
@@ -196,7 +180,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'methods that use the session should throw if the session is closed'() {
given:
def options = ClientSessionOptions.builder().build()
@@ -225,7 +209,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'informational methods should not throw if the session is closed'() {
given:
def options = ClientSessionOptions.builder().build()
@@ -245,7 +229,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should apply causally consistent session option to client session'() {
when:
def clientSession = getMongoClient().startSession(ClientSessionOptions.builder()
@@ -263,7 +247,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
causallyConsistent << [true, false]
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'client session should have server session with valid identifier'() {
given:
def clientSession = getMongoClient().startSession(ClientSessionOptions.builder().build())
@@ -282,7 +266,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should use a default session'() {
given:
def commandListener = new TestCommandListener()
@@ -302,24 +286,6 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
client?.close()
}
- @IgnoreIf({ serverVersionAtLeast(3, 6) && !isStandalone() })
- def 'should not use a default session when sessions are not supported'() {
- given:
- def commandListener = new TestCommandListener()
- def optionsBuilder = MongoClientOptions.builder()
- .addCommandListener(commandListener)
- def client = new MongoClient(getMongoClientURI(optionsBuilder))
-
- when:
- client.getDatabase('admin').runCommand(new BsonDocument('ping', new BsonInt32(1)))
-
- then:
- def pingCommandStartedEvent = commandListener.events.get(0)
- !(pingCommandStartedEvent as CommandStartedEvent).command.containsKey('lsid')
- cleanup:
- client?.close()
- }
-
// This test attempts attempts to demonstrate that causal consistency works correctly by inserting a document and then immediately
// searching for that document on a secondary by its _id and failing the test if the document is not found. Without causal consistency
// enabled the expectation is that eventually that test would fail since generally the find will execute on the secondary before
@@ -327,7 +293,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
// This test is inherently racy as it's possible that the server _does_ replicate fast enough and therefore the test passes anyway
// even if causal consistency was not actually in effect. For that reason the test iterates a number of times in order to increase
// confidence that it's really causal consistency that is causing the test to succeed
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
@Category(Slow)
def 'should find inserted document on a secondary when causal consistency is enabled'() {
given:
@@ -410,7 +376,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
client?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should throw exception if unacknowledged write used with explicit session'() {
given:
def session = getMongoClient().startSession()
diff --git a/driver-sync/src/examples/tour/ClientSideEncryptionAutoEncryptionSettingsTour.java b/driver-sync/src/examples/tour/ClientSideEncryptionAutoEncryptionSettingsTour.java
index c1680193c4d..398f89084c4 100644
--- a/driver-sync/src/examples/tour/ClientSideEncryptionAutoEncryptionSettingsTour.java
+++ b/driver-sync/src/examples/tour/ClientSideEncryptionAutoEncryptionSettingsTour.java
@@ -59,7 +59,7 @@ public static void main(final String[] args) {
}});
}};
- String keyVaultNamespace = "admin.datakeys";
+ String keyVaultNamespace = "keyvault.datakeys";
ClientEncryptionSettings clientEncryptionSettings = ClientEncryptionSettings.builder()
.keyVaultMongoClientSettings(MongoClientSettings.builder()
.applyConnectionString(new ConnectionString("mongodb://localhost"))
diff --git a/driver-sync/src/examples/tour/ClientSideEncryptionSimpleTour.java b/driver-sync/src/examples/tour/ClientSideEncryptionSimpleTour.java
index bf2cf668364..bc286ead954 100644
--- a/driver-sync/src/examples/tour/ClientSideEncryptionSimpleTour.java
+++ b/driver-sync/src/examples/tour/ClientSideEncryptionSimpleTour.java
@@ -52,7 +52,7 @@ public static void main(final String[] args) {
}});
}};
- String keyVaultNamespace = "admin.datakeys";
+ String keyVaultNamespace = "keyvault.datakeys";
AutoEncryptionSettings autoEncryptionSettings = AutoEncryptionSettings.builder()
.keyVaultNamespace(keyVaultNamespace)
diff --git a/driver-sync/src/main/com/mongodb/client/internal/KeyManagementService.java b/driver-sync/src/main/com/mongodb/client/internal/KeyManagementService.java
index e1c786d27e0..1df021c44f5 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/KeyManagementService.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/KeyManagementService.java
@@ -17,8 +17,11 @@
package com.mongodb.client.internal;
import com.mongodb.ServerAddress;
+import com.mongodb.internal.connection.SslHelper;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -38,9 +41,10 @@ class KeyManagementService {
public InputStream stream(final String host, final ByteBuffer message) throws IOException {
ServerAddress serverAddress = host.contains(":") ? new ServerAddress(host) : new ServerAddress(host, defaultPort);
- Socket socket = sslContext.getSocketFactory().createSocket();
+ SSLSocket socket = (SSLSocket) sslContext.getSocketFactory().createSocket();
try {
+ enableHostNameVerification(socket);
socket.setSoTimeout(timeoutMillis);
socket.connect(serverAddress.getSocketAddress(), timeoutMillis);
} catch (IOException e) {
@@ -68,6 +72,15 @@ public InputStream stream(final String host, final ByteBuffer message) throws IO
}
}
+ private void enableHostNameVerification(final SSLSocket socket) {
+ SSLParameters sslParameters = socket.getSSLParameters();
+ if (sslParameters == null) {
+ sslParameters = new SSLParameters();
+ }
+ SslHelper.enableHostNameVerification(sslParameters);
+ socket.setSSLParameters(sslParameters);
+ }
+
public int getDefaultPort() {
return defaultPort;
}
diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java
index 254ce3e55d4..41837905dbe 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java
@@ -37,7 +37,6 @@
import com.mongodb.connection.Cluster;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
-import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.binding.ClusterAwareReadWriteBinding;
import com.mongodb.internal.session.ServerSessionPool;
@@ -102,8 +101,7 @@ public ClientSession createClientSession(final ClientSessionOptions options, fin
ClusterDescription connectedClusterDescription = getConnectedClusterDescription();
- if (connectedClusterDescription.getType() == ClusterType.STANDALONE
- || connectedClusterDescription.getLogicalSessionTimeoutMinutes() == null) {
+ if (connectedClusterDescription.getLogicalSessionTimeoutMinutes() == null) {
return null;
} else {
ClientSessionOptions mergedOptions = ClientSessionOptions.builder(options)
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java
index 17785a503ce..fb7da3bec27 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java
@@ -394,8 +394,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
// use reflection to access the postBatchResumeToken
AggregateResponseBatchCursor> batchCursor = getBatchCursor(cursor);
- // check equality in the case where the batch has not been iterated at all
- assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
+ assertNotNull(batchCursor.getPostBatchResumeToken());
+
+ // resume token should be null before iteration
+ assertNull(cursor.getResumeToken());
cursor.next();
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
diff --git a/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy b/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy
index eddd26ae0b6..80360ccc353 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy
+++ b/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy
@@ -38,7 +38,6 @@ import spock.lang.IgnoreIf
import java.util.concurrent.TimeUnit
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
-import static com.mongodb.ClusterFixture.isStandalone
import static com.mongodb.ClusterFixture.serverVersionAtLeast
import static com.mongodb.client.Fixture.getDefaultDatabaseName
import static com.mongodb.client.Fixture.getMongoClient
@@ -54,22 +53,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
thrown(IllegalArgumentException)
}
- @IgnoreIf({ serverVersionAtLeast(3, 6) && !isStandalone() })
- def 'should throw MongoClientException starting a session when sessions are not supported'() {
- when:
- getMongoClient().startSession()
-
- then:
- thrown(MongoClientException)
-
- when:
- getMongoClient().startSession(ClientSessionOptions.builder().build())
-
- then:
- thrown(MongoClientException)
- }
-
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should create session with correct defaults'() {
expect:
clientSession.getOriginator() == getMongoClient()
@@ -93,7 +77,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
getMongoClient().startSession(ClientSessionOptions.builder().build())]
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'cluster time should advance'() {
given:
def firstOperationTime = new BsonTimestamp(42, 1)
@@ -134,7 +118,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.getClusterTime() == secondClusterTime
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'operation time should advance'() {
given:
def firstOperationTime = new BsonTimestamp(42, 1)
@@ -172,7 +156,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
clientSession.getOperationTime() == secondOperationTime
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'methods that use the session should throw if the session is closed'() {
given:
def options = ClientSessionOptions.builder().build()
@@ -198,7 +182,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
thrown(IllegalStateException)
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'informational methods should not throw if the session is closed'() {
given:
def options = ClientSessionOptions.builder().build()
@@ -215,7 +199,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
noExceptionThrown()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should apply causally consistent session option to client session'() {
when:
def clientSession = getMongoClient().startSession(ClientSessionOptions.builder()
@@ -230,7 +214,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
causallyConsistent << [true, false]
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'client session should have server session with valid identifier'() {
given:
def clientSession = getMongoClient().startSession(ClientSessionOptions.builder().build())
@@ -246,7 +230,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
identifier.getBinary('id').data.length == 16
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should use a default session'() {
given:
def commandListener = new TestCommandListener()
@@ -265,24 +249,6 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
client?.close()
}
- @IgnoreIf({ serverVersionAtLeast(3, 6) && !isStandalone() })
- def 'should not use a default session when sessions are not supported'() {
- given:
- def commandListener = new TestCommandListener()
- def settings = MongoClientSettings.builder(getMongoClientSettings()).commandListenerList([commandListener]).build()
- def client = MongoClients.create(settings)
-
- when:
- client.getDatabase('admin').runCommand(new BsonDocument('ping', new BsonInt32(1)))
-
- then:
- def pingCommandStartedEvent = commandListener.events.get(0)
- !(pingCommandStartedEvent as CommandStartedEvent).command.containsKey('lsid')
-
- cleanup:
- client?.close()
- }
-
// This test attempts attempts to demonstrate that causal consistency works correctly by inserting a document and then immediately
// searching for that document on a secondary by its _id and failing the test if the document is not found. Without causal consistency
// enabled the expectation is that eventually that test would fail since generally the find will execute on the secondary before
@@ -290,7 +256,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
// This test is inherently racy as it's possible that the server _does_ replicate fast enough and therefore the test passes anyway
// even if causal consistency was not actually in effect. For that reason the test iterates a number of times in order to increase
// confidence that it's really causal consistency that is causing the test to succeed
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
@Category(Slow)
def 'should find inserted document on a secondary when causal consistency is enabled'() {
given:
@@ -342,7 +308,7 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
client?.close()
}
- @IgnoreIf({ !serverVersionAtLeast(3, 6) || isStandalone() })
+ @IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should throw exception if unacknowledged write used with explicit session'() {
given:
def session = getMongoClient().startSession()
diff --git a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java
index 35a293b3498..e3f121fc6b3 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java
@@ -35,6 +35,7 @@
import static com.mongodb.ClusterFixture.isSharded;
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeNotNull;
import static org.junit.Assume.assumeTrue;
// See https://github.com/mongodb/specifications/blob/master/source/transactions/tests/README.rst#mongos-pinning-prose-tests
@@ -45,6 +46,8 @@ public class TransactionProseTest {
@Before
public void setUp() {
assumeTrue(canRunTests());
+ assumeNotNull(getMultiMongosConnectionString());
+
MongoClientSettings.Builder builder = MongoClientSettings.builder()
.applyConnectionString(getMultiMongosConnectionString());
if (System.getProperty("java.version").startsWith("1.6.")) {