diff --git a/src/strands/tools/mcp/mcp_client.py b/src/strands/tools/mcp/mcp_client.py index 2fe006466..1182b0d0f 100644 --- a/src/strands/tools/mcp/mcp_client.py +++ b/src/strands/tools/mcp/mcp_client.py @@ -21,11 +21,20 @@ import anyio from mcp import ClientSession, ListToolsResult from mcp.client.session import ElicitationFnT -from mcp.types import BlobResourceContents, GetPromptResult, ListPromptsResult, TextResourceContents +from mcp.types import ( + BlobResourceContents, + GetPromptResult, + ListPromptsResult, + ListResourcesResult, + ListResourceTemplatesResult, + ReadResourceResult, + TextResourceContents, +) from mcp.types import CallToolResult as MCPCallToolResult from mcp.types import EmbeddedResource as MCPEmbeddedResource from mcp.types import ImageContent as MCPImageContent from mcp.types import TextContent as MCPTextContent +from pydantic import AnyUrl from typing_extensions import Protocol, TypedDict from ...experimental.tools import ToolProvider @@ -429,6 +438,82 @@ async def _get_prompt_async() -> GetPromptResult: return get_prompt_result + def list_resources_sync(self, pagination_token: Optional[str] = None) -> ListResourcesResult: + """Synchronously retrieves the list of available resources from the MCP server. + + This method calls the asynchronous list_resources method on the MCP session + and returns the raw ListResourcesResult with pagination support. + + Args: + pagination_token: Optional token for pagination + + Returns: + ListResourcesResult: The raw MCP response containing resources and pagination info + """ + self._log_debug_with_thread("listing MCP resources synchronously") + if not self._is_session_active(): + raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE) + + async def _list_resources_async() -> ListResourcesResult: + return await cast(ClientSession, self._background_thread_session).list_resources(cursor=pagination_token) + + list_resources_result: ListResourcesResult = self._invoke_on_background_thread(_list_resources_async()).result() + self._log_debug_with_thread("received %d resources from MCP server", len(list_resources_result.resources)) + + return list_resources_result + + def read_resource_sync(self, uri: AnyUrl | str) -> ReadResourceResult: + """Synchronously reads a resource from the MCP server. + + Args: + uri: The URI of the resource to read + + Returns: + ReadResourceResult: The resource content from the MCP server + """ + self._log_debug_with_thread("reading MCP resource synchronously: %s", uri) + if not self._is_session_active(): + raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE) + + async def _read_resource_async() -> ReadResourceResult: + # Convert string to AnyUrl if needed + resource_uri = AnyUrl(uri) if isinstance(uri, str) else uri + return await cast(ClientSession, self._background_thread_session).read_resource(resource_uri) + + read_resource_result: ReadResourceResult = self._invoke_on_background_thread(_read_resource_async()).result() + self._log_debug_with_thread("received resource content from MCP server") + + return read_resource_result + + def list_resource_templates_sync(self, pagination_token: Optional[str] = None) -> ListResourceTemplatesResult: + """Synchronously retrieves the list of available resource templates from the MCP server. + + Resource templates define URI patterns that can be used to access resources dynamically. + + Args: + pagination_token: Optional token for pagination + + Returns: + ListResourceTemplatesResult: The raw MCP response containing resource templates and pagination info + """ + self._log_debug_with_thread("listing MCP resource templates synchronously") + if not self._is_session_active(): + raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE) + + async def _list_resource_templates_async() -> ListResourceTemplatesResult: + return await cast(ClientSession, self._background_thread_session).list_resource_templates( + cursor=pagination_token + ) + + list_resource_templates_result: ListResourceTemplatesResult = self._invoke_on_background_thread( + _list_resource_templates_async() + ).result() + self._log_debug_with_thread( + "received %d resource templates from MCP server", len(list_resource_templates_result.resourceTemplates) + ) + + return list_resource_templates_result + def call_tool_sync( self, tool_use_id: str, diff --git a/tests/strands/tools/mcp/test_mcp_client.py b/tests/strands/tools/mcp/test_mcp_client.py index 130a4703e..ce0d75922 100644 --- a/tests/strands/tools/mcp/test_mcp_client.py +++ b/tests/strands/tools/mcp/test_mcp_client.py @@ -5,9 +5,21 @@ import pytest from mcp import ListToolsResult from mcp.types import CallToolResult as MCPCallToolResult -from mcp.types import GetPromptResult, ListPromptsResult, Prompt, PromptMessage +from mcp.types import ( + GetPromptResult, + ListPromptsResult, + ListResourcesResult, + ListResourceTemplatesResult, + Prompt, + PromptMessage, + ReadResourceResult, + Resource, + ResourceTemplate, + TextResourceContents, +) from mcp.types import TextContent as MCPTextContent from mcp.types import Tool as MCPTool +from pydantic import AnyUrl from strands.tools.mcp import MCPClient from strands.tools.mcp.mcp_types import MCPToolResult @@ -688,3 +700,143 @@ def __init__(self): mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None) assert result["status"] == "success" assert len(result["content"]) == 0 # Unknown resource type should be dropped + + +# Resource Tests - Sync Methods + + +def test_list_resources_sync(mock_transport, mock_session): + """Test that list_resources_sync correctly retrieves resources.""" + mock_resource = Resource( + uri=AnyUrl("file://documents/test.txt"), name="test.txt", description="A test document", mimeType="text/plain" + ) + mock_session.list_resources.return_value = ListResourcesResult(resources=[mock_resource]) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resources_sync() + + mock_session.list_resources.assert_called_once_with(cursor=None) + assert len(result.resources) == 1 + assert result.resources[0].name == "test.txt" + assert str(result.resources[0].uri) == "file://documents/test.txt" + assert result.nextCursor is None + + +def test_list_resources_sync_with_pagination_token(mock_transport, mock_session): + """Test that list_resources_sync correctly passes pagination token and returns next cursor.""" + mock_resource = Resource( + uri=AnyUrl("file://documents/test.txt"), name="test.txt", description="A test document", mimeType="text/plain" + ) + mock_session.list_resources.return_value = ListResourcesResult(resources=[mock_resource], nextCursor="next_page") + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resources_sync(pagination_token="current_page") + + mock_session.list_resources.assert_called_once_with(cursor="current_page") + assert len(result.resources) == 1 + assert result.resources[0].name == "test.txt" + assert result.nextCursor == "next_page" + + +def test_list_resources_sync_session_not_active(): + """Test that list_resources_sync raises an error when session is not active.""" + client = MCPClient(MagicMock()) + + with pytest.raises(MCPClientInitializationError, match="client session is not running"): + client.list_resources_sync() + + +def test_read_resource_sync(mock_transport, mock_session): + """Test that read_resource_sync correctly reads a resource.""" + mock_content = TextResourceContents( + uri=AnyUrl("file://documents/test.txt"), text="Resource content", mimeType="text/plain" + ) + mock_session.read_resource.return_value = ReadResourceResult(contents=[mock_content]) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.read_resource_sync("file://documents/test.txt") + + # Verify the session method was called + mock_session.read_resource.assert_called_once() + # Check the URI argument (it will be wrapped as AnyUrl) + call_args = mock_session.read_resource.call_args[0] + assert str(call_args[0]) == "file://documents/test.txt" + + assert len(result.contents) == 1 + assert result.contents[0].text == "Resource content" + + +def test_read_resource_sync_with_anyurl(mock_transport, mock_session): + """Test that read_resource_sync correctly handles AnyUrl input.""" + mock_content = TextResourceContents( + uri=AnyUrl("file://documents/test.txt"), text="Resource content", mimeType="text/plain" + ) + mock_session.read_resource.return_value = ReadResourceResult(contents=[mock_content]) + + with MCPClient(mock_transport["transport_callable"]) as client: + uri = AnyUrl("file://documents/test.txt") + result = client.read_resource_sync(uri) + + mock_session.read_resource.assert_called_once() + call_args = mock_session.read_resource.call_args[0] + assert str(call_args[0]) == "file://documents/test.txt" + + assert len(result.contents) == 1 + assert result.contents[0].text == "Resource content" + + +def test_read_resource_sync_session_not_active(): + """Test that read_resource_sync raises an error when session is not active.""" + client = MCPClient(MagicMock()) + + with pytest.raises(MCPClientInitializationError, match="client session is not running"): + client.read_resource_sync("file://documents/test.txt") + + +def test_list_resource_templates_sync(mock_transport, mock_session): + """Test that list_resource_templates_sync correctly retrieves resource templates.""" + mock_template = ResourceTemplate( + uriTemplate="file://documents/{name}", + name="document_template", + description="Template for documents", + mimeType="text/plain", + ) + mock_session.list_resource_templates.return_value = ListResourceTemplatesResult(resourceTemplates=[mock_template]) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resource_templates_sync() + + mock_session.list_resource_templates.assert_called_once_with(cursor=None) + assert len(result.resourceTemplates) == 1 + assert result.resourceTemplates[0].name == "document_template" + assert result.resourceTemplates[0].uriTemplate == "file://documents/{name}" + assert result.nextCursor is None + + +def test_list_resource_templates_sync_with_pagination_token(mock_transport, mock_session): + """Test that list_resource_templates_sync correctly passes pagination token and returns next cursor.""" + mock_template = ResourceTemplate( + uriTemplate="file://documents/{name}", + name="document_template", + description="Template for documents", + mimeType="text/plain", + ) + mock_session.list_resource_templates.return_value = ListResourceTemplatesResult( + resourceTemplates=[mock_template], nextCursor="next_page" + ) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resource_templates_sync(pagination_token="current_page") + + mock_session.list_resource_templates.assert_called_once_with(cursor="current_page") + assert len(result.resourceTemplates) == 1 + assert result.resourceTemplates[0].name == "document_template" + assert result.nextCursor == "next_page" + + +def test_list_resource_templates_sync_session_not_active(): + """Test that list_resource_templates_sync raises an error when session is not active.""" + client = MCPClient(MagicMock()) + + with pytest.raises(MCPClientInitializationError, match="client session is not running"): + client.list_resource_templates_sync()