Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion src/strands/tools/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@
import anyio
from mcp import ClientSession, ListToolsResult
from mcp.client.session import ElicitationFnT
from mcp.types import BlobResourceContents, GetPromptResult, ListPromptsResult, TextResourceContents
from mcp.types import (
BlobResourceContents,
GetPromptResult,
ListPromptsResult,
ListResourcesResult,
ListResourceTemplatesResult,
ReadResourceResult,
TextResourceContents,
)
from mcp.types import CallToolResult as MCPCallToolResult
from mcp.types import EmbeddedResource as MCPEmbeddedResource
from mcp.types import ImageContent as MCPImageContent
from mcp.types import TextContent as MCPTextContent
from pydantic import AnyUrl
from typing_extensions import Protocol, TypedDict

from ...experimental.tools import ToolProvider
Expand Down Expand Up @@ -429,6 +438,82 @@ async def _get_prompt_async() -> GetPromptResult:

return get_prompt_result

def list_resources_sync(self, pagination_token: Optional[str] = None) -> ListResourcesResult:
"""Synchronously retrieves the list of available resources from the MCP server.

This method calls the asynchronous list_resources method on the MCP session
and returns the raw ListResourcesResult with pagination support.

Args:
pagination_token: Optional token for pagination

Returns:
ListResourcesResult: The raw MCP response containing resources and pagination info
"""
self._log_debug_with_thread("listing MCP resources synchronously")
if not self._is_session_active():
raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE)

async def _list_resources_async() -> ListResourcesResult:
return await cast(ClientSession, self._background_thread_session).list_resources(cursor=pagination_token)

list_resources_result: ListResourcesResult = self._invoke_on_background_thread(_list_resources_async()).result()
self._log_debug_with_thread("received %d resources from MCP server", len(list_resources_result.resources))

return list_resources_result

def read_resource_sync(self, uri: AnyUrl | str) -> ReadResourceResult:
"""Synchronously reads a resource from the MCP server.

Args:
uri: The URI of the resource to read

Returns:
ReadResourceResult: The resource content from the MCP server
"""
self._log_debug_with_thread("reading MCP resource synchronously: %s", uri)
if not self._is_session_active():
raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE)

async def _read_resource_async() -> ReadResourceResult:
# Convert string to AnyUrl if needed
resource_uri = AnyUrl(uri) if isinstance(uri, str) else uri
return await cast(ClientSession, self._background_thread_session).read_resource(resource_uri)

read_resource_result: ReadResourceResult = self._invoke_on_background_thread(_read_resource_async()).result()
self._log_debug_with_thread("received resource content from MCP server")

return read_resource_result

def list_resource_templates_sync(self, pagination_token: Optional[str] = None) -> ListResourceTemplatesResult:
"""Synchronously retrieves the list of available resource templates from the MCP server.

Resource templates define URI patterns that can be used to access resources dynamically.

Args:
pagination_token: Optional token for pagination

Returns:
ListResourceTemplatesResult: The raw MCP response containing resource templates and pagination info
"""
self._log_debug_with_thread("listing MCP resource templates synchronously")
if not self._is_session_active():
raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE)

async def _list_resource_templates_async() -> ListResourceTemplatesResult:
return await cast(ClientSession, self._background_thread_session).list_resource_templates(
cursor=pagination_token
)

list_resource_templates_result: ListResourceTemplatesResult = self._invoke_on_background_thread(
_list_resource_templates_async()
).result()
self._log_debug_with_thread(
"received %d resource templates from MCP server", len(list_resource_templates_result.resourceTemplates)
)

return list_resource_templates_result

def call_tool_sync(
self,
tool_use_id: str,
Expand Down
154 changes: 153 additions & 1 deletion tests/strands/tools/mcp/test_mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,21 @@
import pytest
from mcp import ListToolsResult
from mcp.types import CallToolResult as MCPCallToolResult
from mcp.types import GetPromptResult, ListPromptsResult, Prompt, PromptMessage
from mcp.types import (
GetPromptResult,
ListPromptsResult,
ListResourcesResult,
ListResourceTemplatesResult,
Prompt,
PromptMessage,
ReadResourceResult,
Resource,
ResourceTemplate,
TextResourceContents,
)
from mcp.types import TextContent as MCPTextContent
from mcp.types import Tool as MCPTool
from pydantic import AnyUrl

from strands.tools.mcp import MCPClient
from strands.tools.mcp.mcp_types import MCPToolResult
Expand Down Expand Up @@ -688,3 +700,143 @@ def __init__(self):
mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 0 # Unknown resource type should be dropped


# Resource Tests - Sync Methods


def test_list_resources_sync(mock_transport, mock_session):
"""Test that list_resources_sync correctly retrieves resources."""
mock_resource = Resource(
uri=AnyUrl("file://documents/test.txt"), name="test.txt", description="A test document", mimeType="text/plain"
)
mock_session.list_resources.return_value = ListResourcesResult(resources=[mock_resource])

with MCPClient(mock_transport["transport_callable"]) as client:
result = client.list_resources_sync()

mock_session.list_resources.assert_called_once_with(cursor=None)
assert len(result.resources) == 1
assert result.resources[0].name == "test.txt"
assert str(result.resources[0].uri) == "file://documents/test.txt"
assert result.nextCursor is None


def test_list_resources_sync_with_pagination_token(mock_transport, mock_session):
"""Test that list_resources_sync correctly passes pagination token and returns next cursor."""
mock_resource = Resource(
uri=AnyUrl("file://documents/test.txt"), name="test.txt", description="A test document", mimeType="text/plain"
)
mock_session.list_resources.return_value = ListResourcesResult(resources=[mock_resource], nextCursor="next_page")

with MCPClient(mock_transport["transport_callable"]) as client:
result = client.list_resources_sync(pagination_token="current_page")

mock_session.list_resources.assert_called_once_with(cursor="current_page")
assert len(result.resources) == 1
assert result.resources[0].name == "test.txt"
assert result.nextCursor == "next_page"


def test_list_resources_sync_session_not_active():
"""Test that list_resources_sync raises an error when session is not active."""
client = MCPClient(MagicMock())

with pytest.raises(MCPClientInitializationError, match="client session is not running"):
client.list_resources_sync()


def test_read_resource_sync(mock_transport, mock_session):
"""Test that read_resource_sync correctly reads a resource."""
mock_content = TextResourceContents(
uri=AnyUrl("file://documents/test.txt"), text="Resource content", mimeType="text/plain"
)
mock_session.read_resource.return_value = ReadResourceResult(contents=[mock_content])

with MCPClient(mock_transport["transport_callable"]) as client:
result = client.read_resource_sync("file://documents/test.txt")

# Verify the session method was called
mock_session.read_resource.assert_called_once()
# Check the URI argument (it will be wrapped as AnyUrl)
call_args = mock_session.read_resource.call_args[0]
assert str(call_args[0]) == "file://documents/test.txt"

assert len(result.contents) == 1
assert result.contents[0].text == "Resource content"


def test_read_resource_sync_with_anyurl(mock_transport, mock_session):
"""Test that read_resource_sync correctly handles AnyUrl input."""
mock_content = TextResourceContents(
uri=AnyUrl("file://documents/test.txt"), text="Resource content", mimeType="text/plain"
)
mock_session.read_resource.return_value = ReadResourceResult(contents=[mock_content])

with MCPClient(mock_transport["transport_callable"]) as client:
uri = AnyUrl("file://documents/test.txt")
result = client.read_resource_sync(uri)

mock_session.read_resource.assert_called_once()
call_args = mock_session.read_resource.call_args[0]
assert str(call_args[0]) == "file://documents/test.txt"

assert len(result.contents) == 1
assert result.contents[0].text == "Resource content"


def test_read_resource_sync_session_not_active():
"""Test that read_resource_sync raises an error when session is not active."""
client = MCPClient(MagicMock())

with pytest.raises(MCPClientInitializationError, match="client session is not running"):
client.read_resource_sync("file://documents/test.txt")


def test_list_resource_templates_sync(mock_transport, mock_session):
"""Test that list_resource_templates_sync correctly retrieves resource templates."""
mock_template = ResourceTemplate(
uriTemplate="file://documents/{name}",
name="document_template",
description="Template for documents",
mimeType="text/plain",
)
mock_session.list_resource_templates.return_value = ListResourceTemplatesResult(resourceTemplates=[mock_template])

with MCPClient(mock_transport["transport_callable"]) as client:
result = client.list_resource_templates_sync()

mock_session.list_resource_templates.assert_called_once_with(cursor=None)
assert len(result.resourceTemplates) == 1
assert result.resourceTemplates[0].name == "document_template"
assert result.resourceTemplates[0].uriTemplate == "file://documents/{name}"
assert result.nextCursor is None


def test_list_resource_templates_sync_with_pagination_token(mock_transport, mock_session):
"""Test that list_resource_templates_sync correctly passes pagination token and returns next cursor."""
mock_template = ResourceTemplate(
uriTemplate="file://documents/{name}",
name="document_template",
description="Template for documents",
mimeType="text/plain",
)
mock_session.list_resource_templates.return_value = ListResourceTemplatesResult(
resourceTemplates=[mock_template], nextCursor="next_page"
)

with MCPClient(mock_transport["transport_callable"]) as client:
result = client.list_resource_templates_sync(pagination_token="current_page")

mock_session.list_resource_templates.assert_called_once_with(cursor="current_page")
assert len(result.resourceTemplates) == 1
assert result.resourceTemplates[0].name == "document_template"
assert result.nextCursor == "next_page"


def test_list_resource_templates_sync_session_not_active():
"""Test that list_resource_templates_sync raises an error when session is not active."""
client = MCPClient(MagicMock())

with pytest.raises(MCPClientInitializationError, match="client session is not running"):
client.list_resource_templates_sync()