From 0ff1d273ad28a850d540ac558d1be1574a1bb72e Mon Sep 17 00:00:00 2001 From: Oleksandr Poliakov Date: Wed, 24 Sep 2025 15:57:34 -0700 Subject: [PATCH 1/3] CSHARP-5024: CSOT avoid connection churn when operations timeout --- build.cake | 9 + evergreen/evergreen.yml | 37 ++ evergreen/run-csot-pendingresponse-tests.sh | 8 + .../pending-response-close-connection.json | 180 ++++++ .../pending-response-close-connection.yml | 98 +++ .../tests/pending-response.json | 491 +++++++++++++++ .../tests/pending-response.yml | 258 ++++++++ .../logging/connection-logging-csot.json | 578 ++++++++++++++++++ .../tests/logging/connection-logging-csot.yml | 254 ++++++++ .../Core/Compression/SnappyCompressor.cs | 6 +- .../ExclusiveConnectionPool.Helpers.cs | 103 +++- .../Core/Connections/BinaryConnection.cs | 482 +++++++++++++-- .../Core/Connections/IConnection.cs | 2 + .../ConnectionReadPendingResponseEvent.cs | 60 ++ .../ConnectionReadingPendingResponseEvent.cs | 51 ++ ...ectionReadingPendingResponseFailedEvent.cs | 68 +++ src/MongoDB.Driver/Core/Events/EventType.cs | 3 + .../Logging/StructuredLogTemplateProviders.cs | 7 + ...tructuredLogTemplateProvidersConnection.cs | 30 + .../Core/Misc/IClockExtensions.cs | 28 + .../Core/Misc/StreamExtensionMethods.cs | 161 +---- .../Core/Operations/RetryabilityHelper.cs | 4 +- .../RetryableWriteOperationExecutor.cs | 4 +- src/MongoDB.Driver/Core/Servers/Server.cs | 2 +- .../Messages/CommandRequestMessage.cs | 3 +- .../WireProtocol/Messages/QueryMessage.cs | 2 + .../WireProtocol/Messages/RequestMessage.cs | 2 + src/MongoDB.Driver/OperationContext.cs | 21 +- .../Core/MockConnection.cs | 9 + .../Core/Misc/StreamExtensionMethodsTests.cs | 238 +------- .../PendingResponseProseTests.cs | 146 +++++ .../Matchers/UnifiedEventMatcher.cs | 33 + .../Matchers/UnifiedLogMatcher.cs | 7 + .../UnifiedTestOperations/UnifiedEntityMap.cs | 9 + .../UnifiedTestRunner.cs | 2 +- 35 files changed, 2941 insertions(+), 455 deletions(-) create mode 100644 evergreen/run-csot-pendingresponse-tests.sh create mode 100644 specifications/client-side-operations-timeout/tests/pending-response-close-connection.json create mode 100644 specifications/client-side-operations-timeout/tests/pending-response-close-connection.yml create mode 100644 specifications/client-side-operations-timeout/tests/pending-response.json create mode 100644 specifications/client-side-operations-timeout/tests/pending-response.yml create mode 100644 specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.json create mode 100644 specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.yml create mode 100644 src/MongoDB.Driver/Core/Events/ConnectionReadPendingResponseEvent.cs create mode 100644 src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseEvent.cs create mode 100644 src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseFailedEvent.cs create mode 100644 src/MongoDB.Driver/Core/Misc/IClockExtensions.cs create mode 100644 tests/MongoDB.Driver.Tests/Specifications/connection-monitoring-and-pooling/PendingResponseProseTests.cs diff --git a/build.cake b/build.cake index fc2432a722f..9d2c957f182 100644 --- a/build.cake +++ b/build.cake @@ -267,6 +267,15 @@ Task("TestSocks5Proxy") action: (BuildConfig buildConfig, Path testProject) => RunTests(buildConfig, testProject, filter: "Category=\"Socks5Proxy\"")); +Task("CSOTPendingResponse") + .IsDependentOn("Build") + .DoesForEach( + items: GetFiles("./**/MongoDB.Driver.Tests.csproj"), + action: (BuildConfig buildConfig, Path testProject) => + RunTests(buildConfig, testProject, filter: "Category=\"CSOTPendingResponse\"")); + +Task("CSOTPendingResponseNet60").IsDependentOn("CSOTPendingResponse"); + Task("Package") .IsDependentOn("PackageNugetPackages"); diff --git a/evergreen/evergreen.yml b/evergreen/evergreen.yml index 617e84b5b30..dd4d8a65ba8 100644 --- a/evergreen/evergreen.yml +++ b/evergreen/evergreen.yml @@ -748,6 +748,15 @@ functions: git clone https://github.com/microsoft/semantic-kernel.git ./evergreen/run-sk.sh + run-csot-pendingresponse-tests: + - command: shell.exec + type: test + params: + working_dir: mongo-csharp-driver + script: | + ${PREPARE_SHELL} + evergreen/run-csot-pendingresponse-tests.sh + setup-csfle-secrets: - command: ec2.assume_role params: @@ -1823,6 +1832,25 @@ tasks: vars: FRAMEWORK: net60 + - name: test-csot-pendingresponse + commands: + - func: bootstrap-mongo-orchestration + vars: + MONGODB_VERSION: latest + TOPOLOGY: standalone + AUTH: noauth + SSL: nossl + - command: subprocess.exec + params: + working_dir: mongo-csharp-driver + binary: bash + background: true + include_expansions_in_env: + - MONGODB_URI + args: + - ${DRIVERS_TOOLS}/.evergreen/start-mongoproxy.sh + - func: install-dotnet + - func: run-csot-pendingresponse-tests axes: - id: version display_name: MongoDB Version @@ -2556,6 +2584,15 @@ buildvariants: - name: "test-load-balancer-netstandard21" - name: "test-load-balancer-net60" +# CSOT +- name: csot-pendingresponse-tests + display_name: "CSOT Pending Response tests" + batchtime: 720 # 12 hours + run_on: + - ubuntu2204-small + tasks: + - name: test-csot-pendingresponse + # Performance tests - name: driver-performance-tests display_name: "Driver Performance Tests" diff --git a/evergreen/run-csot-pendingresponse-tests.sh b/evergreen/run-csot-pendingresponse-tests.sh new file mode 100644 index 00000000000..94d47b0164a --- /dev/null +++ b/evergreen/run-csot-pendingresponse-tests.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +set -o errexit # Exit the script with error if any of the commands fail + +export ENABLE_CSOT_PENDING_RESPONSE_TESTS=1 +export MONGODB_URI="mongodb://127.0.0.1:28017/?directConnection=true" + +./build.sh --target=CSOTPendingResponseNet60 diff --git a/specifications/client-side-operations-timeout/tests/pending-response-close-connection.json b/specifications/client-side-operations-timeout/tests/pending-response-close-connection.json new file mode 100644 index 00000000000..84de8306c5f --- /dev/null +++ b/specifications/client-side-operations-timeout/tests/pending-response-close-connection.json @@ -0,0 +1,180 @@ +{ + "description": "Connection churn is prevented by reading pending responses during connection checkout", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "single", + "replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1, + "minPoolSize": 1 + }, + "awaitMinPoolSizeMS": 10000, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckOutStartedEvent", + "connectionCheckedOutEvent", + "connectionCheckOutFailedEvent", + "connectionCheckedInEvent", + "connectionCreatedEvent", + "connectionReadyEvent", + "connectionClosedEvent", + "connectionPendingResponseSucceededEvent", + "connectionPendingResponseStartedEvent", + "connectionPendingResponseFailedEvent" + ] + } + }, + { + "database": { + "id": "test", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "coll", + "database": "test", + "collectionName": "coll" + } + } + ], + "initialData": [ + { + "collectionName": "coll", + "databaseName": "test", + "documents": [] + } + ], + "tests": [ + { + "description": "write op retries when connection closes server-side while draining response", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 200, + "closeConnection": true + } + } + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 1000, + "document": { + "_id": 3, + "x": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionPendingResponseStartedEvent": {} + }, + { + "connectionPendingResponseFailedEvent": { + "reason": "error" + } + }, + { + "connectionClosedEvent": { + "reason": "error" + } + }, + { + "connectionCreatedEvent": {} + }, + { + "connectionReadyEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + } + ] +} diff --git a/specifications/client-side-operations-timeout/tests/pending-response-close-connection.yml b/specifications/client-side-operations-timeout/tests/pending-response-close-connection.yml new file mode 100644 index 00000000000..20f5375c197 --- /dev/null +++ b/specifications/client-side-operations-timeout/tests/pending-response-close-connection.yml @@ -0,0 +1,98 @@ +description: "Connection churn is prevented by reading pending responses during connection checkout" +schemaVersion: "1.28" +runOnRequirements: + - minServerVersion: "5.0" + # FailPoint on Server before 5.0 do not wait for blockTimeMS before closing the connection + topologies: ["single", "replicaset"] +createEntities: + - client: + id: &failPointClient failPointClient + useMultipleMongoses: false + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + minPoolSize: 1 + awaitMinPoolSizeMS: 10000 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - commandSucceededEvent + - connectionCheckOutStartedEvent + - connectionCheckedOutEvent + - connectionCheckOutFailedEvent + - connectionCheckedInEvent + - connectionCreatedEvent + - connectionReadyEvent + - connectionClosedEvent + - connectionPendingResponseSucceededEvent + - connectionPendingResponseStartedEvent + - connectionPendingResponseFailedEvent + - database: + id: &database test + client: *client + databaseName: *database + - collection: + id: &collection coll + database: *database + collectionName: *collection +initialData: + - collectionName: *collection + databaseName: *database + documents: [] +tests: + # If the connection is closed server-side while draining the response, the + # driver must retry with a different connection. + - description: "write op retries when connection closes server-side while draining response" + operations: + # Create a failpoint to block the first and second operation. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 200 + closeConnection: true + # Execute operation with timeout less than block time. + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: {_id: 3, x: 1} + expectError: + isTimeoutError: true + # Draining pending response should failure because of closed connection, + # but another connection should be established within the checkout and operation should succeed. + - name: insertOne + object: *collection + arguments: + timeoutMS: 1000 + document: {_id: 3, x: 1} + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - commandSucceededEvent: + commandName: insert + - client: *client + eventType: cmap + events: + - connectionCheckOutStartedEvent: { } + - connectionCheckedOutEvent: { } + - connectionCheckedInEvent: { } # Insert fails. + - connectionCheckOutStartedEvent: { } + - connectionPendingResponseStartedEvent: { } + - connectionPendingResponseFailedEvent: # Pending response failed. + reason: error + - connectionClosedEvent: + reason: error + - connectionCreatedEvent: { } # New connection should be created and checked out. + - connectionReadyEvent: { } + - connectionCheckedOutEvent: { } + - connectionCheckedInEvent: { } # Find finishes. diff --git a/specifications/client-side-operations-timeout/tests/pending-response.json b/specifications/client-side-operations-timeout/tests/pending-response.json new file mode 100644 index 00000000000..4e69ebe44c8 --- /dev/null +++ b/specifications/client-side-operations-timeout/tests/pending-response.json @@ -0,0 +1,491 @@ +{ + "description": "Connection churn is prevented by reading pending responses during connection checkout", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "topologies": [ + "single", + "replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1, + "minPoolSize": 1 + }, + "awaitMinPoolSizeMS": 10000, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckOutStartedEvent", + "connectionCheckedOutEvent", + "connectionCheckOutFailedEvent", + "connectionCheckedInEvent", + "connectionCreatedEvent", + "connectionReadyEvent", + "connectionClosedEvent", + "connectionPendingResponseSucceededEvent", + "connectionPendingResponseStartedEvent", + "connectionPendingResponseFailedEvent" + ] + } + }, + { + "database": { + "id": "test", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "coll", + "database": "test", + "collectionName": "coll" + } + } + ], + "initialData": [ + { + "collectionName": "coll", + "databaseName": "test", + "documents": [] + } + ], + "tests": [ + { + "description": "non-timeout op with response and no error", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 100 + } + } + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 75, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionPendingResponseStartedEvent": {} + }, + { + "connectionPendingResponseSucceededEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + }, + { + "description": "non-timeout op with no response", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 3100 + } + } + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionPendingResponseStartedEvent": {} + }, + { + "connectionPendingResponseFailedEvent": { + "reason": "timeout" + } + }, + { + "connectionClosedEvent": { + "reason": "error" + } + }, + { + "connectionCreatedEvent": {} + }, + { + "connectionReadyEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + }, + { + "description": "timeout op with response and no error", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 100 + } + } + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 75, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "timeoutMS": 100, + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionPendingResponseStartedEvent": {} + }, + { + "connectionPendingResponseSucceededEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + }, + { + "description": "multiple calls to drain buffer", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 200 + } + } + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "timeoutMS": 50, + "filter": { + "_id": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "timeoutMS": 200, + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionPendingResponseStartedEvent": {} + }, + { + "connectionPendingResponseFailedEvent": { + "reason": "timeout" + } + }, + { + "connectionCheckOutFailedEvent": {} + }, + { + "connectionCheckOutStartedEvent": {} + }, + { + "connectionPendingResponseStartedEvent": {} + }, + { + "connectionPendingResponseSucceededEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + } + ] +} diff --git a/specifications/client-side-operations-timeout/tests/pending-response.yml b/specifications/client-side-operations-timeout/tests/pending-response.yml new file mode 100644 index 00000000000..4324e97476b --- /dev/null +++ b/specifications/client-side-operations-timeout/tests/pending-response.yml @@ -0,0 +1,258 @@ +description: "Connection churn is prevented by reading pending responses during connection checkout" +schemaVersion: "1.28" +runOnRequirements: + - minServerVersion: "4.4" + # TODO(SERVER-96344): When using failpoints, mongos returns MaxTimeMSExpired + # after maxTimeMS, whereas mongod returns it after + # max(blockTimeMS, maxTimeMS). Until this ticket is resolved, these tests + # will not pass on sharded clusters. + topologies: ["single", "replicaset"] +createEntities: + - client: + id: &failPointClient failPointClient + useMultipleMongoses: false + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + minPoolSize: 1 + awaitMinPoolSizeMS: 10000 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - commandSucceededEvent + - connectionCheckOutStartedEvent + - connectionCheckedOutEvent + - connectionCheckOutFailedEvent + - connectionCheckedInEvent + - connectionCreatedEvent + - connectionReadyEvent + - connectionClosedEvent + - connectionPendingResponseSucceededEvent + - connectionPendingResponseStartedEvent + - connectionPendingResponseFailedEvent + - database: + id: &database test + client: *client + databaseName: *database + - collection: + id: &collection coll + database: *database + collectionName: *collection +initialData: + - collectionName: *collection + databaseName: *database + documents: [] +tests: + # Attempting a pending response read on a non-timeout operation that can + # immediately read from the TCP buffer should complete the pending read and + # the connection should be checked out. + - description: "non-timeout op with response and no error" + operations: + # Create a failpoint to block the first operation. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 100 + # Execute operation with timeout less than block time. + - name: insertOne + object: *collection + arguments: + timeoutMS: 75 + document: {_id: 3, x: 1} + expectError: + isTimeoutError: true + # Execute a subsequent operation to complete the read. + - name: find + object: *collection + arguments: + filter: {_id: 1} + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - commandSucceededEvent: + commandName: find + - client: *client + eventType: cmap + events: + - connectionCheckOutStartedEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Insert fails. + - connectionCheckOutStartedEvent: {} + - connectionPendingResponseStartedEvent: {} + - connectionPendingResponseSucceededEvent: {} # Find operation drains connection. + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Find succeeds. + # Attempting a pending response read on a non-timeout operation that gets no + # response from the server after 3s should close the connection. + - description: "non-timeout op with no response" + operations: + # Create a failpoint to block the first operation. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 3100 + # Execute operation with timeout less than block time. + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: {_id: 3, x: 1} + expectError: + isTimeoutError: true + # Execute a subsequent operation to complete the read. We expect failure + # in the pending read, resulting in a closed connection. However, the + # find should retry and succeed. + - name: find + object: *collection + arguments: + filter: {_id: 1} + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - commandSucceededEvent: + commandName: find + - client: *client + eventType: cmap + events: + - connectionCheckOutStartedEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Insert fails. + - connectionCheckOutStartedEvent: {} + - connectionPendingResponseStartedEvent: {} + - connectionPendingResponseFailedEvent: + reason: timeout + - connectionClosedEvent: + reason: error + - connectionCreatedEvent: {} + - connectionReadyEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + # Attempting a pending response read on a realistic timeout operation that can + # immediately read from the TCP buffer should complete the pending read and + # the connection should be checked out. + - description: "timeout op with response and no error" + operations: + # Create a failpoint to block the first operation. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 100 + # Execute operation with timeout less than block time. + - name: insertOne + object: *collection + arguments: + timeoutMS: 75 + document: {_id: 3, x: 1} + expectError: + isTimeoutError: true + # Execute a subsequent operation to complete the read. + - name: find + object: *collection + arguments: + timeoutMS: 100 + filter: {_id: 1} + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - commandSucceededEvent: + commandName: find + - client: *client + eventType: cmap + events: + - connectionCheckOutStartedEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Insert fails. + - connectionCheckOutStartedEvent: {} + - connectionPendingResponseStartedEvent: {} + - connectionPendingResponseSucceededEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Find succeeds. + # It may take multiple calls to the pending response handler to drain the + # inbound buffer. + - description: "multiple calls to drain buffer" + operations: + # Create a failpoint to block the first and second operation. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 200 + # Execute operation with timeout less than block time. + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: {_id: 3, x: 1} + expectError: + isTimeoutError: true + # Execute a subsequent operation with a timeout less than the block time. + - name: find + object: *collection + arguments: + timeoutMS: 50 + filter: {_id: 1} + expectError: + isTimeoutError: true + # Execute a final operation to drain the buffer. + - name: find + object: *collection + arguments: + timeoutMS: 200 + filter: {_id: 1} + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - commandSucceededEvent: + commandName: find + - client: *client + eventType: cmap + events: + - connectionCheckOutStartedEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Insert fails. + - connectionCheckOutStartedEvent: {} + - connectionPendingResponseStartedEvent: {} # First find fails + - connectionPendingResponseFailedEvent: + reason: timeout + - connectionCheckOutFailedEvent: {} + - connectionCheckOutStartedEvent: {} + - connectionPendingResponseStartedEvent: {} # Second find drains the buffer. + - connectionPendingResponseSucceededEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} # Second find succeeds. diff --git a/specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.json b/specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.json new file mode 100644 index 00000000000..f39aaf9b76f --- /dev/null +++ b/specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.json @@ -0,0 +1,578 @@ +{ + "description": "connection-logging", + "schemaVersion": "1.24", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "force a pending response read, fail first try, succeed second try", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryReads": false, + "appname": "clientAppName", + "maxPoolSize": 1, + "heartbeatFrequencyMS": 10000 + }, + "observeLogMessages": { + "connection": "debug" + } + } + }, + { + "database": { + "id": "test", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "coll", + "database": "test", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "test", + "arguments": { + "command": { + "ping": 1 + }, + "commandName": "ping" + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 300 + } + } + } + }, + { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "filter": { + "_id": 1 + }, + "timeoutMS": 75 + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "wait", + "object": "testRunner", + "arguments": { + "ms": 200 + } + }, + { + "name": "find", + "object": "coll", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection ready", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Pending response started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Pending response failed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "timeout" + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout failed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Pending response started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Pending response succeeded", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + } + ] +} diff --git a/specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.yml b/specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.yml new file mode 100644 index 00000000000..d9acfa70e18 --- /dev/null +++ b/specifications/connection-monitoring-and-pooling/tests/logging/connection-logging-csot.yml @@ -0,0 +1,254 @@ +description: "connection-logging" + +schemaVersion: "1.24" + +runOnRequirements: + - topologies: + - single # The number of log messages is different for each topology since there is a connection pool per host. + +createEntities: + - client: + id: &failPointClient failPointClient + +tests: + - description: "force a pending response read, fail first try, succeed second try" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + retryReads: false + appname: &clientAppName clientAppName + maxPoolSize: 1 + # use a high heartbeatFrequencyMS to avoid a successful monitor check marking the pool as + # ready (and emitting another event) during the course of test execution. + heartbeatFrequencyMS: 10000 + observeLogMessages: + connection: debug + - database: + id: &database test + client: *client + databaseName: *database + - collection: + id: &collection coll + database: *database + collectionName: *collection + + # Run a ping command to pre-load the pool with a connection. + - name: runCommand + object: *database + arguments: + command: + ping: 1 + commandName: ping + + # Create a failpoint to block the first operation + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 300 + + # Execute operation with timeout less than block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # Execute a subsequent operation which should time out during the + # pending response read attempt. + - name: find + object: *collection + arguments: + filter: { _id: 1 } + timeoutMS: 75 + expectError: + isTimeoutError: true + + # Wait for the blocked connection to free up. + - name: wait + # operation. + object: testRunner + arguments: + ms: 200 + + # Execute a subsequent operation which should completed the pending read. + - name: find + object: *collection + arguments: + filter: { _id: 1 } + + expectLogMessages: + - client: *client + ignoreExtraMessages: true + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Ping + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection ready" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } + + - level: debug + component: connection + data: + message: "Connection checked out" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } + + - level: debug + component: connection + data: + message: "Connection checked in" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Insert + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked out" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } + + - level: debug + component: connection + data: + message: "Connection checked in" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # First Find + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Pending response started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + driverConnectionId: { $$type: [int, long] } + requestId: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Pending response failed" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + driverConnectionId: { $$type: [int, long] } + requestId: { $$type: [int, long] } + reason: "timeout" + + - level: debug + component: connection + data: + message: "Connection checkout failed" + serverHost: { $$type: string } + serverPort: { $$type: [ int, long ] } + + # Second Find + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Pending response started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + driverConnectionId: { $$type: [int, long] } + requestId: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Pending response succeeded" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + driverConnectionId: { $$type: [int, long] } + requestId: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } + + - level: debug + component: connection + data: + message: "Connection checked out" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } + + - level: debug + component: connection + data: + message: "Connection checked in" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } diff --git a/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs b/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs index 4e84bbfd213..a4963ee3c1d 100644 --- a/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs +++ b/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs @@ -1,4 +1,4 @@ -/* Copyright 2019-present MongoDB Inc. +/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ public void Compress(Stream input, Stream output) { var uncompressedSize = (int)(input.Length - input.Position); var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers - input.ReadBytes(OperationContext.NoTimeout, uncompressedBytes, offset: 0, count: uncompressedSize, socketTimeout: Timeout.InfiniteTimeSpan); + input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize, CancellationToken.None); var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize); var compressedBytes = new byte[maxCompressedSize]; var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes); @@ -50,7 +50,7 @@ public void Decompress(Stream input, Stream output) { var compressedSize = (int)(input.Length - input.Position); var compressedBytes = new byte[compressedSize]; - input.ReadBytes(OperationContext.NoTimeout, compressedBytes, offset: 0, count: compressedSize, socketTimeout: Timeout.InfiniteTimeSpan); + input.ReadBytes(compressedBytes, offset: 0, count: compressedSize, CancellationToken.None); var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes); var decompressedBytes = new byte[uncompressedSize]; var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes); diff --git a/src/MongoDB.Driver/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs b/src/MongoDB.Driver/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs index 151cb8719e4..ae2c0a6aa01 100644 --- a/src/MongoDB.Driver/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs +++ b/src/MongoDB.Driver/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs @@ -43,7 +43,7 @@ private TimeSpan CalculateRemainingTimeout(TimeSpan timeout, Stopwatch stopwatch var elapsed = stopwatch.Elapsed; var remainingTimeout = timeout - elapsed; - if (remainingTimeout < TimeSpan.Zero) + if (remainingTimeout <= TimeSpan.FromMilliseconds(1)) { throw CreateTimeoutException(elapsed, $"Timed out waiting for a connection after {elapsed.TotalMilliseconds}ms."); } @@ -423,6 +423,10 @@ public void Dispose() } } + public void EnsureConnectionReady(OperationContext operationContext) => _connection.EnsureConnectionReady(operationContext); + + public Task EnsureConnectionReadyAsync(OperationContext operationContext) => _connection.EnsureConnectionReadyAsync(operationContext); + public void Open(OperationContext operationContext) { try @@ -604,6 +608,18 @@ public IConnectionHandle Fork() return new AcquiredConnection(_connectionPool, _reference); } + public void EnsureConnectionReady(OperationContext operationContext) + { + ThrowIfDisposed(); + _reference.Instance.EnsureConnectionReady(operationContext); + } + + public Task EnsureConnectionReadyAsync(OperationContext operationContext) + { + ThrowIfDisposed(); + return _reference.Instance.EnsureConnectionReadyAsync(operationContext); + } + public void Open(OperationContext operationContext) { ThrowIfDisposed(); @@ -888,29 +904,35 @@ public PooledConnection CreateOpened(TimeSpan maxConnectingQueueTimeout, Cancell public PooledConnection CreateOpenedOrReuse(OperationContext operationContext, TimeSpan waitQueueTimeout) { + using var waitQueueOperationContext = operationContext.WithTimeout(waitQueueTimeout); try { var connection = _pool._connectionHolder.Acquire(); - var stopwatch = Stopwatch.StartNew(); + connection = EnsureConnectionReady(operationContext, this, connection); while (connection == null) { _pool._poolState.ThrowIfNotReady(); - var waitTimeout = _pool.CalculateRemainingTimeout(waitQueueTimeout, stopwatch); + if (waitQueueOperationContext.IsCancelledOrTimedOut()) + { + throw CreateTimeoutException(waitQueueOperationContext.Elapsed); + } // Try to acquire connecting semaphore. Possible operation results: // Entered: The request was successfully fulfilled, and a connection establishment can start // Signaled: The request was interrupted because Connection was return to pool and can be reused // Timeout: The request was timed out after WaitQueueTimeout period. - _connectingWaitStatus = _pool._maxConnectingQueue.WaitSignaled(waitTimeout, operationContext.CancellationToken); + _connectingWaitStatus = _pool._maxConnectingQueue.WaitSignaled(waitQueueOperationContext.RemainingTimeout, operationContext.CancellationToken); connection = _connectingWaitStatus switch { SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(), SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => CreateOpenedInternal(operationContext), - SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch.Elapsed), + SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(waitQueueOperationContext.Elapsed), _ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}") }; + + connection = EnsureConnectionReady(waitQueueOperationContext, this, connection); } return connection; @@ -920,33 +942,68 @@ public PooledConnection CreateOpenedOrReuse(OperationContext operationContext, T _pool._connectionExceptionHandler.HandleExceptionOnOpen(ex); throw; } + + static PooledConnection EnsureConnectionReady(OperationContext operationContext, ConnectionCreator connectionCreator, PooledConnection connection) + { + if (connection == null) + { + return null; + } + + try + { + connection.EnsureConnectionReady(operationContext); + } + catch (Exception) + { + if (connection.IsExpired) + { + connectionCreator._pool._connectionHolder.RemoveConnection(connection); + } + else + { + connectionCreator._pool._connectionHolder.Return(connection); + } + + return null; + } + + return connection; + } } public async Task CreateOpenedOrReuseAsync(OperationContext operationContext, TimeSpan waitQueueTimeout) { + using var waitQueueOperationContext = operationContext.WithTimeout(waitQueueTimeout); + try { var connection = _pool._connectionHolder.Acquire(); - var stopwatch = Stopwatch.StartNew(); + connection = await EnsureConnectionReadyAsync(waitQueueOperationContext, this, connection).ConfigureAwait(false); while (connection == null) { _pool._poolState.ThrowIfNotReady(); + if (waitQueueOperationContext.IsCancelledOrTimedOut()) + { + throw CreateTimeoutException(waitQueueOperationContext.Elapsed); + } - var waitTimeout = _pool.CalculateRemainingTimeout(waitQueueTimeout, stopwatch); // Try to acquire connecting semaphore. Possible operation results: // Entered: The request was successfully fulfilled, and a connection establishment can start // Signaled: The request was interrupted because Connection was return to pool and can be reused // Timeout: The request was timed out after WaitQueueTimeout period. - _connectingWaitStatus = await _pool._maxConnectingQueue.WaitSignaledAsync(waitTimeout, operationContext.CancellationToken).ConfigureAwait(false); + _connectingWaitStatus = await _pool._maxConnectingQueue.WaitSignaledAsync(waitQueueOperationContext.RemainingTimeout, operationContext.CancellationToken).ConfigureAwait(false); connection = _connectingWaitStatus switch { SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(), SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => await CreateOpenedInternalAsync(operationContext).ConfigureAwait(false), - SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch.Elapsed), + SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(waitQueueOperationContext.Elapsed), _ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}") }; + + connection = await EnsureConnectionReadyAsync(waitQueueOperationContext, this, connection).ConfigureAwait(false); } return connection; @@ -956,6 +1013,34 @@ public async Task CreateOpenedOrReuseAsync(OperationContext op _pool._connectionExceptionHandler.HandleExceptionOnOpen(ex); throw; } + + static async Task EnsureConnectionReadyAsync(OperationContext operationContext, ConnectionCreator connectionCreator, PooledConnection connection) + { + if (connection == null) + { + return null; + } + + try + { + await connection.EnsureConnectionReadyAsync(operationContext).ConfigureAwait(false); + } + catch (Exception) + { + if (connection.IsExpired) + { + connectionCreator._pool._connectionHolder.RemoveConnection(connection); + } + else + { + connectionCreator._pool._connectionHolder.Return(connection); + } + + return null; + } + + return connection; + } } public void Dispose() diff --git a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs index a775f7dff0d..a633fcf8ac7 100644 --- a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs +++ b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs @@ -50,6 +50,7 @@ internal sealed class BinaryConnection : IConnection private DateTime _openedAtUtc; private readonly object _openLock = new object(); private Task _openTask; + private readonly ResponseHelper _responseHelper; private CompressorType? _sendCompressorType; private readonly ConnectionSettings _settings; private readonly TimeSpan _socketReadTimeout; @@ -86,6 +87,7 @@ public BinaryConnection( _commandEventHelper = new CommandEventHelper(loggerFactory.CreateEventLogger(eventSubscriber)); _socketReadTimeout = socketReadTimeout; _socketWriteTimeout = socketWriteTimeout; + _responseHelper = new ResponseHelper(this, SystemClock.Instance); } // properties @@ -191,14 +193,26 @@ private void Dispose(bool disposing) } } - private void EnsureMessageSizeIsValid(int messageSize) + public void EnsureConnectionReady(OperationContext operationContext) { - var maxMessageSize = _description?.MaxMessageSize ?? 48000000; + if (!_responseHelper.HasPendingReads) + { + return; + } + + ThrowIfCancelledOrDisposed(operationContext); + _responseHelper.DiscardPendingReads(operationContext); + } - if (messageSize < 0 || messageSize > maxMessageSize) + public Task EnsureConnectionReadyAsync(OperationContext operationContext) + { + if (!_responseHelper.HasPendingReads) { - throw new FormatException("The size of the message is invalid."); + return Task.CompletedTask; } + + ThrowIfCancelledOrDisposed(operationContext); + return _responseHelper.DiscardPendingReadsAsync(operationContext); } public void Open(OperationContext operationContext) @@ -340,56 +354,6 @@ private void InvalidateAuthenticator() } } - private IByteBuffer ReceiveBuffer(OperationContext operationContext) - { - try - { - var messageSizeBytes = new byte[4]; - _stream.ReadBytes(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout); - var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes); - EnsureMessageSizeIsValid(messageSize); - var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default); - var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize); - buffer.Length = messageSize; - buffer.SetBytes(0, messageSizeBytes, 0, 4); - _stream.ReadBytes(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout); - _lastUsedAtUtc = DateTime.UtcNow; - buffer.MakeReadOnly(); - return buffer; - } - catch (Exception ex) - { - var wrappedException = WrapExceptionIfRequired(operationContext, ex, "receiving a message from the server"); - ConnectionFailed(wrappedException ?? ex); - if (wrappedException == null) { throw; } else { throw wrappedException; } - } - } - - private async Task ReceiveBufferAsync(OperationContext operationContext) - { - try - { - var messageSizeBytes = new byte[4]; - await _stream.ReadBytesAsync(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout).ConfigureAwait(false); - var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes); - EnsureMessageSizeIsValid(messageSize); - var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default); - var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize); - buffer.Length = messageSize; - buffer.SetBytes(0, messageSizeBytes, 0, 4); - await _stream.ReadBytesAsync(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout).ConfigureAwait(false); - _lastUsedAtUtc = DateTime.UtcNow; - buffer.MakeReadOnly(); - return buffer; - } - catch (Exception ex) - { - var wrappedException = WrapExceptionIfRequired(operationContext, ex, "receiving a message from the server"); - ConnectionFailed(wrappedException ?? ex); - if (wrappedException == null) { throw; } else { throw wrappedException; } - } - } - public ResponseMessage ReceiveMessage( OperationContext operationContext, int responseTo, @@ -405,7 +369,7 @@ public ResponseMessage ReceiveMessage( helper.ReceivingMessage(); while (true) { - using (var buffer = ReceiveBuffer(operationContext)) + using (var buffer = _responseHelper.ReadResponse(operationContext)) { if (responseTo != GetResponseTo(buffer)) { @@ -414,14 +378,21 @@ public ResponseMessage ReceiveMessage( var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); helper.ReceivedMessage(buffer, message); + _lastUsedAtUtc = DateTime.UtcNow; return message; } } } catch (Exception ex) { + var wrappedException = WrapExceptionIfRequired(operationContext, ex, "receiving a message from the server"); + if (ex is not (TimeoutException or OperationCanceledException or TaskCanceledException)) + { + ConnectionFailed(wrappedException ?? ex); + } + helper.FailedReceivingMessage(ex); - throw; + if (wrappedException == null) { throw; } else { throw wrappedException; } } } @@ -440,7 +411,7 @@ public async Task ReceiveMessageAsync( helper.ReceivingMessage(); while (true) { - using (var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false)) + using (var buffer = await _responseHelper.ReadResponseAsync(operationContext).ConfigureAwait(false)) { if (responseTo != GetResponseTo(buffer)) { @@ -449,14 +420,21 @@ public async Task ReceiveMessageAsync( var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); helper.ReceivedMessage(buffer, message); + _lastUsedAtUtc = DateTime.UtcNow; return message; } } } catch (Exception ex) { + var wrappedException = WrapExceptionIfRequired(operationContext, ex, "receiving a message from the server"); + if (ex is not (TimeoutException or OperationCanceledException or TaskCanceledException)) + { + ConnectionFailed(wrappedException ?? ex); + } + helper.FailedReceivingMessage(ex); - throw; + if (wrappedException == null) { throw; } else { throw wrappedException; } } } @@ -532,7 +510,8 @@ public void SendMessage(OperationContext operationContext, RequestMessage messag SendBuffer(operationContext, uncompressedBuffer); sentLength = uncompressedBuffer.Length; } - helper.SentMessage(sentLength); + + helper.SentMessage(message, sentLength); } } catch (Exception ex) @@ -568,7 +547,8 @@ public async Task SendMessageAsync(OperationContext operationContext, RequestMes await SendBufferAsync(operationContext, uncompressedBuffer).ConfigureAwait(false); sentLength = uncompressedBuffer.Length; } - helper.SentMessage(sentLength); + + helper.SentMessage(message, sentLength); } } catch (Exception ex) @@ -664,11 +644,6 @@ private void ThrowIfDisposed() private Exception WrapExceptionIfRequired(OperationContext operationContext, Exception ex, string action) { - if (ex is TimeoutException && operationContext.IsRootContextTimeoutConfigured()) - { - return null; - } - if (ex is ThreadAbortException || ex is StackOverflowException || ex is MongoAuthenticationException || @@ -753,6 +728,376 @@ public void OpeningConnection() } } +#pragma warning disable CA1001 + private sealed class ResponseHelper +#pragma warning restore CA1001 + { + private const int MessageSizePrefixLength = 4; + private static readonly TimeSpan __pendingReadsWindow = TimeSpan.FromSeconds(3); + + private readonly IClock _clock; + private readonly BinaryConnection _connection; + private readonly byte[] _responseHeaderBuffer = new byte[MessageSizePrefixLength]; + private readonly ManualResetEventSlim _manualResetEventSlim = new ManualResetEventSlim(); + private IByteBuffer _byteBuffer; + private int _bytesRead; + private Exception _readException; + private long _lastReadTimestamp; + private long? _pendingRequestId; + private Task _pendingReadTask; + + public ResponseHelper(BinaryConnection connection, IClock clock) + { + _connection = connection; + _clock = clock; + } + + public bool HasPendingReads => _pendingRequestId.HasValue; + + public void DiscardPendingReads(OperationContext operationContext) + { + if (!_pendingRequestId.HasValue) + { + throw new InvalidOperationException(); + } + + ReadingRendingResponseStarted(); + _pendingReadTask ??= ConfigurePendingReadsTask(ContinueReadAsync()); + using var pendingReadsOperationContext = CreatePendingReadsContext(operationContext); + + try + { + pendingReadsOperationContext.WaitTask(_pendingReadTask); + ReadingRendingResponseDone(pendingReadsOperationContext.Elapsed); + ClearPendingReads(); + } + catch (Exception ex) + { + ReadingRendingResponseFailed(pendingReadsOperationContext.Elapsed, ex); + HandlePendingReadsException(ex); + throw; + } + } + + public async Task DiscardPendingReadsAsync(OperationContext operationContext) + { + if (!_pendingRequestId.HasValue) + { + throw new InvalidOperationException(); + } + + ReadingRendingResponseStarted(); + _pendingReadTask ??= ConfigurePendingReadsTask(ContinueReadAsync()); + using var pendingReadsOperationContext = CreatePendingReadsContext(operationContext); + + try + { + await pendingReadsOperationContext.WaitTaskAsync(_pendingReadTask).ConfigureAwait(false); + ReadingRendingResponseDone(pendingReadsOperationContext.Elapsed); + ClearPendingReads(); + } + catch (Exception ex) + { + ReadingRendingResponseFailed(pendingReadsOperationContext.Elapsed, ex); + HandlePendingReadsException(ex); + throw; + } + } + + public IByteBuffer ReadResponse(OperationContext operationContext) + { + // TODO: remove this operationContext fork together with removal of SocketTimeout. + using var readOperationContext = operationContext.Timeout != null ? operationContext.Fork() : operationContext.WithTimeout(_connection._socketReadTimeout); + + try + { + var messageSize = ReadMessageSize(readOperationContext); + _byteBuffer = PrepareBuffer(_responseHeaderBuffer, messageSize); + + while (messageSize > _bytesRead) + { + var backingBytes = _byteBuffer.AccessBackingBytes(_bytesRead); + var bytesToRead = Math.Min(messageSize - _bytesRead, backingBytes.Count); + + ReadBytes(readOperationContext, backingBytes.Array, backingBytes.Offset, bytesToRead); + } + + _byteBuffer.MakeReadOnly(); + var result = _byteBuffer; + ClearPendingReads(); + return result; + } + catch (Exception ex) + { + if (ex is not (TimeoutException or OperationCanceledException or TaskCanceledException)) + { + // Should not dispose buffer here, as it is passed into ContinueReadAsync + _byteBuffer?.Dispose(); + } + + throw; + } + } + + public async Task ReadResponseAsync(OperationContext operationContext) + { + var readTask = ContinueReadAsync(); + + // TODO: remove this operationContext fork together with removal of SocketTimeout. + using var readOperationContext = operationContext.Timeout != null ? operationContext.Fork() : operationContext.WithTimeout(_connection._socketReadTimeout); + + try + { + await readOperationContext.WaitTaskAsync(readTask).ConfigureAwait(false); + var result = await readTask.ConfigureAwait(false); + ClearPendingReads(); + return result; + } + catch (Exception ex) when (ex is TimeoutException or TaskCanceledException or TaskCanceledException) + { + _pendingReadTask = ConfigurePendingReadsTask(readTask); + throw; + } + } + + public void SetPendingReads(long requestId) + { + if (_pendingRequestId.HasValue) + { + throw new InvalidOperationException("There is already pending reads on the connection."); + } + + _pendingRequestId = requestId; + } + + private OperationContext CreatePendingReadsContext(OperationContext operationContext) + { + var spend = _clock.TimeSince(_lastReadTimestamp); + var pendingReadsRemainingTimeout = __pendingReadsWindow - spend; + if (pendingReadsRemainingTimeout < TimeSpan.Zero) + { + pendingReadsRemainingTimeout = TimeSpan.Zero; + } + + return operationContext.WithTimeout(pendingReadsRemainingTimeout); + } + + private void HandlePendingReadsException(Exception ex) + { + if (ex is TimeoutException && __pendingReadsWindow - _clock.TimeSince(_lastReadTimestamp) > TimeSpan.FromMilliseconds(1)) + { + // Do nothing here?? accordingly to spec if there were any bytes read - we should return it to the pool + return; + } + + _connection.ConnectionFailed(ex); + } + + private void ClearPendingReads() + { + _readException = null; + _bytesRead = 0; + _byteBuffer = null; + _pendingRequestId = null; + _pendingReadTask = null; + _lastReadTimestamp = 0; + } + + private Task ConfigurePendingReadsTask(Task readTask) + { + _lastReadTimestamp = _clock.GetTimestamp(); + readTask.ContinueWith(t => + { + // This continuation will release the buffer once response is fully read + // and also inspects Exception property to prevent UnobservedException. + if (t.Exception == null) + { + t.Result.Dispose(); + } + }); + return readTask; + } + + private async Task ContinueReadAsync(IAsyncResult asyncResult = null) + { + try + { + if (asyncResult != null) + { + var asyncResultTask = Task.Factory.FromAsync(asyncResult, _ => { }); + await asyncResultTask.ConfigureAwait(false); + + if (_readException != null) + { + throw _readException; + } + } + + var messageSize = await ReadMessageSizeAsync().ConfigureAwait(false); + + _byteBuffer ??= PrepareBuffer(_responseHeaderBuffer, messageSize); + + while (messageSize > _bytesRead) + { + var backingBytes = _byteBuffer.AccessBackingBytes(_bytesRead); + var bytesToRead = Math.Min(messageSize - _bytesRead, backingBytes.Count); + await ReadBytesAsync(backingBytes.Array, backingBytes.Offset, bytesToRead).ConfigureAwait(false); + } + + _byteBuffer.MakeReadOnly(); + return _byteBuffer; + } + catch (Exception) + { + _byteBuffer?.Dispose(); + throw; + } + } + + private void EnsureMessageSizeIsValid(int messageSize) + { + var maxMessageSize = _connection._description?.MaxMessageSize ?? 48000000; + + if (messageSize < 0 || messageSize > maxMessageSize) + { + throw new FormatException("The size of the message is invalid."); + } + } + + private IByteBuffer PrepareBuffer(byte[] messageSizeBytes, int messageSize) + { + var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default); + var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize); + buffer.Length = messageSize; + buffer.SetBytes(0, messageSizeBytes, 0, MessageSizePrefixLength); // Do we really need message size in the result buffer? + return buffer; + } + + private int ReadMessageSize(OperationContext operationContext) + { + operationContext.ThrowIfTimedOutOrCanceled(); + + ReadBytes(operationContext, _responseHeaderBuffer, 0, MessageSizePrefixLength); + var messageSize = BinaryPrimitives.ReadInt32LittleEndian(_responseHeaderBuffer); + EnsureMessageSizeIsValid(messageSize); + + return messageSize; + } + + private async Task ReadMessageSizeAsync() + { + if (_bytesRead < MessageSizePrefixLength) + { + await ReadBytesAsync(_responseHeaderBuffer, _bytesRead, MessageSizePrefixLength - _bytesRead).ConfigureAwait(false); + } + + var messageSize = BinaryPrimitives.ReadInt32LittleEndian(_responseHeaderBuffer); + EnsureMessageSizeIsValid(messageSize); + + return messageSize; + } + + private void ReadingRendingResponseStarted() + { + _connection._eventLogger.LogAndPublish( + new ConnectionReadingPendingResponseEvent(_connection._connectionId, _pendingRequestId.Value)); + } + + private void ReadingRendingResponseFailed(TimeSpan duration, Exception ex) + { + _connection._eventLogger.LogAndPublish( + new ConnectionReadingPendingResponseFailedEvent(_connection._connectionId, _pendingRequestId.Value, duration, ex)); + } + + private void ReadingRendingResponseDone(TimeSpan duration) + { + _connection._eventLogger.LogAndPublish( + new ConnectionReadPendingResponseEvent(_connection._connectionId, _pendingRequestId.Value, duration)); + } + + private void ReadBytes(OperationContext operationContext, byte[] destination, int offset, int count) + { + var bytesRead = 0; + var initialBytesRead = _bytesRead; + while (bytesRead < count) + { + _manualResetEventSlim.Reset(); + + var operation = _connection._stream.BeginRead( + destination, + offset + bytesRead, + count - bytesRead, + result => + { + var responseHelper = (ResponseHelper)result.AsyncState; + int readResult = 0; + try + { + + readResult = responseHelper._connection._stream.EndRead(result); + } + catch (ObjectDisposedException ex) + { + _readException = new EndOfStreamException(ex.Message, ex); + } + catch (Exception ex) + { + // Async handler should not throw any errors, we mark the current read as failed so appropriate exception could be thrown later. + _readException = ex; + } + + if (readResult == 0) + { + _readException ??= new EndOfStreamException(); + } + + responseHelper._bytesRead += readResult; + responseHelper._lastReadTimestamp = responseHelper._clock.GetTimestamp(); + responseHelper._manualResetEventSlim.Set(); + }, + this); + + if (!_manualResetEventSlim.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken)) + { + _pendingReadTask = ConfigurePendingReadsTask(ContinueReadAsync(operation)); + operationContext.CancellationToken.ThrowIfCancellationRequested(); + throw new TimeoutException(); + } + + if (_readException != null) + { + throw _readException; + } + + bytesRead = _bytesRead - initialBytesRead; + } + } + + private async Task ReadBytesAsync(byte[] destination, int offset, int count) + { + try + { + var bytesRead = 0; + while (bytesRead < count) + { + var readResult = await _connection._stream.ReadAsync(destination, offset + bytesRead, count - bytesRead).ConfigureAwait(false); + if (readResult == 0) + { + throw new EndOfStreamException(); + } + + bytesRead += readResult; + _bytesRead += readResult; + _lastReadTimestamp = _clock.GetTimestamp(); + } + } + catch (ObjectDisposedException ex) + { + throw new EndOfStreamException(ex.Message, ex); + } + } + } + private class ReceiveMessageHelper { private readonly ICompressorSource _compressorSource; @@ -914,11 +1259,16 @@ public void SendingMessage(IByteBuffer buffer) _networkStopwatch = Stopwatch.StartNew(); } - public void SentMessage(int bufferLength) + public void SentMessage(RequestMessage message, int bufferLength) { _networkStopwatch.Stop(); var networkDuration = _networkStopwatch.Elapsed; + if (message.ResponseExpected) + { + _connection._responseHelper.SetPendingReads(message.RequestId); + } + if (_connection._commandEventHelper.ShouldCallAfterSending) { _connection._commandEventHelper.AfterSending(_message, _connection._connectionId, _connection.Description?.ServiceId, _connection.IsInitializing); diff --git a/src/MongoDB.Driver/Core/Connections/IConnection.cs b/src/MongoDB.Driver/Core/Connections/IConnection.cs index b61f0c24075..ed21fe6ea29 100644 --- a/src/MongoDB.Driver/Core/Connections/IConnection.cs +++ b/src/MongoDB.Driver/Core/Connections/IConnection.cs @@ -31,6 +31,8 @@ internal interface IConnection : IDisposable bool IsExpired { get; } ConnectionSettings Settings { get; } + void EnsureConnectionReady(OperationContext operationContext); + Task EnsureConnectionReadyAsync(OperationContext operationContext); void Open(OperationContext operationContext); Task OpenAsync(OperationContext operationContext); void Reauthenticate(OperationContext operationContext); diff --git a/src/MongoDB.Driver/Core/Events/ConnectionReadPendingResponseEvent.cs b/src/MongoDB.Driver/Core/Events/ConnectionReadPendingResponseEvent.cs new file mode 100644 index 00000000000..997ee4baf92 --- /dev/null +++ b/src/MongoDB.Driver/Core/Events/ConnectionReadPendingResponseEvent.cs @@ -0,0 +1,60 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using MongoDB.Driver.Core.Connections; + +namespace MongoDB.Driver.Core.Events +{ + /// + /// Emitted when the connection being checked out is attempting to read and discard a pending server response. + /// + public struct ConnectionReadPendingResponseEvent : IEvent + { + /// + /// Initializes a new instance of the struct. + /// + /// The connection identifier. + /// The driver-generated request ID associated with the network timeout. + /// The time it took to complete the pending read. + public ConnectionReadPendingResponseEvent( + ConnectionId connectionId, + long requestId, + TimeSpan duration) + { + ConnectionId = connectionId; + RequestId = requestId; + Duration = duration; + } + + /// + /// The connection identifier. + /// + public ConnectionId ConnectionId { get; } + + /// + /// The time it took to complete the pending read. + /// + public TimeSpan Duration { get; } + + /// + /// The driver-generated request ID associated with the network timeout. + /// + public long RequestId { get; } + + EventType IEvent.Type => EventType.ConnectionReadPendingResponse; + } +} + diff --git a/src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseEvent.cs b/src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseEvent.cs new file mode 100644 index 00000000000..8b74346eb50 --- /dev/null +++ b/src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseEvent.cs @@ -0,0 +1,51 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using MongoDB.Driver.Core.Connections; + +namespace MongoDB.Driver.Core.Events +{ + /// + /// Emitted when the connection being checked out is attempting to read and discard a pending server response. + /// + public struct ConnectionReadingPendingResponseEvent : IEvent + { + /// + /// Initializes a new instance of the struct. + /// + /// The connection identifier. + /// The driver-generated request ID associated with the network timeout. + public ConnectionReadingPendingResponseEvent( + ConnectionId connectionId, + long requestId) + { + ConnectionId = connectionId; + RequestId = requestId; + } + + /// + /// The connection identifier. + /// + public ConnectionId ConnectionId { get; } + + /// + /// The driver-generated request ID associated with the network timeout. + /// + public long RequestId { get; } + + EventType IEvent.Type => EventType.ConnectionReadingPendingResponse; + } +} + diff --git a/src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseFailedEvent.cs b/src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseFailedEvent.cs new file mode 100644 index 00000000000..fcd0e766db4 --- /dev/null +++ b/src/MongoDB.Driver/Core/Events/ConnectionReadingPendingResponseFailedEvent.cs @@ -0,0 +1,68 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using MongoDB.Driver.Core.Connections; + +namespace MongoDB.Driver.Core.Events +{ + /// + /// Emitted when the connection being checked out is attempting to read and discard a pending server response. + /// + public struct ConnectionReadingPendingResponseFailedEvent : IEvent + { + /// + /// Initializes a new instance of the struct. + /// + /// The connection identifier. + /// The driver-generated request ID associated with the network timeout. + /// Duration until the pending response drain failed. + /// The reason for why the pending read failed. + public ConnectionReadingPendingResponseFailedEvent( + ConnectionId connectionId, + long requestId, + TimeSpan duration, + Exception exception) + { + ConnectionId = connectionId; + RequestId = requestId; + Duration = duration; + Exception = exception; + } + + /// + /// The connection identifier. + /// + public ConnectionId ConnectionId { get; } + + /// + /// Duration until the pending response drain failed. + /// + public TimeSpan Duration { get; } + + /// + /// The reason for why the pending read failed. + /// + public Exception Exception { get; } + + /// + /// The driver-generated request ID associated with the network timeout. + /// + public long RequestId { get; } + + EventType IEvent.Type => EventType.ConnectionReadingPendingResponseFailed; + } +} + diff --git a/src/MongoDB.Driver/Core/Events/EventType.cs b/src/MongoDB.Driver/Core/Events/EventType.cs index 3e6e8865c31..2df85781702 100644 --- a/src/MongoDB.Driver/Core/Events/EventType.cs +++ b/src/MongoDB.Driver/Core/Events/EventType.cs @@ -56,6 +56,9 @@ internal enum EventType ConnectionPoolReady, ConnectionPoolRemovedConnection, ConnectionPoolRemovingConnection, + ConnectionReadingPendingResponse, + ConnectionReadingPendingResponseFailed, + ConnectionReadPendingResponse, ConnectionReceivedMessage, ConnectionReceivingMessage, ConnectionReceivingMessageFailed, diff --git a/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProviders.cs b/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProviders.cs index fcdce3efaa9..d68ed80d26c 100644 --- a/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProviders.cs +++ b/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProviders.cs @@ -176,6 +176,13 @@ public static object[] GetParams(ConnectionId connectionId, object arg1, object return new[] { connectionId.ServerId.ClusterId.Value, connectionId.LongLocalValue, host, port, connectionId.LongServerValue, arg1, arg2 }; } + public static object[] GetParams(ConnectionId connectionId, object arg1, object arg2, object arg3) + { + var (host, port) = connectionId.ServerId.EndPoint.GetHostAndPort(); + + return new[] { connectionId.ServerId.ClusterId.Value, connectionId.LongLocalValue, host, port, connectionId.LongServerValue, arg1, arg2, arg3 }; + } + public static object[] GetParams(ConnectionId connectionId, object arg1, object arg2, object arg3, object arg4) { var (host, port) = connectionId.ServerId.EndPoint.GetHostAndPort(); diff --git a/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProvidersConnection.cs b/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProvidersConnection.cs index 10c7ba60b0d..4bfe686506b 100644 --- a/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProvidersConnection.cs +++ b/src/MongoDB.Driver/Core/Logging/StructuredLogTemplateProvidersConnection.cs @@ -13,6 +13,7 @@ * limitations under the License. */ +using System; using Microsoft.Extensions.Logging; using MongoDB.Driver.Core.Events; @@ -39,6 +40,21 @@ private static void AddConnectionTemplates() ConnectionCommonParams(), (e, _) => GetParams(e.ConnectionId, "Connection added")); + AddTemplateProvider( + LogLevel.Debug, + ConnectionCommonParams(RequestId), + (e, _) => GetParams(e.ConnectionId, "Pending response started", e.RequestId)); + + AddTemplateProvider( + LogLevel.Debug, + ConnectionCommonParams(RequestId, DurationMS), + (e, _) => GetParams(e.ConnectionId, "Pending response succeeded", e.RequestId, e.Duration.TotalMilliseconds)); + + AddTemplateProvider( + LogLevel.Debug, + ConnectionCommonParams(RequestId, DurationMS, Reason), + (e, _) => GetParams(e.ConnectionId, "Pending response failed", e.RequestId, e.Duration.TotalMilliseconds, MapExceptionToReason(e.Exception))); + AddTemplateProvider( LogLevel.Debug, ConnectionCommonParams(DurationMS), @@ -103,6 +119,20 @@ private static void AddConnectionTemplates() LogLevel.Trace, ConnectionCommonParams(), (e, _) => GetParams(e.ConnectionId, "Sent")); + + static string MapExceptionToReason(Exception exception) + { + if (exception == null) + { + return ""; + } + + return exception switch + { + TimeoutException => "timeout", + _ => "error" + }; + } } } } diff --git a/src/MongoDB.Driver/Core/Misc/IClockExtensions.cs b/src/MongoDB.Driver/Core/Misc/IClockExtensions.cs new file mode 100644 index 00000000000..da21c402dfa --- /dev/null +++ b/src/MongoDB.Driver/Core/Misc/IClockExtensions.cs @@ -0,0 +1,28 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +namespace MongoDB.Driver.Core.Misc; + +internal static class IClockExtensions +{ + public static TimeSpan TimeSince(this IClock clock, long timestamp) + { + double ticks = clock.GetTimestamp() - timestamp; + return TimeSpan.FromSeconds(ticks / clock.Frequency); + } +} + diff --git a/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs b/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs index 93e2da1a1ba..c557cb06180 100644 --- a/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs +++ b/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs @@ -1,4 +1,4 @@ -/* Copyright 2013-present MongoDB Inc. +/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,143 +36,16 @@ public static void EfficientCopyTo(this Stream input, Stream output) } } - public static int Read(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) - { - try - { - using var manualResetEvent = new ManualResetEventSlim(); - var readOperation = stream.BeginRead( - buffer, - offset, - count, - state => ((ManualResetEventSlim)state.AsyncState).Set(), - manualResetEvent); - - if (readOperation.IsCompleted || manualResetEvent.Wait(timeout, cancellationToken)) - { - return stream.EndRead(readOperation); - } - } - catch (OperationCanceledException) - { - // Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed. - } - catch (ObjectDisposedException) - { - throw new IOException(); - } - - try - { - stream.Dispose(); - } - catch - { - // Ignore any exceptions - } - - cancellationToken.ThrowIfCancellationRequested(); - throw new TimeoutException(); - } - - public static async Task ReadAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) - { - Task readTask = null; - try - { - readTask = stream.ReadAsync(buffer, offset, count); - return await readTask.WaitAsync(timeout, cancellationToken).ConfigureAwait(false); - } - catch (ObjectDisposedException) - { - // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true. - throw new IOException(); - } - catch (Exception ex) when (ex is OperationCanceledException or TimeoutException) - { - // await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout - try - { - stream.Dispose(); - if (readTask != null) - { - // Should await on the task to avoid UnobservedTaskException - await readTask.ConfigureAwait(false); - } - } - catch - { - // Ignore any exceptions - } - - throw; - } - } - - public static void ReadBytes(this Stream stream, OperationContext operationContext, byte[] buffer, int offset, int count, TimeSpan socketTimeout) - { - Ensure.IsNotNull(stream, nameof(stream)); - Ensure.IsNotNull(buffer, nameof(buffer)); - Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); - Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - - while (count > 0) - { - var bytesRead = stream.Read(buffer, offset, count, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken); - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } - } - - public static void ReadBytes(this Stream stream, OperationContext operationContext, IByteBuffer buffer, int offset, int count, TimeSpan socketTimeout) - { - Ensure.IsNotNull(stream, nameof(stream)); - Ensure.IsNotNull(buffer, nameof(buffer)); - Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); - Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - - while (count > 0) - { - var backingBytes = buffer.AccessBackingBytes(offset); - var bytesToRead = Math.Min(count, backingBytes.Count); - var bytesRead = stream.Read(backingBytes.Array, backingBytes.Offset, bytesToRead, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken); - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } - } - public static void ReadBytes(this Stream stream, byte[] destination, int offset, int count, CancellationToken cancellationToken) - { - while (count > 0) - { - var bytesRead = stream.Read(destination, offset, count); // TODO: honor cancellationToken? - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } - } - - public static async Task ReadBytesAsync(this Stream stream, OperationContext operationContext, byte[] buffer, int offset, int count, TimeSpan socketTimeout) { Ensure.IsNotNull(stream, nameof(stream)); - Ensure.IsNotNull(buffer, nameof(buffer)); - Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); - Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); + Ensure.IsNotNull(destination, nameof(destination)); + Ensure.IsBetween(offset, 0, destination.Length, nameof(offset)); + Ensure.IsBetween(count, 0, destination.Length - offset, nameof(count)); while (count > 0) { - var bytesRead = await stream.ReadAsync(buffer, offset, count, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken).ConfigureAwait(false); + var bytesRead = stream.Read(destination, offset, count); // TODO: honor cancellationToken? if (bytesRead == 0) { throw new EndOfStreamException(); @@ -182,29 +55,13 @@ public static async Task ReadBytesAsync(this Stream stream, OperationContext ope } } - public static async Task ReadBytesAsync(this Stream stream, OperationContext operationContext, IByteBuffer buffer, int offset, int count, TimeSpan socketTimeout) + public static async Task ReadBytesAsync(this Stream stream, byte[] destination, int offset, int count, CancellationToken cancellationToken) { Ensure.IsNotNull(stream, nameof(stream)); - Ensure.IsNotNull(buffer, nameof(buffer)); - Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); - Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - - while (count > 0) - { - var backingBytes = buffer.AccessBackingBytes(offset); - var bytesToRead = Math.Min(count, backingBytes.Count); - var bytesRead = await stream.ReadAsync(backingBytes.Array, backingBytes.Offset, bytesToRead, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken).ConfigureAwait(false); - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } - } + Ensure.IsNotNull(destination, nameof(destination)); + Ensure.IsBetween(offset, 0, destination.Length, nameof(offset)); + Ensure.IsBetween(count, 0, destination.Length - offset, nameof(count)); - public static async Task ReadBytesAsync(this Stream stream, byte[] destination, int offset, int count, CancellationToken cancellationToken) - { while (count > 0) { var bytesRead = await stream.ReadAsync(destination, offset, count, cancellationToken).ConfigureAwait(false); diff --git a/src/MongoDB.Driver/Core/Operations/RetryabilityHelper.cs b/src/MongoDB.Driver/Core/Operations/RetryabilityHelper.cs index 0ccde3f9c7d..7ac9b51b076 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryabilityHelper.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryabilityHelper.cs @@ -45,7 +45,9 @@ static RetryabilityHelper() { typeof(MongoNotPrimaryException), typeof(MongoNodeIsRecoveringException), - typeof(MongoConnectionPoolPausedException) + typeof(MongoConnectionPoolPausedException), + + typeof(TimeoutException) }; __resumableChangeStreamExceptions = new HashSet(resumableAndRetryableExceptions); diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs index ff8568bda36..638ad618f9b 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs @@ -136,13 +136,13 @@ public static async Task ExecuteAsync(OperationContext operati public static bool ShouldConnectionAcquireBeRetried(OperationContext operationContext, RetryableWriteContext context, ServerDescription server, Exception exception, int attempt) { - if (!DoesContextAllowRetries(context, server)) + // According to the spec error during handshake should be handled according to RetryableReads logic + if (!context.RetryRequested || context.Binding.Session.IsInTransaction) { return false; } var innerException = exception is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : exception; - // According the spec error during handshake should be handle according to RetryableReads logic if (!RetryabilityHelper.IsRetryableReadException(innerException)) { return false; diff --git a/src/MongoDB.Driver/Core/Servers/Server.cs b/src/MongoDB.Driver/Core/Servers/Server.cs index 50c1710e566..4e62919a67c 100644 --- a/src/MongoDB.Driver/Core/Servers/Server.cs +++ b/src/MongoDB.Driver/Core/Servers/Server.cs @@ -121,7 +121,7 @@ bool ShouldIgnoreException(Exception ex) // because of OperationCanceledExceptions. We trust that the // implementations of connection don't leave themselves in a state // where they can't be used based on user cancellation. - return ex is OperationCanceledException; + return ex is OperationCanceledException or TimeoutException; } Exception GetEffectiveException(Exception ex) => diff --git a/src/MongoDB.Driver/Core/WireProtocol/Messages/CommandRequestMessage.cs b/src/MongoDB.Driver/Core/WireProtocol/Messages/CommandRequestMessage.cs index d488eeb3e8e..1c0c625f02e 100644 --- a/src/MongoDB.Driver/Core/WireProtocol/Messages/CommandRequestMessage.cs +++ b/src/MongoDB.Driver/Core/WireProtocol/Messages/CommandRequestMessage.cs @@ -13,7 +13,6 @@ * limitations under the License. */ -using System; using MongoDB.Driver.Core.Misc; using MongoDB.Driver.Core.WireProtocol.Messages.Encoders; @@ -36,6 +35,8 @@ public CommandRequestMessage(CommandMessage wrappedMessage) public override MongoDBMessageType MessageType => _wrappedMessage.MessageType; + public override bool ResponseExpected => _wrappedMessage.ResponseExpected; + public CommandMessage WrappedMessage => _wrappedMessage; // public methods diff --git a/src/MongoDB.Driver/Core/WireProtocol/Messages/QueryMessage.cs b/src/MongoDB.Driver/Core/WireProtocol/Messages/QueryMessage.cs index feb0b0ffcf5..3b441d8ca83 100644 --- a/src/MongoDB.Driver/Core/WireProtocol/Messages/QueryMessage.cs +++ b/src/MongoDB.Driver/Core/WireProtocol/Messages/QueryMessage.cs @@ -161,6 +161,8 @@ public IElementNameValidator QueryValidator get { return _queryValidator; } } + public override bool ResponseExpected => ResponseHandling != CommandResponseHandling.NoResponseExpected; + public CommandResponseHandling ResponseHandling { get { return _responseHandling; } diff --git a/src/MongoDB.Driver/Core/WireProtocol/Messages/RequestMessage.cs b/src/MongoDB.Driver/Core/WireProtocol/Messages/RequestMessage.cs index a1e0d7bbd70..74a88f43de1 100644 --- a/src/MongoDB.Driver/Core/WireProtocol/Messages/RequestMessage.cs +++ b/src/MongoDB.Driver/Core/WireProtocol/Messages/RequestMessage.cs @@ -52,6 +52,8 @@ public int RequestId get { return _requestId; } } + public abstract bool ResponseExpected { get; } + public bool WasSent { get { return _wasSent; } diff --git a/src/MongoDB.Driver/OperationContext.cs b/src/MongoDB.Driver/OperationContext.cs index 7d0ce583df8..e86617381f7 100644 --- a/src/MongoDB.Driver/OperationContext.cs +++ b/src/MongoDB.Driver/OperationContext.cs @@ -63,9 +63,10 @@ public TimeSpan RemainingTimeout var result = Timeout.Value - Elapsed; if (result < TimeSpan.Zero) { - result = TimeSpan.Zero; + return TimeSpan.Zero; } + result = TimeSpan.FromMilliseconds(Math.Ceiling(result.TotalMilliseconds)); return result; } } @@ -94,14 +95,7 @@ public CancellationToken CombinedCancellationToken private IClock Clock { get; } - public TimeSpan Elapsed - { - get - { - var totalSeconds = (Clock.GetTimestamp() - InitialTimestamp) / (double)Clock.Frequency; - return TimeSpan.FromSeconds(totalSeconds); - } - } + public TimeSpan Elapsed => Clock.TimeSince(InitialTimestamp); public TimeSpan? Timeout { get; } @@ -122,7 +116,12 @@ public bool IsTimedOut() // Dotnet APIs like task.WaitAsync truncating the timeout to milliseconds. // We should truncate the remaining timeout to the milliseconds, in order to maintain the consistent state: // if operationContext.WaitTaskAsync() failed with TimeoutException, we want IsTimedOut() returns true. - return (int)RemainingTimeout.TotalMilliseconds == 0; + if (RemainingTimeout == System.Threading.Timeout.InfiniteTimeSpan) + { + return false; + } + + return RemainingTimeout.TotalMilliseconds < 5; } public bool IsCancelledOrTimedOut() => IsTimedOut() || CancellationToken.IsCancellationRequested; @@ -177,7 +176,7 @@ public async Task WaitTaskAsync(Task task) return; } - var timeout = RemainingTimeout; + var timeout = TimeSpan.FromMilliseconds(Math.Ceiling(RemainingTimeout.TotalMilliseconds)); if (timeout == TimeSpan.Zero) { throw new TimeoutException(); diff --git a/tests/MongoDB.Driver.TestHelpers/Core/MockConnection.cs b/tests/MongoDB.Driver.TestHelpers/Core/MockConnection.cs index b2830dc1f1f..001467afe2b 100644 --- a/tests/MongoDB.Driver.TestHelpers/Core/MockConnection.cs +++ b/tests/MongoDB.Driver.TestHelpers/Core/MockConnection.cs @@ -181,6 +181,15 @@ public List GetSentMessages() return _sentMessages; } + public void EnsureConnectionReady(OperationContext operationContext) + { + } + + public Task EnsureConnectionReadyAsync(OperationContext operationContext) + { + return Task.CompletedTask; + } + public void Open(OperationContext operationContext) { _openingEventHandler?.Invoke(new ConnectionOpeningEvent(_connectionId, _connectionSettings, null)); diff --git a/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs b/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs index 8da7e5f7de8..eccabad5760 100644 --- a/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs @@ -34,7 +34,7 @@ public class StreamExtensionMethodsTests [InlineData(false, 0, new byte[] { 0, 0 })] [InlineData(false, 1, new byte[] { 1, 0 })] [InlineData(false, 2, new byte[] { 1, 2 })] - public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_count(bool async, int count, byte[] expectedBytes) + public async Task ReadBytes_should_have_expected_effect_for_count(bool async, int count, byte[] expectedBytes) { var bytes = new byte[] { 1, 2 }; var stream = new MemoryStream(bytes); @@ -42,11 +42,11 @@ public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_coun if (async) { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); + await stream.ReadBytesAsync(destination, 0, count, CancellationToken.None); } else { - stream.ReadBytes(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); + stream.ReadBytes(destination, 0, count, CancellationToken.None); } destination.Should().Equal(expectedBytes); @@ -57,7 +57,7 @@ public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_coun [InlineData(true, 2, new byte[] { 0, 0, 1 })] [InlineData(false, 1, new byte[] { 0, 1, 0 })] [InlineData(false, 2, new byte[] { 0, 0, 1 })] - public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_offset(bool async, int offset, byte[] expectedBytes) + public async Task ReadBytes_should_have_expected_effect_for_offset(bool async, int offset, byte[] expectedBytes) { var bytes = new byte[] { 1 }; var stream = new MemoryStream(bytes); @@ -65,11 +65,11 @@ public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_offs if (async) { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); + await stream.ReadBytesAsync(destination, offset, 1, CancellationToken.None); } else { - stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); + stream.ReadBytes(destination, offset, 1, CancellationToken.None); } destination.Should().Equal(expectedBytes); @@ -84,35 +84,33 @@ public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_offs [InlineData(false, 2, new[] { 1, 2 })] [InlineData(false, 3, new[] { 2, 1 })] [InlineData(false, 4, new[] { 1, 1, 1 })] - public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_partial_reads(bool async, int testCase, int[] partition) + public async Task ReadBytes_should_have_expected_effect_for_partial_reads(bool async, int testCase, int[] partition) { var mockStream = new Mock(); var bytes = new byte[] { 1, 2, 3 }; var n = 0; var position = 0; - Task ReadPartial (byte[] buffer, int offset, int count) + int ReadPartial (byte[] buffer, int offset, int count) { var length = partition[n++]; Buffer.BlockCopy(bytes, position, buffer, offset, length); position += length; - return Task.FromResult(length); + return length; } mockStream.Setup(s => s.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns((byte[] buffer, int offset, int count, CancellationToken cancellationToken) => ReadPartial(buffer, offset, count)); - mockStream.Setup(s => s.BeginRead(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns((byte[] buffer, int offset, int count, AsyncCallback callback, object state) => ReadPartial(buffer, offset, count)); - mockStream.Setup(s => s.EndRead(It.IsAny())) - .Returns(x => ((Task)x).GetAwaiter().GetResult()); + .Returns((byte[] buffer, int offset, int count, CancellationToken _) => Task.FromResult(ReadPartial(buffer, offset, count))); + mockStream.Setup(s => s.Read(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((byte[] buffer, int offset, int count) => ReadPartial(buffer, offset, count)); var destination = new byte[3]; if (async) { - await mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); + await mockStream.Object.ReadBytesAsync(destination, 0, 3, CancellationToken.None); } else { - mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); + mockStream.Object.ReadBytes(destination, 0, 3, CancellationToken.None); } destination.Should().Equal(bytes); @@ -120,7 +118,7 @@ Task ReadPartial (byte[] buffer, int offset, int count) [Theory] [ParameterAttributeData] - public async Task ReadBytes_with_byte_array_should_throw_when_end_of_stream_is_reached([Values(true, false)]bool async) + public async Task ReadBytes_should_throw_when_end_of_stream_is_reached([Values(true, false)]bool async) { var mockStream = new Mock(); var destination = new byte[1]; @@ -130,25 +128,25 @@ public async Task ReadBytes_with_byte_array_should_throw_when_end_of_stream_is_r .Returns(Task.FromResult(0)); var exception = async ? - await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(destination, 0, 1, CancellationToken.None)) : + Record.Exception(() => mockStream.Object.ReadBytes(destination, 0, 1, CancellationToken.None)); exception.Should().BeOfType(); } [Theory] [ParameterAttributeData] - public async Task ReadBytes_with_byte_array_should_throw_when_buffer_is_null([Values(true, false)]bool async) + public async Task ReadBytes_should_throw_when_buffer_is_null([Values(true, false)]bool async) { var stream = new Mock().Object; byte[] destination = null; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 0, CancellationToken.None)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 0, CancellationToken.None)); exception.Should().BeOfType().Subject - .ParamName.Should().Be("buffer"); + .ParamName.Should().Be("destination"); } [Theory] @@ -158,14 +156,14 @@ await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeo [InlineData(false, 0, -1)] [InlineData(false, 1, 2)] [InlineData(false, 2, 1)] - public async Task ReadBytes_with_byte_array_should_throw_when_count_is_invalid(bool async, int offset, int count) + public async Task ReadBytes_should_throw_when_count_is_invalid(bool async, int offset, int count) { var stream = new Mock().Object; var destination = new byte[2]; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, offset, count, CancellationToken.None)) : + Record.Exception(() => stream.ReadBytes(destination, offset, count, CancellationToken.None)); exception.Should().BeOfType().Subject .ParamName.Should().Be("count"); @@ -173,7 +171,7 @@ await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeo [Theory] [ParameterAttributeData] - public async Task ReadBytes_with_byte_array_should_throw_when_offset_is_invalid( + public async Task ReadBytes_should_throw_when_offset_is_invalid( [Values(true, false)]bool async, [Values(-1, 3)]int offset) { @@ -181,8 +179,8 @@ public async Task ReadBytes_with_byte_array_should_throw_when_offset_is_invalid( var destination = new byte[2]; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, offset, 0, CancellationToken.None)) : + Record.Exception(() => stream.ReadBytes(destination, offset, 0, CancellationToken.None)); exception.Should().BeOfType().Subject .ParamName.Should().Be("offset"); @@ -190,190 +188,14 @@ await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeo [Theory] [ParameterAttributeData] - public async Task ReadBytes_with_byte_array_should_throw_when_stream_is_null([Values(true, false)]bool async) + public async Task ReadBytes_should_throw_when_stream_is_null([Values(true, false)]bool async) { Stream stream = null; var destination = new byte[0]; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); - - exception.Should().BeOfType().Subject - .ParamName.Should().Be("stream"); - } - - [Theory] - [InlineData(true, 0, new byte[] { 0, 0 })] - [InlineData(true, 1, new byte[] { 1, 0 })] - [InlineData(true, 2, new byte[] { 1, 2 })] - [InlineData(false, 0, new byte[] { 0, 0 })] - [InlineData(false, 1, new byte[] { 1, 0 })] - [InlineData(false, 2, new byte[] { 1, 2 })] - public async Task ReadBytes_with_byte_buffer_should_have_expected_effect_for_count(bool async, int count, byte[] expectedBytes) - { - var bytes = new byte[] { 1, 2 }; - var stream = new MemoryStream(bytes); - var destination = new ByteArrayBuffer(new byte[2]); - - if (async) - { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); - } - else - { - stream.ReadBytes(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); - } - - destination.AccessBackingBytes(0).Array.Should().Equal(expectedBytes); - } - - [Theory] - [InlineData(true, 1, new byte[] { 0, 1, 0 })] - [InlineData(true, 2, new byte[] { 0, 0, 1 })] - [InlineData(false, 1, new byte[] { 0, 1, 0 })] - [InlineData(false, 2, new byte[] { 0, 0, 1 })] - public async Task ReadBytes_with_byte_buffer_should_have_expected_effect_for_offset(bool async, int offset, byte[] expectedBytes) - { - var bytes = new byte[] { 1 }; - var stream = new MemoryStream(bytes); - var destination = new ByteArrayBuffer(new byte[3]); - - if (async) - { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); - } - else - { - stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); - } - - destination.AccessBackingBytes(0).Array.Should().Equal(expectedBytes); - } - - [Theory] - [InlineData(true, 1, new[] { 3 })] - [InlineData(true, 2, new[] { 1, 2 })] - [InlineData(true, 3, new[] { 2, 1 })] - [InlineData(true, 4, new[] { 1, 1, 1 })] - [InlineData(false, 1, new[] { 3 })] - [InlineData(false, 2, new[] { 1, 2 })] - [InlineData(false, 3, new[] { 2, 1 })] - [InlineData(false, 4, new[] { 1, 1, 1 })] - public async Task ReadBytes_with_byte_buffer_should_have_expected_effect_for_partial_reads(bool async, int testCase, int[] partition) - { - var bytes = new byte[] { 1, 2, 3 }; - var mockStream = new Mock(); - var destination = new ByteArrayBuffer(new byte[3], 3); - var n = 0; - var position = 0; - Task ReadPartial (byte[] buffer, int offset, int count) - { - var length = partition[n++]; - Buffer.BlockCopy(bytes, position, buffer, offset, length); - position += length; - return Task.FromResult(length); - } - - mockStream.Setup(s => s.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns((byte[] buffer, int offset, int count, CancellationToken cancellationToken) => ReadPartial(buffer, offset, count)); - mockStream.Setup(s => s.BeginRead(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns((byte[] buffer, int offset, int count, AsyncCallback callback, object state) => ReadPartial(buffer, offset, count)); - mockStream.Setup(s => s.EndRead(It.IsAny())) - .Returns(x => ((Task)x).GetAwaiter().GetResult()); - - if (async) - { - await mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); - } - else - { - mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); - } - - destination.AccessBackingBytes(0).Array.Should().Equal(bytes); - } - - [Theory] - [ParameterAttributeData] - public async Task ReadBytes_with_byte_buffer_should_throw_when_end_of_stream_is_reached([Values(true, false)]bool async) - { - var mockStream = new Mock(); - var destination = CreateMockByteBuffer(1).Object; - mockStream.Setup(s => s.ReadAsync(It.IsAny(), 0, 1, It.IsAny())) - .ReturnsAsync(0); - mockStream.Setup(s => s.BeginRead(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.FromResult(0)); - - var exception = async ? - await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)); - - exception.Should().BeOfType(); - } - - [Theory] - [ParameterAttributeData] - public async Task ReadBytes_with_byte_buffer_should_throw_when_buffer_is_null([Values(true, false)]bool async) - { - var stream = new Mock().Object; - IByteBuffer destination = null; - - var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); - - exception.Should().BeOfType().Subject - .ParamName.Should().Be("buffer"); - } - - [Theory] - [InlineData(true, 0, -1)] - [InlineData(true, 1, 2)] - [InlineData(true, 2, 1)] - [InlineData(false, 0, -1)] - [InlineData(false, 1, 2)] - [InlineData(false, 2, 1)] - public async Task ReadBytes_with_byte_buffer_should_throw_when_count_is_invalid(bool async, int offset, int count) - { - var stream = new Mock().Object; - var destination = CreateMockByteBuffer(2).Object; - - var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)); - - exception.Should().BeOfType().Subject - .ParamName.Should().Be("count"); - } - - [Theory] - [ParameterAttributeData] - public async Task ReadBytes_with_byte_buffer_should_throw_when_offset_is_invalid( - [Values(true, false)] bool async, - [Values(-1, 3)]int offset) - { - var stream = new Mock().Object; - var destination = CreateMockByteBuffer(2).Object; - - var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)); - - exception.Should().BeOfType().Subject - .ParamName.Should().Be("offset"); - } - - [Theory] - [ParameterAttributeData] - public async Task ReadBytes_with_byte_buffer_should_throw_when_stream_is_null([Values(true, false)]bool async) - { - Stream stream = null; - var destination = new Mock().Object; - - var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 0, CancellationToken.None)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 0, CancellationToken.None)); exception.Should().BeOfType().Subject .ParamName.Should().Be("stream"); diff --git a/tests/MongoDB.Driver.Tests/Specifications/connection-monitoring-and-pooling/PendingResponseProseTests.cs b/tests/MongoDB.Driver.Tests/Specifications/connection-monitoring-and-pooling/PendingResponseProseTests.cs new file mode 100644 index 00000000000..de68de605e0 --- /dev/null +++ b/tests/MongoDB.Driver.Tests/Specifications/connection-monitoring-and-pooling/PendingResponseProseTests.cs @@ -0,0 +1,146 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading.Tasks; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Driver.Core; +using MongoDB.Driver.Core.Events; +using MongoDB.Driver.Core.Servers; +using MongoDB.TestHelpers.XunitExtensions; +using Xunit; + +namespace MongoDB.Driver.Tests.Specifications.connection_monitoring_and_pooling; + +[Category("CSOTPendingResponse")] +public class PendingResponseProseTests +{ + public PendingResponseProseTests() + { + RequireEnvironment.Check().EnvironmentVariable("ENABLE_CSOT_PENDING_RESPONSE_TESTS"); + } + + [Theory] + [ParameterAttributeData] + public async Task RecoverPartiallyReadResponse( + [Values(true, false)]bool async, + [Values(2, 4, 10)]int sendBytes) + { + var (client, eventCapturer) = CreateClient(); + using (client) + { + var database = client.GetDatabase("test"); + // drop collection if present and also pre-heat the connection pool + database.DropCollection("coll"); + + // TODO: CSOT: Implement timeout support for RunCommand and use it instead of MongoDatabaseSettings. + var testDatabase = client.GetDatabase("test", new MongoDatabaseSettings { Timeout = TimeSpan.FromMilliseconds(200) }); + var testCommand = CreateInsertCommand(42); + testCommand.Add("proxyTest", CreateTestProxyDocument(sendBytes)); + + var exception = async ? + await Record.ExceptionAsync(() => testDatabase.RunCommandAsync(testCommand)) : + Record.Exception(() => testDatabase.RunCommand(testCommand)); + exception.Should().BeOfType().Subject.InnerException + .Should().BeOfType(); + + exception = async ? + await Record.ExceptionAsync(() => database.RunCommandAsync(CreateInsertCommand(43))) : + Record.Exception(() => database.RunCommand(CreateInsertCommand(43))); + exception.Should().BeNull(); + + eventCapturer.Next().Should().BeOfType(); + eventCapturer.Next().Should().BeOfType(); + eventCapturer.Next().Should().BeOfType(); + eventCapturer.Any().Should().BeFalse(); + } + } + + [Theory] + [ParameterAttributeData] + public async Task NonDestructiveAlivenessCheck([Values(true, false)]bool async) + { + var (client, eventCapturer) = CreateClient(); + using (client) + { + var database = client.GetDatabase("test"); + // drop collection if present and also pre-heat the connection pool + database.DropCollection("coll"); + + // TODO: CSOT: Implement timeout support for RunCommand and use it instead of MongoDatabaseSettings. + var testDatabase = client.GetDatabase("test", new MongoDatabaseSettings { Timeout = TimeSpan.FromMilliseconds(200) }); + var testCommand = CreateInsertCommand(42); + + testCommand.Add("proxyTest", CreateTestProxyDocument(2)); + + var exception = async ? + await Record.ExceptionAsync(() => testDatabase.RunCommandAsync(testCommand)) : + Record.Exception(() => testDatabase.RunCommand(testCommand)); + exception.Should().BeOfType().Subject.InnerException + .Should().BeOfType(); + + await Task.Delay(4000); + + exception = async ? + await Record.ExceptionAsync(() => database.RunCommandAsync(CreateInsertCommand(43))) : + Record.Exception(() => database.RunCommand(CreateInsertCommand(43))); + exception.Should().BeNull(); + + eventCapturer.Next().Should().BeOfType(); + eventCapturer.Next().Should().BeOfType(); + eventCapturer.Next().Should().BeOfType(); + eventCapturer.Any().Should().BeFalse(); + } + } + + private (IMongoClient client, EventCapturer eventCapturer) CreateClient() + { + var eventCapturer = new EventCapturer() + .Capture() + .Capture() + .Capture() + .Capture() + .Capture(); + var settings = DriverTestConfiguration.GetClientSettings(); + settings.RetryReads = false; + settings.RetryWrites = false; + settings.MinConnectionPoolSize = 0; + settings.MaxConnectionPoolSize = 1; + settings.ServerMonitoringMode = ServerMonitoringMode.Poll; + settings.ClusterConfigurator = (builder) => builder.Subscribe(eventCapturer); + return (DriverTestConfiguration.CreateMongoClient(settings), eventCapturer); + } + + private BsonDocument CreateInsertCommand(int documentId) => + new() + { + { "insert", "coll" }, + { "documents", BsonArray.Create(new[] { new BsonDocument { { "_id", documentId } } }) } + }; + + private BsonDocument CreateTestProxyDocument(int sendBytes) => + new() + { + { "actions", BsonArray.Create(new[] + { + new BsonDocument("sendBytes", 2), + new BsonDocument("delayMs", 400), + new BsonDocument("sendAll", true) + }) } + }; + +} + diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedEventMatcher.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedEventMatcher.cs index cd7edc7b842..9b4a04e59eb 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedEventMatcher.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedEventMatcher.cs @@ -44,6 +44,9 @@ public class UnifiedEventMatcher { "connectionClosedEvent", (EventType.ConnectionClosed, EventSetType.Cmap) }, { "connectionCreatedEvent", (EventType.ConnectionCreated, EventSetType.Cmap) }, { "connectionCheckOutFailedEvent", (EventType.ConnectionPoolCheckingOutConnectionFailed, EventSetType.Cmap) }, + { "connectionPendingResponseStartedEvent", (EventType.ConnectionReadingPendingResponse, EventSetType.Cmap)}, + { "connectionPendingResponseSucceededEvent", (EventType.ConnectionReadPendingResponse, EventSetType.Cmap)}, + { "connectionPendingResponseFailedEvent", (EventType.ConnectionReadingPendingResponseFailed, EventSetType.Cmap)}, { "poolClearedEvent", (EventType.ConnectionPoolCleared, EventSetType.Cmap) }, { "poolReadyEvent", (EventType.ConnectionPoolReady, EventSetType.Cmap) }, @@ -254,6 +257,36 @@ private void AssertEvents(List actualEvents, BsonArray expectedEventsDoc case ConnectionOpenedEvent: expectedEventValue.ElementCount.Should().Be(0); // empty document break; + case ConnectionReadingPendingResponseEvent: + expectedEventValue.ElementCount.Should().Be(0); // empty document + break; + case ConnectionReadPendingResponseEvent: + expectedEventValue.ElementCount.Should().Be(0); // empty document + break; + case ConnectionReadingPendingResponseFailedEvent pendingResponseFailedEvent: + foreach (var element in expectedEventValue) + { + switch (element.Name) + { + case "reason": + var exception = pendingResponseFailedEvent.Exception; + switch (element.Value.AsString) + { + case "timeout": + exception.Should().BeOfType(); + break; + case "error": + exception.Should().NotBeNull(); + break; + default: + throw new FormatException($"Unexpected {expectedEventType} '{element.Name}' value: {element.Value}."); + } + break; + default: + throw new FormatException($"Unexpected {expectedEventType} field: '{element.Name}'."); + } + } + break; case ConnectionPoolCheckingOutConnectionEvent: expectedEventValue.ElementCount.Should().Be(0); // empty document break; diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedLogMatcher.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedLogMatcher.cs index 9a9b3dc1885..2c947061d31 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedLogMatcher.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/Matchers/UnifiedLogMatcher.cs @@ -18,6 +18,7 @@ using FluentAssertions; using MongoDB.Bson; using MongoDB.Driver.Core.TestHelpers.Logging; +using Xunit; namespace MongoDB.Driver.Tests.UnifiedTestOperations.Matchers { @@ -34,6 +35,12 @@ public void AssertLogsMatch(LogEntry[] actualLogs, BsonArray expectedLogs, bool { if (ignoreExtraLogs) { + if (actualLogs.Length < expectedLogs.Count) + { + Assert.Fail(string.Join(":::", actualLogs.Select(l => l.FormattedMessage))); + } + + actualLogs.Length.Should().BeGreaterOrEqualTo(expectedLogs.Count); } else diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs index 5649ca8a3c2..9a08c27ce93 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs @@ -1026,6 +1026,15 @@ private EventCapturer CreateEventCapturer(IEnumerable eventTypesToCaptur case "connectioncheckedinevent": eventCapturer = eventCapturer.Capture(); break; + case "connectionpendingresponsestartedevent": + eventCapturer = eventCapturer.Capture(); + break; + case "connectionpendingresponsesucceededevent": + eventCapturer = eventCapturer.Capture(); + break; + case "connectionpendingresponsefailedevent": + eventCapturer = eventCapturer.Capture(); + break; case "serverdescriptionchangedevent": eventCapturer = eventCapturer.Capture(); break; diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedTestRunner.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedTestRunner.cs index 3b4625bc4f1..5de68ad2772 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedTestRunner.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedTestRunner.cs @@ -108,7 +108,7 @@ public void Run( var schemaSemanticVersion = SemanticVersion.Parse(schemaVersion); if (schemaSemanticVersion < new SemanticVersion(1, 0, 0) || - schemaSemanticVersion > new SemanticVersion(1, 26, 0)) + schemaSemanticVersion > new SemanticVersion(1, 28, 0)) { throw new FormatException($"Schema version '{schemaVersion}' is not supported."); } From 4aae4e5e69d38277b61bc87066c69b6f8afcb133 Mon Sep 17 00:00:00 2001 From: Oleksandr Poliakov Date: Thu, 9 Oct 2025 12:55:37 -0700 Subject: [PATCH 2/3] pr --- .../Core/Connections/BinaryConnection.cs | 147 ++++++++---------- 1 file changed, 68 insertions(+), 79 deletions(-) diff --git a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs index a633fcf8ac7..a863fd23638 100644 --- a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs +++ b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs @@ -180,6 +180,7 @@ private void Dispose(bool disposing) try { _stream.Dispose(); + _responseHelper.Dispose(); } catch { @@ -367,21 +368,13 @@ public ResponseMessage ReceiveMessage( try { helper.ReceivingMessage(); - while (true) - { - using (var buffer = _responseHelper.ReadResponse(operationContext)) - { - if (responseTo != GetResponseTo(buffer)) - { - continue; - } + using var buffer = _responseHelper.ReadResponse(operationContext); + EnsureResponseToIsCorrect(buffer, responseTo); - var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); - helper.ReceivedMessage(buffer, message); - _lastUsedAtUtc = DateTime.UtcNow; - return message; - } - } + var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); + helper.ReceivedMessage(buffer, message); + _lastUsedAtUtc = DateTime.UtcNow; + return message; } catch (Exception ex) { @@ -392,7 +385,14 @@ public ResponseMessage ReceiveMessage( } helper.FailedReceivingMessage(ex); - if (wrappedException == null) { throw; } else { throw wrappedException; } + if (wrappedException == null) + { + throw; + } + else + { + throw wrappedException; + } } } @@ -409,21 +409,13 @@ public async Task ReceiveMessageAsync( try { helper.ReceivingMessage(); - while (true) - { - using (var buffer = await _responseHelper.ReadResponseAsync(operationContext).ConfigureAwait(false)) - { - if (responseTo != GetResponseTo(buffer)) - { - continue; - } + using var buffer = await _responseHelper.ReadResponseAsync(operationContext).ConfigureAwait(false); + EnsureResponseToIsCorrect(buffer, responseTo); - var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); - helper.ReceivedMessage(buffer, message); - _lastUsedAtUtc = DateTime.UtcNow; - return message; - } - } + var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); + helper.ReceivedMessage(buffer, message); + _lastUsedAtUtc = DateTime.UtcNow; + return message; } catch (Exception ex) { @@ -438,10 +430,14 @@ public async Task ReceiveMessageAsync( } } - private int GetResponseTo(IByteBuffer message) + private void EnsureResponseToIsCorrect(IByteBuffer message, int expectedResponseTo) { var backingBytes = message.AccessBackingBytes(8); - return BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset); + var receivedResponseTo = BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset); + if (receivedResponseTo != expectedResponseTo) + { + throw new InvalidOperationException($"Expected responseTo to be {expectedResponseTo} but was {receivedResponseTo}."); // should not be reached + } } private void SendBuffer(OperationContext operationContext, IByteBuffer buffer) @@ -728,9 +724,7 @@ public void OpeningConnection() } } -#pragma warning disable CA1001 - private sealed class ResponseHelper -#pragma warning restore CA1001 + private sealed class ResponseHelper : IDisposable { private const int MessageSizePrefixLength = 4; private static readonly TimeSpan __pendingReadsWindow = TimeSpan.FromSeconds(3); @@ -738,10 +732,9 @@ private sealed class ResponseHelper private readonly IClock _clock; private readonly BinaryConnection _connection; private readonly byte[] _responseHeaderBuffer = new byte[MessageSizePrefixLength]; - private readonly ManualResetEventSlim _manualResetEventSlim = new ManualResetEventSlim(); + private readonly ManualResetEventSlim _manualResetEventSlim = new(); private IByteBuffer _byteBuffer; private int _bytesRead; - private Exception _readException; private long _lastReadTimestamp; private long? _pendingRequestId; private Task _pendingReadTask; @@ -754,6 +747,13 @@ public ResponseHelper(BinaryConnection connection, IClock clock) public bool HasPendingReads => _pendingRequestId.HasValue; + public void Dispose() + { + _manualResetEventSlim.Dispose(); + _byteBuffer?.Dispose(); + _byteBuffer = null; + } + public void DiscardPendingReads(OperationContext operationContext) { if (!_pendingRequestId.HasValue) @@ -829,9 +829,9 @@ public IByteBuffer ReadResponse(OperationContext operationContext) } catch (Exception ex) { + // Should not dispose buffer if timeout, as it is used in ContinueReadAsync if (ex is not (TimeoutException or OperationCanceledException or TaskCanceledException)) { - // Should not dispose buffer here, as it is passed into ContinueReadAsync _byteBuffer?.Dispose(); } @@ -895,7 +895,6 @@ private void HandlePendingReadsException(Exception ex) private void ClearPendingReads() { - _readException = null; _bytesRead = 0; _byteBuffer = null; _pendingRequestId = null; @@ -918,19 +917,13 @@ private Task ConfigurePendingReadsTask(Task readTask) return readTask; } - private async Task ContinueReadAsync(IAsyncResult asyncResult = null) + private async Task ContinueReadAsync(Task previousTask = null) { try { - if (asyncResult != null) + if (previousTask != null) { - var asyncResultTask = Task.Factory.FromAsync(asyncResult, _ => { }); - await asyncResultTask.ConfigureAwait(false); - - if (_readException != null) - { - throw _readException; - } + await previousTask.ConfigureAwait(false); } var messageSize = await ReadMessageSizeAsync().ConfigureAwait(false); @@ -1018,7 +1011,6 @@ private void ReadingRendingResponseDone(TimeSpan duration) private void ReadBytes(OperationContext operationContext, byte[] destination, int offset, int count) { var bytesRead = 0; - var initialBytesRead = _bytesRead; while (bytesRead < count) { _manualResetEventSlim.Reset(); @@ -1029,47 +1021,44 @@ private void ReadBytes(OperationContext operationContext, byte[] destination, in count - bytesRead, result => { - var responseHelper = (ResponseHelper)result.AsyncState; - int readResult = 0; - try - { - - readResult = responseHelper._connection._stream.EndRead(result); - } - catch (ObjectDisposedException ex) - { - _readException = new EndOfStreamException(ex.Message, ex); - } - catch (Exception ex) - { - // Async handler should not throw any errors, we mark the current read as failed so appropriate exception could be thrown later. - _readException = ex; - } - - if (readResult == 0) - { - _readException ??= new EndOfStreamException(); - } - - responseHelper._bytesRead += readResult; - responseHelper._lastReadTimestamp = responseHelper._clock.GetTimestamp(); - responseHelper._manualResetEventSlim.Set(); + var manualResetEventSlim = (ManualResetEventSlim)result.AsyncState; + manualResetEventSlim.Set(); }, - this); + _manualResetEventSlim); if (!_manualResetEventSlim.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken)) { - _pendingReadTask = ConfigurePendingReadsTask(ContinueReadAsync(operation)); + var asyncResultTask = Task.Factory.FromAsync(operation, ReadCallback); + + _pendingReadTask = ConfigurePendingReadsTask(ContinueReadAsync(asyncResultTask)); operationContext.CancellationToken.ThrowIfCancellationRequested(); throw new TimeoutException(); } - if (_readException != null) + bytesRead += ReadCallback(operation); + } + + int ReadCallback(IAsyncResult result) + { + int readResult; + try + { + + readResult = _connection._stream.EndRead(result); + } + catch (ObjectDisposedException ex) + { + throw new IOException(ex.Message, ex); + } + + if (readResult == 0) { - throw _readException; + throw new EndOfStreamException(); } - bytesRead = _bytesRead - initialBytesRead; + _bytesRead += readResult; + _lastReadTimestamp = _clock.GetTimestamp(); + return readResult; } } @@ -1093,7 +1082,7 @@ private async Task ReadBytesAsync(byte[] destination, int offset, int count) } catch (ObjectDisposedException ex) { - throw new EndOfStreamException(ex.Message, ex); + throw new IOException(ex.Message, ex); } } } From 839b252b02f03f98a1efa27a43560e3937c4451a Mon Sep 17 00:00:00 2001 From: Oleksandr Poliakov Date: Fri, 10 Oct 2025 16:47:43 -0700 Subject: [PATCH 3/3] pr --- .../Core/Connections/BinaryConnection.cs | 81 +++++++++---------- src/MongoDB.Driver/OperationContext.cs | 2 +- .../UnifiedTestOperations/UnifiedEntityMap.cs | 1 + 3 files changed, 41 insertions(+), 43 deletions(-) diff --git a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs index a863fd23638..a20065773ba 100644 --- a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs +++ b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs @@ -53,7 +53,6 @@ internal sealed class BinaryConnection : IConnection private readonly ResponseHelper _responseHelper; private CompressorType? _sendCompressorType; private readonly ConnectionSettings _settings; - private readonly TimeSpan _socketReadTimeout; private readonly TimeSpan _socketWriteTimeout; private readonly InterlockedInt32 _state; private Stream _stream; @@ -85,9 +84,8 @@ public BinaryConnection( _compressorSource = new CompressorSource(settings.Compressors); _eventLogger = loggerFactory.CreateEventLogger(eventSubscriber); _commandEventHelper = new CommandEventHelper(loggerFactory.CreateEventLogger(eventSubscriber)); - _socketReadTimeout = socketReadTimeout; _socketWriteTimeout = socketWriteTimeout; - _responseHelper = new ResponseHelper(this, SystemClock.Instance); + _responseHelper = new ResponseHelper(this, SystemClock.Instance, socketReadTimeout); } // properties @@ -202,7 +200,7 @@ public void EnsureConnectionReady(OperationContext operationContext) } ThrowIfCancelledOrDisposed(operationContext); - _responseHelper.DiscardPendingReads(operationContext); + _responseHelper.DiscardPendingResponse(operationContext); } public Task EnsureConnectionReadyAsync(OperationContext operationContext) @@ -213,7 +211,7 @@ public Task EnsureConnectionReadyAsync(OperationContext operationContext) } ThrowIfCancelledOrDisposed(operationContext); - return _responseHelper.DiscardPendingReadsAsync(operationContext); + return _responseHelper.DiscardPendingResponseAsync(operationContext); } public void Open(OperationContext operationContext) @@ -433,7 +431,7 @@ public async Task ReceiveMessageAsync( private void EnsureResponseToIsCorrect(IByteBuffer message, int expectedResponseTo) { var backingBytes = message.AccessBackingBytes(8); - var receivedResponseTo = BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset); + var receivedResponseTo = BinaryPrimitives.ReadInt32LittleEndian(backingBytes.Array.AsSpan().Slice(backingBytes.Offset, 4)); if (receivedResponseTo != expectedResponseTo) { throw new InvalidOperationException($"Expected responseTo to be {expectedResponseTo} but was {receivedResponseTo}."); // should not be reached @@ -731,6 +729,7 @@ private sealed class ResponseHelper : IDisposable private readonly IClock _clock; private readonly BinaryConnection _connection; + private readonly TimeSpan _socketReadTimeout; private readonly byte[] _responseHeaderBuffer = new byte[MessageSizePrefixLength]; private readonly ManualResetEventSlim _manualResetEventSlim = new(); private IByteBuffer _byteBuffer; @@ -739,10 +738,11 @@ private sealed class ResponseHelper : IDisposable private long? _pendingRequestId; private Task _pendingReadTask; - public ResponseHelper(BinaryConnection connection, IClock clock) + public ResponseHelper(BinaryConnection connection, IClock clock, TimeSpan socketReadTimeout) { _connection = connection; _clock = clock; + _socketReadTimeout = socketReadTimeout; } public bool HasPendingReads => _pendingRequestId.HasValue; @@ -754,60 +754,54 @@ public void Dispose() _byteBuffer = null; } - public void DiscardPendingReads(OperationContext operationContext) + public void DiscardPendingResponse(OperationContext operationContext) { if (!_pendingRequestId.HasValue) { throw new InvalidOperationException(); } - ReadingRendingResponseStarted(); - _pendingReadTask ??= ConfigurePendingReadsTask(ContinueReadAsync()); - using var pendingReadsOperationContext = CreatePendingReadsContext(operationContext); + PendingResponseStarted(); + using var pendingResponseContext = CreatePendingResponseContext(operationContext); try { - pendingReadsOperationContext.WaitTask(_pendingReadTask); - ReadingRendingResponseDone(pendingReadsOperationContext.Elapsed); - ClearPendingReads(); + pendingResponseContext.WaitTask(_pendingReadTask); + PendingResponseDone(pendingResponseContext.Elapsed); } catch (Exception ex) { - ReadingRendingResponseFailed(pendingReadsOperationContext.Elapsed, ex); - HandlePendingReadsException(ex); + PendingResponseFailed(pendingResponseContext.Elapsed, ex); throw; } } - public async Task DiscardPendingReadsAsync(OperationContext operationContext) + public async Task DiscardPendingResponseAsync(OperationContext operationContext) { if (!_pendingRequestId.HasValue) { throw new InvalidOperationException(); } - ReadingRendingResponseStarted(); - _pendingReadTask ??= ConfigurePendingReadsTask(ContinueReadAsync()); - using var pendingReadsOperationContext = CreatePendingReadsContext(operationContext); + PendingResponseStarted(); + using var pendingResponseContext = CreatePendingResponseContext(operationContext); try { - await pendingReadsOperationContext.WaitTaskAsync(_pendingReadTask).ConfigureAwait(false); - ReadingRendingResponseDone(pendingReadsOperationContext.Elapsed); - ClearPendingReads(); + await pendingResponseContext.WaitTaskAsync(_pendingReadTask).ConfigureAwait(false); + PendingResponseDone(pendingResponseContext.Elapsed); } catch (Exception ex) { - ReadingRendingResponseFailed(pendingReadsOperationContext.Elapsed, ex); - HandlePendingReadsException(ex); + PendingResponseFailed(pendingResponseContext.Elapsed, ex); throw; } } public IByteBuffer ReadResponse(OperationContext operationContext) { - // TODO: remove this operationContext fork together with removal of SocketTimeout. - using var readOperationContext = operationContext.Timeout != null ? operationContext.Fork() : operationContext.WithTimeout(_connection._socketReadTimeout); + var remainingTimeout = operationContext.RemainingTimeoutOrDefault(_socketReadTimeout); + using var readOperationContext = operationContext.WithTimeout(remainingTimeout); try { @@ -824,7 +818,7 @@ public IByteBuffer ReadResponse(OperationContext operationContext) _byteBuffer.MakeReadOnly(); var result = _byteBuffer; - ClearPendingReads(); + ClearPendingResponse(); return result; } catch (Exception ex) @@ -843,24 +837,24 @@ public async Task ReadResponseAsync(OperationContext operationConte { var readTask = ContinueReadAsync(); - // TODO: remove this operationContext fork together with removal of SocketTimeout. - using var readOperationContext = operationContext.Timeout != null ? operationContext.Fork() : operationContext.WithTimeout(_connection._socketReadTimeout); + var remainingTimeout = operationContext.RemainingTimeoutOrDefault(_socketReadTimeout); + using var readOperationContext = operationContext.WithTimeout(remainingTimeout); try { await readOperationContext.WaitTaskAsync(readTask).ConfigureAwait(false); var result = await readTask.ConfigureAwait(false); - ClearPendingReads(); + ClearPendingResponse(); return result; } catch (Exception ex) when (ex is TimeoutException or TaskCanceledException or TaskCanceledException) { - _pendingReadTask = ConfigurePendingReadsTask(readTask); + _pendingReadTask = ConfigurePendingResponseTask(readTask); throw; } } - public void SetPendingReads(long requestId) + public void SetPendingResponse(long requestId) { if (_pendingRequestId.HasValue) { @@ -870,7 +864,7 @@ public void SetPendingReads(long requestId) _pendingRequestId = requestId; } - private OperationContext CreatePendingReadsContext(OperationContext operationContext) + private OperationContext CreatePendingResponseContext(OperationContext operationContext) { var spend = _clock.TimeSince(_lastReadTimestamp); var pendingReadsRemainingTimeout = __pendingReadsWindow - spend; @@ -882,7 +876,7 @@ private OperationContext CreatePendingReadsContext(OperationContext operationCon return operationContext.WithTimeout(pendingReadsRemainingTimeout); } - private void HandlePendingReadsException(Exception ex) + private void HandlePendingResponseException(Exception ex) { if (ex is TimeoutException && __pendingReadsWindow - _clock.TimeSince(_lastReadTimestamp) > TimeSpan.FromMilliseconds(1)) { @@ -893,7 +887,7 @@ private void HandlePendingReadsException(Exception ex) _connection.ConnectionFailed(ex); } - private void ClearPendingReads() + private void ClearPendingResponse() { _bytesRead = 0; _byteBuffer = null; @@ -902,7 +896,7 @@ private void ClearPendingReads() _lastReadTimestamp = 0; } - private Task ConfigurePendingReadsTask(Task readTask) + private Task ConfigurePendingResponseTask(Task readTask) { _lastReadTimestamp = _clock.GetTimestamp(); readTask.ContinueWith(t => @@ -990,22 +984,25 @@ private async Task ReadMessageSizeAsync() return messageSize; } - private void ReadingRendingResponseStarted() + private void PendingResponseStarted() { + _pendingReadTask ??= ConfigurePendingResponseTask(ContinueReadAsync()); _connection._eventLogger.LogAndPublish( new ConnectionReadingPendingResponseEvent(_connection._connectionId, _pendingRequestId.Value)); } - private void ReadingRendingResponseFailed(TimeSpan duration, Exception ex) + private void PendingResponseFailed(TimeSpan duration, Exception ex) { _connection._eventLogger.LogAndPublish( new ConnectionReadingPendingResponseFailedEvent(_connection._connectionId, _pendingRequestId.Value, duration, ex)); + HandlePendingResponseException(ex); } - private void ReadingRendingResponseDone(TimeSpan duration) + private void PendingResponseDone(TimeSpan duration) { _connection._eventLogger.LogAndPublish( new ConnectionReadPendingResponseEvent(_connection._connectionId, _pendingRequestId.Value, duration)); + ClearPendingResponse(); } private void ReadBytes(OperationContext operationContext, byte[] destination, int offset, int count) @@ -1030,7 +1027,7 @@ private void ReadBytes(OperationContext operationContext, byte[] destination, in { var asyncResultTask = Task.Factory.FromAsync(operation, ReadCallback); - _pendingReadTask = ConfigurePendingReadsTask(ContinueReadAsync(asyncResultTask)); + _pendingReadTask = ConfigurePendingResponseTask(ContinueReadAsync(asyncResultTask)); operationContext.CancellationToken.ThrowIfCancellationRequested(); throw new TimeoutException(); } @@ -1255,7 +1252,7 @@ public void SentMessage(RequestMessage message, int bufferLength) if (message.ResponseExpected) { - _connection._responseHelper.SetPendingReads(message.RequestId); + _connection._responseHelper.SetPendingResponse(message.RequestId); } if (_connection._commandEventHelper.ShouldCallAfterSending) diff --git a/src/MongoDB.Driver/OperationContext.cs b/src/MongoDB.Driver/OperationContext.cs index e86617381f7..b20e2100ad1 100644 --- a/src/MongoDB.Driver/OperationContext.cs +++ b/src/MongoDB.Driver/OperationContext.cs @@ -143,7 +143,7 @@ public void WaitTask(Task task) return; } - var timeout = RemainingTimeout; + var timeout = TimeSpan.FromMilliseconds(Math.Ceiling(RemainingTimeout.TotalMilliseconds)); if (timeout == TimeSpan.Zero) { throw new TimeoutException(); diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs index 9a08c27ce93..a300ec439ae 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedEntityMap.cs @@ -811,6 +811,7 @@ private IGridFSBucket CreateBucket(BsonDocument entity, Dictionary