Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .evergreen/resync-specs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ do
change-streams|change_streams)
cpjson change-streams/tests/ change_streams/
;;
client-backpressure|client_backpressure)
cpjson client-backpressure/tests client-backpressure
;;
client-side-encryption|csfle|fle)
cpjson client-side-encryption/tests/ client-side-encryption/spec
cpjson client-side-encryption/corpus/ client-side-encryption/corpus
Expand Down
1 change: 1 addition & 0 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ async def _hello(
cmd = self.hello_cmd()
performing_handshake = not self.performed_handshake
awaitable = False
cmd["backpressure"] = True
if performing_handshake:
self.performed_handshake = True
cmd["client"] = self.opts.metadata
Expand Down
1 change: 1 addition & 0 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def _hello(
cmd = self.hello_cmd()
performing_handshake = not self.performed_handshake
awaitable = False
cmd["backpressure"] = True
if performing_handshake:
self.performed_handshake = True
cmd["client"] = self.opts.metadata
Expand Down
114 changes: 114 additions & 0 deletions test/asynchronous/test_client_backpressure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2025-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import os
import pathlib
import sys
from time import perf_counter
from unittest.mock import patch

from pymongo.errors import OperationFailure

sys.path[0:0] = [""]

from test.asynchronous import (
AsyncIntegrationTest,
async_client_context,
unittest,
)
from test.asynchronous.unified_format import generate_test_classes
from test.utils_shared import EventListener, OvertCommandListener

_IS_SYNC = False


class AsyncTestClientBackpressure(AsyncIntegrationTest):
listener: EventListener

@classmethod
def setUpClass(cls) -> None:
cls.listener = OvertCommandListener()

@async_client_context.require_connection
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
self.listener.reset()
self.app_name = self.__class__.__name__.lower()
self.client = await self.async_rs_or_single_client(
event_listeners=[self.listener], retryWrites=False, appName=self.app_name
)

@patch("random.random")
async def test_01_operation_retry_uses_exponential_backoff(self, random_func):
# Drivers should test that retries do not occur immediately when a SystemOverloadedError is encountered.

# 1. let `client` be a `MongoClient`
client = self.client

# 2. let `collection` be a collection
collection = client.test.test

# 3. Now, run transactions without backoff:

# a. Configure the random number generator used for jitter to always return `0` -- this effectively disables backoff.
random_func.return_value = 0

# b. Configure the following failPoint:
fail_point = dict(
mode="alwaysOn",
data=dict(
failCommands=["insert"],
errorCode=2,
errorLabels=["SystemOverloadedError", "RetryableError"],
appName=self.app_name,
),
)
async with self.fail_point(fail_point):
# c. Execute the following command. Expect that the command errors. Measure the duration of the command execution.
start0 = perf_counter()
with self.assertRaises(OperationFailure):
await collection.insert_one({"a": 1})
end0 = perf_counter()

# d. Configure the random number generator used for jitter to always return `1`.
random_func.return_value = 1

# e. Execute step c again.
start1 = perf_counter()
with self.assertRaises(OperationFailure):
await collection.insert_one({"a": 1})
end1 = perf_counter()

# f. Compare the two time between the two runs.
# The sum of 5 backoffs is 3.1 seconds. There is a 1-second window to account for potential variance between the two
# runs.
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1)


# Location of JSON test specifications.
if _IS_SYNC:
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent, "client-backpressure")
else:
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent.parent, "client-backpressure")

globals().update(
generate_test_classes(
_TEST_PATH,
module=__name__,
)
)

if __name__ == "__main__":
unittest.main()
13 changes: 13 additions & 0 deletions test/asynchronous/test_client_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,19 @@ async def test_duplicate_driver_name_no_op(self):
# add same metadata again
await self.check_metadata_added(client, "Framework", None, None)

async def test_handshake_documents_include_backpressure(self):
# Create a `MongoClient` that is configured to record all handshake documents sent to the server as a part of
# connection establishment.
client = await self.async_rs_or_single_client("mongodb://" + self.server.address_string)

# Send a `ping` command to the server and verify that the command succeeds. This ensure that a connection is
# established on all topologies. Note: MockupDB only supports standalone servers.
await client.admin.command("ping")

# Assert that for every handshake document intercepted:
# the document has a field `backpressure` whose value is `true`.
self.assertEqual(self.handshake_req["backpressure"], True)


if __name__ == "__main__":
unittest.main()
Loading
Loading