From 9bf8a1de169a50d31593a5ca1c4ee31c452e18de Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 30 Sep 2025 18:38:26 -0700 Subject: [PATCH 1/4] Reuse ConnectionSource to avoid extra server selection. JAVA-5974 --- .../operation/ChangeStreamBatchCursor.java | 51 ++++++++++++++++++- .../operation/SyncOperationHelper.java | 4 ++ .../ChangeStreamBatchCursorTest.java | 23 ++++++--- 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index c4bd72a4775..f10cd09c68b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -19,10 +19,14 @@ import com.mongodb.MongoChangeStreamException; import com.mongodb.MongoException; import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.assertions.Assertions; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.connection.OperationContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonTimestamp; @@ -244,9 +248,9 @@ private void resumeChangeStream() { withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding))).getWrapped(); return null; }); - wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding)).getWrapped(); binding.release(); // release the new change stream batch cursor's reference to the binding } @@ -257,4 +261,49 @@ private boolean hasPreviousNextTimedOut() { private static boolean isTimeoutException(final Throwable exception) { return exception instanceof MongoOperationTimeoutException; } + + private static class SourceAwareReadBinding implements ReadBinding { + private final ConnectionSource source; + private final ReadBinding binding; + + SourceAwareReadBinding(final ConnectionSource source, final ReadBinding binding) { + this.source = source; + this.binding = binding; + } + + @Override + public ReadPreference getReadPreference() { + return binding.getReadPreference(); + } + + @Override + public ConnectionSource getReadConnectionSource() { + return source; + } + + @Override + public ConnectionSource getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference) { + throw Assertions.fail(); + } + + @Override + public int getCount() { + return binding.getCount(); + } + + @Override + public ReadBinding retain() { + return binding.retain(); + } + + @Override + public int release() { + return binding.release(); + } + + @Override + public OperationContext getOperationContext() { + return binding.getOperationContext(); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java index 6d013df59ba..f8bbacf53ed 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java @@ -97,6 +97,10 @@ interface CommandWriteTransformer { private static final BsonDocumentCodec BSON_DOCUMENT_CODEC = new BsonDocumentCodec(); + /** + * Gets a {@link ConnectionSource} from the {@link ReadBinding#getReadConnectionSource()} and executes + * the provided {@link CallableWithSource} with it. + */ static T withReadConnectionSource(final ReadBinding binding, final CallableWithSource callable) { ConnectionSource source = binding.getReadConnectionSource(); try { diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java index 48c3a50e79a..7835af045dc 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java @@ -153,7 +153,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { verifyNoMoreInteractions(commandBatchCursor); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); verify(changeStreamOperation, times(1)).getDecoder(); - verify(changeStreamOperation, times(1)).execute(readBinding); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class)); verifyNoMoreInteractions(changeStreamOperation); verify(newCommandBatchCursor, times(1)).next(); verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); @@ -180,7 +180,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { void shouldPropagateAnyErrorsOccurredInAggregateOperation() { when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()); - when(changeStreamOperation.execute(readBinding)).thenThrow(resumableError); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenThrow(resumableError); ChangeStreamBatchCursor cursor = createChangeStreamCursor(); //when @@ -208,11 +208,12 @@ void shouldResumeAfterTimeoutInAggregateOnNextCall() { clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); //second next operation times out on resume attempt when creating change stream - when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoOperationTimeoutException("timeout during resumption")); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenThrow( + new MongoOperationTimeoutException("timeout during resumption")); assertThrows(MongoOperationTimeoutException.class, cursor::next); - clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); - doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(readBinding); + doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(any(ReadBinding.class)); //when third operation succeeds to resume and call next List next = cursor.next(); @@ -242,7 +243,8 @@ void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() { clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); //when second next operation errors on resume attempt when creating change stream - when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenThrow( + new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); assertThrows(MongoNotPrimaryException.class, cursor::next); //then @@ -280,7 +282,8 @@ private void verifyNoResumeAttemptCalled() { private void verifyResumeAttemptCalled() { verify(commandBatchCursor, times(1)).close(); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - verify(changeStreamOperation, times(1)).execute(readBinding); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class)); + verify(readBinding, times(1)).getReadConnectionSource(); verifyNoMoreInteractions(commandBatchCursor); } @@ -326,7 +329,11 @@ void setUp() { changeStreamOperation = mock(ChangeStreamOperation.class); when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec()); doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - when(changeStreamOperation.execute(readBinding)).thenReturn(newChangeStreamCursor); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenAnswer(invocation -> { + ReadBinding binding = invocation.getArgument(0); + binding.getReadConnectionSource(); + return newChangeStreamCursor; + }); } } From b125e5ad4c39d3ca47aaa5b2d35b1aa561e2f020 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 17 Oct 2025 14:45:36 -0700 Subject: [PATCH 2/4] Retain wrapped ConnectionSource getReadConnectionSource. --- .../com/mongodb/internal/operation/ChangeStreamBatchCursor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index f10cd09c68b..f5c483d7d85 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -278,6 +278,7 @@ public ReadPreference getReadPreference() { @Override public ConnectionSource getReadConnectionSource() { + source.retain(); return source; } From 835d556950d528a211807667e8d7e8af2eb110b9 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 17 Oct 2025 14:54:52 -0700 Subject: [PATCH 3/4] Add comments for clarity. --- .../mongodb/internal/operation/ChangeStreamBatchCursor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index f5c483d7d85..61708473143 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -248,6 +248,8 @@ private void resumeChangeStream() { withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + // The same source is pined to resulting CommandBatchCursor, so we need to wrap the binding + // to return the same source to avoid double-selection of the server. wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding))).getWrapped(); return null; }); @@ -262,6 +264,9 @@ private static boolean isTimeoutException(final Throwable exception) { return exception instanceof MongoOperationTimeoutException; } + /** + * Does not retain wrapped {link @ReadBinding} as it serves as a wrapper only. + */ private static class SourceAwareReadBinding implements ReadBinding { private final ConnectionSource source; private final ReadBinding binding; From ccd52af56820eccd9f8c20bba49debf1b4b07303 Mon Sep 17 00:00:00 2001 From: Viacheslav Babanin Date: Fri, 17 Oct 2025 15:44:39 -0700 Subject: [PATCH 4/4] Update comment. --- .../com/mongodb/internal/operation/ChangeStreamBatchCursor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index 61708473143..b51e6c02696 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -248,7 +248,7 @@ private void resumeChangeStream() { withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); - // The same source is pined to resulting CommandBatchCursor, so we need to wrap the binding + // The same source is pinned to resulting CommandBatchCursor, so we need to wrap the binding // to return the same source to avoid double-selection of the server. wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding))).getWrapped(); return null;