From a100502ce4896567b44241751c640b87017add67 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Tue, 4 Nov 2025 17:02:05 -0800 Subject: [PATCH 1/2] add drasi tool Signed-off-by: Daniel Gerlag --- src/oss/python/integrations/tools/drasi.mdx | 265 ++++++++++++++++++++ src/oss/python/integrations/tools/index.mdx | 2 + 2 files changed, 267 insertions(+) create mode 100644 src/oss/python/integrations/tools/drasi.mdx diff --git a/src/oss/python/integrations/tools/drasi.mdx b/src/oss/python/integrations/tools/drasi.mdx new file mode 100644 index 000000000..ab8d97514 --- /dev/null +++ b/src/oss/python/integrations/tools/drasi.mdx @@ -0,0 +1,265 @@ +--- +title: Drasi +description: Connect agents to real-time data changes with Drasi's continuous query platform +--- + +This guide provides a quick overview for getting started with the Drasi tool. For a detailed listing of all Drasi features, parameters, and configurations, head to the [Drasi documentation](https://drasi.io/), and the [langchain_drasi](https://github.com/drasi-project/langchain-drasi) repository. + +## Overview + +Drasi is a change detection platform that makes it easy and efficient to detect and react to changes in databases. The LangChain-Drasi integration creates reactive, change-driven AI agents by connecting external data changes with workflow execution. This allows agents to discover, subscribe to, and react to real-time query updates by bridging external data changes with LangGraph workflows. Drasi continuous queries stream real-time updates that trigger agent state transitions, modify memory, or dynamically control workflow execution—transforming static agents into ambient long-lived, responsive systems. + +### Details + +| Class | Package | Serializable | JS support | Downloads | Version | +| :--- | :--- | :---: | :---: | :---: | :---: | +| [DrasiTool](https://github.com/drasi-project/langchain-drasi) | [langchain-drasi](https://pypi.org/project/langchain-drasi/) | ❌ | ❌ | ![PyPI - Downloads](https://img.shields.io/pypi/dm/langchain-drasi?style=flat-square&label=%20) | ![PyPI - Version](https://img.shields.io/pypi/v/langchain-drasi?style=flat-square&label=%20) | + +### Features + +- **Query Discovery** - Automatically identify available Drasi queries +- **Real-time Subscriptions** - Monitor continuous query updates +- **Notification Handlers** - Six built-in handlers for different use cases + - Console + - Logging + - Memory + - Buffer + - LangChain Memory + - LangGraph Memory +- **Custom Handlers** - Extend base handler for domain-specific logic + +--- + +## Setup + +To access the Drasi tool, you'll need to have Drasi and the Drasi MCP server running. + +### Prerequisites + +- [Drasi platform](https://drasi.io/how-to-guides/installation/) - Installed and running +- [Drasi MCP server](https://github.com/drasi-project/drasi-platform/tree/main/reactions/mcp) - Configured and accessible +- Python 3.11+ - Required for the langchain-drasi package + +### Credentials (Optional) + +If your Drasi MCP server requires authentication, you can configure headers with Bearer tokens or other authentication methods: + +```python Configure authentication icon="key" +from langchain_drasi import MCPConnectionConfig + +config = MCPConnectionConfig( + server_url="http://localhost:8083", + headers={"Authorization": "Bearer your-token"}, + timeout=30.0 +) +``` + +### Installation + +The Drasi tool lives in the `langchain-drasi` package: + + + ```python pip + pip install -U langchain-drasi + ``` + ```python uv + uv add langchain-drasi + ``` + + +--- + +## Instantiation + +Now we can instantiate an instance of the Drasi tool. You'll need to configure the MCP connection and optionally add notification handlers to process real-time updates: + +```python Initialize tool instance icon="robot" +from langchain_drasi import create_drasi_tool, MCPConnectionConfig, ConsoleHandler + +# Configure connection to Drasi MCP server +config = MCPConnectionConfig( + server_url="http://localhost:8083", + timeout=30.0 +) + +# Create a notification handler +handler = ConsoleHandler() + +# Create the tool +tool = create_drasi_tool( + mcp_config=config, + notification_handlers=[handler] +) +``` + +--- + +## Invocation + +### Directly + +Below is a simple example of calling the tool directly. + +```python Call tool icon="rocket" +# Discover available queries +queries = await tool.discover_queries() +# Returns: [QueryInfo, QueryInfo, ...] + +# Subscribe to a specific query +await tool.subscribe("hot-freezers") +# Notifications routed to registered handlers + +# Read current results from a query +result = await tool.read_query("active-orders") +# Returns: QueryResult with current data +``` + +### As a ToolCall + +We can also invoke the tool with a model-generated `ToolCall`, in which case a @[`ToolMessage`] will be returned. + +### Within an agent + +We can use the Drasi tool in a LangGraph agent to create reactive, event-driven workflows. For this we will need a model with tool-calling capabilities. + +```python Agent with tool icon="robot" +from langchain_anthropic import ChatAnthropic +from langgraph.prebuilt import create_react_agent + +# Initialize the model +model = ChatAnthropic(model="claude-sonnet-4-5-20250929") + +# Create agent with Drasi tool +agent = create_react_agent(model, [tool]) + +# Run the agent +result = agent.invoke( + {"messages": [{"role": "user", "content": "What queries are available?"}]} +) + +print(result["messages"][-1].content) + +result = agent.invoke( + {"messages": [{"role": "user", "content": "Subscribe to the customer-orders query"}]} +) + +print(result["messages"][-1].content) +``` + +--- + +## Notification Handlers + +One of Drasi's key features is its built-in notification handlers that process real-time query result changes. You can use these handlers to take specific actions based on the data changes. + +### Built-in Handlers + +**ConsoleHandler** - Outputs formatted notifications to stdout: + +```python +from langchain_drasi import ConsoleHandler + +handler = ConsoleHandler() +``` + +**LoggingHandler** - Logs notifications using Python's logging framework: + +```python +from langchain_drasi import LoggingHandler +import logging + +handler = LoggingHandler( + logger_name="drasi.notifications", + log_level=logging.INFO +) +``` + +**MemoryHandler** - Stores notifications in memory with optional filtering: + +```python +from langchain_drasi import MemoryHandler + +handler = MemoryHandler(max_size=100) + +# Retrieve notifications +all_notifs = handler.get_all() +freezer_notifs = handler.get_by_query("hot-freezers") +added_events = handler.get_by_type("added") +``` + +**BufferHandler** - FIFO queue for sequential processing: + +This is useful for buffering incoming change notifications when you workflow is busy on something else, you can then have a loop in the workflow to consume the notifications from the buffer when it is ready. + +```python +from langchain_drasi import BufferHandler + +handler = BufferHandler(max_size=100) +# Later, consume notifications +notification = handler.consume() # Remove and return next notification +notification = handler.peek() # View next notification without removing +``` + +**LangGraphMemoryHandler** - Inject updates directly into LangGraph checkpoints: + +```python +from langchain_drasi import LangGraphMemoryHandler +from langgraph.checkpoint.memory import MemorySaver + +checkpoint_manager = MemorySaver() +handler = LangGraphMemoryHandler( + checkpointer=checkpoint_manager, + thread_id="your-thread-id" +) +``` + +### Custom Handlers + +You can create custom handlers by extending `BaseDrasiNotificationHandler`: + +```python +from langchain_drasi import BaseDrasiNotificationHandler + +class CustomHandler(BaseDrasiNotificationHandler): + def on_result_added(self, query_name: str, added_data: dict): + # Handle new results + print(f"New result in {query_name}: {added_data}") + + def on_result_updated(self, query_name: str, updated_data: dict): + # Handle updated results + print(f"Updated result in {query_name}: {updated_data}") + + def on_result_deleted(self, query_name: str, deleted_data: dict): + # Handle deleted results + print(f"Deleted result in {query_name}: {deleted_data}") + +handler = CustomHandler() +tool = create_drasi_tool( + mcp_config=config, + notification_handlers=[handler] +) +``` + +--- + +## Examples + +- [Interactive Chat](https://github.com/drasi-project/langchain-drasi/tree/main/examples/chat): A chat application that uses Drasi for real-time memory updates. +- [Terminator Game](https://github.com/drasi-project/langchain-drasi/tree/main/examples/terminator): A game that leverages Drasi for dynamic NPC behavior. + +## Use Cases + +Drasi is particularly useful for building ambient agents that need to react to real-time data changes. Some example use cases include: + +- **AI Co-pilots** - Assistants that monitor and respond to system events +- **AI game players** - NPCs that adapt to in-game events +- **IoT Monitoring** - Agents that process sensor data streams +- **Customer Support** - Bots that react to ticket updates or customer actions +- **DevOps Assistants** - Tools that monitor infrastructure changes +- **Collaborative Editing** - Systems that respond to document or code changes + +--- + +## API reference + +For detailed documentation of all Drasi features and configurations, head to the [API reference](https://github.com/drasi-project/langchain-drasi). diff --git a/src/oss/python/integrations/tools/index.mdx b/src/oss/python/integrations/tools/index.mdx index 7941ec2cc..e4bb03e79 100644 --- a/src/oss/python/integrations/tools/index.mdx +++ b/src/oss/python/integrations/tools/index.mdx @@ -75,6 +75,7 @@ The following table shows tools that can be used to automate tasks in databases: | [MCP Toolbox](/oss/integrations/tools/toolbox) | Any SQL operation | | [SQLDatabase Toolkit](/oss/integrations/tools/sql_database) | Any SQL operation | | [Spark SQL Toolkit](/oss/integrations/tools/spark_sql) | Any SQL operation | +| [Drasi Toolkit](/oss/integrations/tools/drasi) | Real-time database change detection | ## Finance @@ -130,6 +131,7 @@ The following platforms provide access to multiple tools and services through a + From 009edb8cfb29a2a3afba0e002ac3660e6ee31d33 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Tue, 4 Nov 2025 17:16:33 -0800 Subject: [PATCH 2/2] Update src/oss/python/integrations/tools/drasi.mdx Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/oss/python/integrations/tools/drasi.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/oss/python/integrations/tools/drasi.mdx b/src/oss/python/integrations/tools/drasi.mdx index ab8d97514..ef8406884 100644 --- a/src/oss/python/integrations/tools/drasi.mdx +++ b/src/oss/python/integrations/tools/drasi.mdx @@ -189,7 +189,7 @@ added_events = handler.get_by_type("added") **BufferHandler** - FIFO queue for sequential processing: -This is useful for buffering incoming change notifications when you workflow is busy on something else, you can then have a loop in the workflow to consume the notifications from the buffer when it is ready. +This is useful for buffering incoming change notifications when your workflow is busy on something else; you can then have a loop in the workflow to consume the notifications from the buffer when it is ready. ```python from langchain_drasi import BufferHandler