Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions cadence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub
from cadence.api.v1.service_workflow_pb2 import (
SignalWorkflowExecutionRequest,
StartWorkflowExecutionRequest,
StartWorkflowExecutionResponse,
)
Expand Down Expand Up @@ -229,6 +230,51 @@ async def start_workflow(
except Exception:
raise

async def signal_workflow(
self,
workflow_id: str,
run_id: str,
signal_name: str,
signal_input: Any = None,
) -> None:
"""
Send a signal to a running workflow execution.

Args:
workflow_id: The workflow ID
run_id: The run ID (can be empty string to signal current run)
signal_name: Name of the signal
signal_input: Input data for the signal

Raises:
ValueError: If signal encoding fails
Exception: If the gRPC call fails
"""
signal_payload = None
if signal_input is not None:
try:
signal_payload = await self.data_converter.to_data(signal_input)
except Exception as e:
raise ValueError(f"Failed to encode signal input: {e}")

workflow_execution = WorkflowExecution()
workflow_execution.workflow_id = workflow_id
if run_id:
workflow_execution.run_id = run_id

signal_request = SignalWorkflowExecutionRequest(
domain=self.domain,
workflow_execution=workflow_execution,
identity=self.identity,
request_id=str(uuid.uuid4()),
signal_name=signal_name,
)

if signal_payload:
signal_request.signal_input.CopyFrom(signal_payload)

await self.workflow_stub.SignalWorkflowExecution(signal_request)


def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
if "target" not in options:
Expand Down
49 changes: 49 additions & 0 deletions tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,52 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper):
assert task_timeout_seconds == task_timeout.total_seconds(), (
f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s"
)


@pytest.mark.usefixtures("helper")
async def test_signal_workflow(helper: CadenceHelper):
"""Test signal_workflow method.

This integration test verifies:
1. Starting a workflow execution
2. Sending a signal to the running workflow
3. Signal is accepted (no errors thrown)
"""
async with helper.client() as client:
workflow_type = "test-workflow-signal"
task_list_name = "test-task-list-signal"
workflow_id = "test-workflow-signal-789"
execution_timeout = timedelta(minutes=5)
signal_name = "test-signal"
signal_input = {"action": "update", "value": 42}

execution = await client.start_workflow(
workflow_type,
task_list=task_list_name,
execution_start_to_close_timeout=execution_timeout,
workflow_id=workflow_id,
)

await client.signal_workflow(
workflow_id=execution.workflow_id,
run_id=execution.run_id,
signal_name=signal_name,
signal_input=signal_input,
)

describe_request = DescribeWorkflowExecutionRequest(
domain=DOMAIN_NAME,
workflow_execution=WorkflowExecution(
workflow_id=execution.workflow_id,
run_id=execution.run_id,
),
)

response = await client.workflow_stub.DescribeWorkflowExecution(
describe_request
)

assert (
response.workflow_execution_info.workflow_execution.workflow_id
== workflow_id
)
Loading