From 3da912489c1cfe0855cc0f6a4e584de483540b4e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florentin=20D=C3=B6rre?=
Date: Thu, 6 Nov 2025 15:20:41 +0100
Subject: [PATCH 1/6] Fix all_shortest_paths prefix + add delta.stats

---
 .../single_source_delta_endpoints.py           | 56 +++++++++++++++++++
 .../single_source_delta_arrow_endpoints.py     | 38 +++++++++++++
 .../single_source_delta_cypher_endpoints.py    | 36 ++++++++++++
 .../session/session_v2_endpoints.py            |  2 +-
 ...est_single_source_delta_arrow_endpoints.py  | 14 +++++
 ...st_single_source_delta_cypher_endpoints.py  | 14 +++++
 6 files changed, 159 insertions(+), 1 deletion(-)

diff --git a/graphdatascience/procedure_surface/api/pathfinding/single_source_delta_endpoints.py b/graphdatascience/procedure_surface/api/pathfinding/single_source_delta_endpoints.py
index 57808fe8f..5321b5639 100644
--- a/graphdatascience/procedure_surface/api/pathfinding/single_source_delta_endpoints.py
+++ b/graphdatascience/procedure_surface/api/pathfinding/single_source_delta_endpoints.py
@@ -10,6 +10,13 @@
 from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
 
 
+class DeltaSteppingStatsResult(BaseResult):
+    pre_processing_millis: int
+    compute_millis: int
+    post_processing_millis: int
+    configuration: dict[str, Any]
+
+
 class DeltaSteppingWriteResult(BaseResult):
     pre_processing_millis: int
     compute_millis: int
@@ -78,6 +85,55 @@ def stream(
             The shortest path results as a DataFrame with columns for sourceNode, targetNode, totalCost, nodeIds, costs, index.
         """
 
+    @abstractmethod
+    def stats(
+        self,
+        G: GraphV2,
+        source_node: int,
+        delta: float = 2.0,
+        relationship_weight_property: str | None = None,
+        relationship_types: list[str] | None = None,
+        node_labels: list[str] | None = None,
+        sudo: bool = False,
+        log_progress: bool = True,
+        username: str | None = None,
+        concurrency: int | None = None,
+        job_id: str | None = None,
+    ) -> DeltaSteppingStatsResult:
+        """
+        Runs the Delta Stepping shortest path algorithm and returns statistics about the execution.
+
+        Parameters
+        ----------
+        G : GraphV2
+            The graph to run the algorithm on.
+        source_node : int
+            The source node for the shortest path computation.
+        delta : float, default=2.0
+            The bucket width for grouping nodes by tentative distance.
+        relationship_weight_property : str | None, default=None
+            The relationship property to use as weights.
+        relationship_types : list[str] | None, default=None
+            Filter on relationship types.
+        node_labels : list[str] | None, default=None
+            Filter on node labels.
+        sudo : bool, default=False
+            Run the algorithm with elevated privileges.
+        log_progress : bool, default=True
+            Whether to log progress.
+        username : str | None, default=None
+            Username for the operation.
+        concurrency : int | None, default=None
+            Concurrency configuration.
+        job_id : str | None, default=None
+            Job ID for the operation.
+
+        Returns
+        -------
+        DeltaSteppingStatsResult
+            Object containing statistics from the execution.
+        """
+
     @abstractmethod
     def mutate(
         self,
diff --git a/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py
index 56bb8d235..799e937b0 100644
--- a/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py
+++ b/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py
@@ -10,6 +10,7 @@
 from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
 from graphdatascience.procedure_surface.api.pathfinding.single_source_delta_endpoints import (
     DeltaSteppingMutateResult,
+    DeltaSteppingStatsResult,
     DeltaSteppingWriteResult,
     SingleSourceDeltaEndpoints,
 )
@@ -61,6 +62,43 @@ def stream(
 
         return result
 
+    def stats(
+        self,
+        G: GraphV2,
+        source_node: int,
+        delta: float = 2.0,
+        relationship_weight_property: str | None = None,
+        relationship_types: list[str] | None = None,
+        node_labels: list[str] | None = None,
+        sudo: bool = False,
+        log_progress: bool = True,
+        username: str | None = None,
+        concurrency: int | None = None,
+        job_id: str | None = None,
+    ) -> DeltaSteppingStatsResult:
+        config = self._endpoints_helper.create_base_config(
+            G,
+            sourceNode=source_node,
+            delta=delta,
+            relationshipWeightProperty=relationship_weight_property,
+            relationshipTypes=relationship_types,
+            nodeLabels=node_labels,
+            sudo=sudo,
+            logProgress=log_progress,
+            username=username,
+            concurrency=concurrency,
+            jobId=job_id,
+        )
+
+        result = self._endpoints_helper.run_job_and_get_result(
+            "v2/pathfinding.singleSource.deltaStepping",
+            G,
+            config,
+            ["preProcessingMillis", "computeMillis", "postProcessingMillis", "configuration"],
+        )
+
+        return DeltaSteppingStatsResult(**result)
+
     def mutate(
         self,
         G: GraphV2,
diff --git a/graphdatascience/procedure_surface/cypher/pathfinding/single_source_delta_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/pathfinding/single_source_delta_cypher_endpoints.py
index 6d82dcf70..d62bd66fe 100644
--- a/graphdatascience/procedure_surface/cypher/pathfinding/single_source_delta_cypher_endpoints.py
+++ b/graphdatascience/procedure_surface/cypher/pathfinding/single_source_delta_cypher_endpoints.py
@@ -9,6 +9,7 @@
 from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
 from graphdatascience.procedure_surface.api.pathfinding.single_source_delta_endpoints import (
     DeltaSteppingMutateResult,
+    DeltaSteppingStatsResult,
     DeltaSteppingWriteResult,
     SingleSourceDeltaEndpoints,
 )
@@ -57,6 +58,41 @@ def stream(
             yields=["sourceNode", "targetNode", "totalCost", "nodeIds", "costs", "index"],  # skip path column
         )
 
+    def stats(
+        self,
+        G: GraphV2,
+        source_node: int,
+        delta: float = 2.0,
+        relationship_weight_property: str | None = None,
+        relationship_types: list[str] | None = None,
+        node_labels: list[str] | None = None,
+        sudo: bool = False,
+        log_progress: bool = True,
+        username: str | None = None,
+        concurrency: int | None = None,
+        job_id: str | None = None,
+    ) -> DeltaSteppingStatsResult:
+        config = ConfigConverter.convert_to_gds_config(
+            sourceNode=source_node,
+            delta=delta,
+            relationshipWeightProperty=relationship_weight_property,
+            relationshipTypes=relationship_types,
+            nodeLabels=node_labels,
+            sudo=sudo,
+            logProgress=log_progress,
+            username=username,
+            concurrency=concurrency,
+            jobId=job_id,
+        )
+        params = CallParameters(graph_name=G.name(), config=config)
+        params.ensure_job_id_in_config()
+
+        result = self._query_runner.call_procedure(
+            "gds.allShortestPaths.delta.stats", params=params, logging=log_progress
+        ).iloc[0]
+
+        return DeltaSteppingStatsResult(**result.to_dict())
+
     def mutate(
         self,
         G: GraphV2,
diff --git a/graphdatascience/session/session_v2_endpoints.py b/graphdatascience/session/session_v2_endpoints.py
index a7ca076e6..785698ed7 100644
--- a/graphdatascience/session/session_v2_endpoints.py
+++ b/graphdatascience/session/session_v2_endpoints.py
@@ -136,7 +136,7 @@ def graph(self) -> CatalogArrowEndpoints:
 
     ## Algorithms
     @property
-    def all_shortest_path(self) -> AllShortestPathEndpoints:
+    def all_shortest_paths(self) -> AllShortestPathEndpoints:
         return AllShortestPathArrowEndpoints(
             self._arrow_client, self._write_back_client, show_progress=self._show_progress
         )
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_delta_arrow_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_delta_arrow_endpoints.py
index f34d7971e..96c7a09ef 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_delta_arrow_endpoints.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_delta_arrow_endpoints.py
@@ -75,6 +75,20 @@ def test_delta_stepping_stream(delta_stepping_endpoints: DeltaSteppingArrowEndpo
     assert len(result_df) == 5
 
 
+def test_delta_stepping_stats(delta_stepping_endpoints: DeltaSteppingArrowEndpoints, sample_graph: GraphV2) -> None:
+    result = delta_stepping_endpoints.stats(
+        G=sample_graph,
+        source_node=0,
+        delta=3.0,
+        relationship_weight_property="cost",
+    )
+
+    assert result.pre_processing_millis >= 0
+    assert result.compute_millis >= 0
+    assert result.post_processing_millis >= 0
+    assert "sourceNode" in result.configuration
+
+
 def test_delta_stepping_mutate(delta_stepping_endpoints: DeltaSteppingArrowEndpoints, sample_graph: GraphV2) -> None:
     result = delta_stepping_endpoints.mutate(
         G=sample_graph,
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_delta_cypher_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_delta_cypher_endpoints.py
index 5c7bf8864..fd1693892 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_delta_cypher_endpoints.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_delta_cypher_endpoints.py
@@ -64,6 +64,20 @@ def test_delta_stepping_stream(delta_stepping_endpoints: DeltaSteppingCypherEndp
     assert len(result_df) == 5
 
 
+def test_delta_stepping_stats(delta_stepping_endpoints: DeltaSteppingCypherEndpoints, sample_graph: GraphV2) -> None:
+    result = delta_stepping_endpoints.stats(
+        G=sample_graph,
+        source_node=find_node_by_name(delta_stepping_endpoints._query_runner, "A"),
+        delta=3.0,
+        relationship_weight_property="cost",
+    )
+
+    assert result.pre_processing_millis >= 0
+    assert result.compute_millis >= 0
+    assert result.post_processing_millis >= 0
+    assert "sourceNode" in result.configuration
+
+
 def test_delta_stepping_mutate(delta_stepping_endpoints: DeltaSteppingCypherEndpoints, sample_graph: GraphV2) -> None:
     result = delta_stepping_endpoints.mutate(
         G=sample_graph,

From 517779d8f017987f0f88184b4eacbc73700f25d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florentin=20D=C3=B6rre?=
Date: Thu, 6 Nov 2025 15:21:42 +0100
Subject: [PATCH 2/6] Fetch gds-api-spec if not provided

---
 .../single_source_delta_arrow_endpoints.py     |   7 +-
 .../procedure_surface/session/conftest.py      |  49 ++++++++
 .../procedure_surface/session/gds_api_spec.py  |  79 ++++++++++++
 .../session/test_session_api_spec_coverage.py  | 116 ++++++++++++++++++
 4 files changed, 245 insertions(+), 6 deletions(-)
 create mode 100644 graphdatascience/tests/integrationV2/procedure_surface/session/gds_api_spec.py
 create mode 100644 graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py

diff --git a/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py
index 799e937b0..4980257fa 100644
--- a/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py
+++ b/graphdatascience/procedure_surface/arrow/pathfinding/single_source_delta_arrow_endpoints.py
@@ -90,12 +90,7 @@ def stats(
             jobId=job_id,
         )
 
-        result = self._endpoints_helper.run_job_and_get_result(
-            "v2/pathfinding.singleSource.deltaStepping",
-            G,
-            config,
-            ["preProcessingMillis", "computeMillis", "postProcessingMillis", "configuration"],
-        )
+        result = self._endpoints_helper.run_job_and_get_summary("v2/pathfinding.singleSource.deltaStepping", config)
 
         return DeltaSteppingStatsResult(**result)
 
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/session/conftest.py b/graphdatascience/tests/integrationV2/procedure_surface/session/conftest.py
index 9c2511647..2e49a74d1 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/session/conftest.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/session/conftest.py
@@ -1,3 +1,5 @@
+import os
+import subprocess
 from pathlib import Path
 from typing import Generator
 
@@ -14,6 +16,10 @@
     start_database,
     start_session,
 )
+from graphdatascience.tests.integrationV2.procedure_surface.session.gds_api_spec import (
+    EndpointSpec,
+    resolve_spec_from_file,
+)
@@ -36,3 +42,46 @@ def neo4j_connection(network: Network, logs_dir: Path) -> Generator[DbmsConnectio
 @pytest.fixture(scope="package")
 def db_query_runner(neo4j_connection: DbmsConnectionInfo) -> Generator[QueryRunner, None, None]:
     yield from create_db_query_runner(neo4j_connection)
+
+
+@pytest.fixture(scope="session")
+def gds_api_spec(tmp_path_factory: pytest.TempPathFactory) -> Generator[list[EndpointSpec], None, None]:
+    provided_spec_file = os.environ.get("GDS_API_SPEC_FILE")
+
+    spec_file: Path | None = None
+    if provided_spec_file:
+        spec_file = Path(provided_spec_file)
+
+    if spec_file and not spec_file.exists():
+        raise FileNotFoundError(f"GDS_API_SPEC_FILE is set to '{spec_file}', but the file does not exist.")
+
+    if not spec_file:
+        spec_dir = tmp_path_factory.mktemp("gds_api_spec")
+        spec_file = spec_dir / "gds-api-spec.json"
+
+    # allow for caching
+    if not spec_file.exists():
+        download_gds_api_spec(spec_file)
+
+    # Adjust the path to pull from graph-analytics
+    yield resolve_spec_from_file(spec_file)
+
+
+def download_gds_api_spec(destination: Path) -> None:
+    import requests
+
+    url = "https://raw.githubusercontent.com/neo-technology/graph-analytics/refs/heads/master/tools/gds-api-spec/gds-api-spec.json"
+    gh_token = os.environ.get("GITHUB_TOKEN")
+    if not gh_token:
+        try:
+            result = subprocess.run(["gh", "auth", "token"], capture_output=True, text=True, check=True)
+            gh_token = result.stdout.strip()
+        except (subprocess.CalledProcessError, FileNotFoundError) as e:
+            raise ValueError("Failed to get GitHub token. Set GITHUB_TOKEN or authenticate with gh CLI.") from e
+
+    headers = {"Authorization": f"Token {gh_token}"}
+    response = requests.get(url, headers=headers)
+    response.raise_for_status()
+
+    with open(destination, "wb") as f:
+        f.write(response.content)
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/session/gds_api_spec.py b/graphdatascience/tests/integrationV2/procedure_surface/session/gds_api_spec.py
new file mode 100644
index 000000000..15d0e8f3e
--- /dev/null
+++ b/graphdatascience/tests/integrationV2/procedure_surface/session/gds_api_spec.py
@@ -0,0 +1,79 @@
+import json
+from pathlib import Path
+from typing import Any, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class TypeInfo(BaseModel, extra="forbid", populate_by_name=True):
+    """Represents type information for a parameter or return field."""
+
+    typeName: str = Field(alias="typeName")
+    optional: bool = Field(alias="optional")
+
+
+class Parameter(BaseModel, extra="forbid", populate_by_name=True):
+    """Represents a procedure parameter."""
+
+    name: str
+    type: TypeInfo
+    defaultValue: Optional[Any] = None
+
+
+class ReturnField(BaseModel, extra="forbid", populate_by_name=True):
+    """Represents a return field from a procedure mode."""
+
+    name: str
+    type: TypeInfo
+
+
+class Mode(BaseModel, extra="forbid", populate_by_name=True):
+    """Represents an execution mode (stream, stats, mutate, write) for a procedure."""
+
+    mode: str
+    parameters: List[Parameter]
+    returnFields: List[ReturnField]
+
+
+class EndpointSpec(BaseModel, extra="forbid", populate_by_name=True):
+    """Represents a GDS procedure with its parameters and modes."""
+
+    name: str
+    parameters: List[Parameter]
+    modes: List[Mode]
+
+    def parameters_for_mode(self, mode_name: str) -> List[Parameter]:
+        """Get the parameters for a specific mode."""
+        result = self.parameters.copy()
+        for mode in self.modes:
+            if mode.mode == mode_name:
+                result.extend(mode.parameters)
+                return result
+        raise ValueError(
+            f"Mode '{mode_name}' not found in procedure '{self.name}'. Available modes: {[m.mode for m in self.modes]}."
+        )
+
+    def callable_modes(self) -> List[str]:
+        if not self.modes:
+            return [self.name]
+        return [f"{self.name}.{mode.mode}" for mode in self.modes]
+
+
+def resolve_spec_from_file(file_path: Path) -> list[EndpointSpec]:
+    """
+    Load and parse the gds-api-spec.json file.
+
+    Args:
+        file_path: Path to the gds-api-spec.json file.
+
+    Returns:
+        list[EndpointSpec]: Parsed API specification containing all procedures.
+    """
+    with open(file_path, "r") as f:
+        data = json.load(f)
+
+    # The JSON file is a list of procedures at the root level
+    procedures = [EndpointSpec(**proc) for proc in data]
+
+    return procedures
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py b/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py
new file mode 100644
index 000000000..fb7fbd899
--- /dev/null
+++ b/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py
@@ -0,0 +1,116 @@
+import re
+from typing import Any
+from unittest import mock
+
+from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
+from graphdatascience.session.session_v2_endpoints import SessionV2Endpoints
+from graphdatascience.tests.integrationV2.procedure_surface.session.gds_api_spec import (
+    EndpointSpec,
+)
+
+MISSING_ENDPOINTS: set[str] = set(
+    "all_shortest_path.stream",
+)
+
+# mapping of the snake-cased version of endpoint parts to the actual attribute names in SessionV2Endpoints
+ENDPOINT_MAPPINGS = {
+    # centrality algos
+    "betweenness": "betweenness_centrality",
+    "celf": "influence_maximization_celf",
+    "closeness": "closeness_centrality",
+    "degree": "degree_centrality",
+    "eigenvector": "eigenvector_centrality",
+    "harmonic": "harmonic_centrality",
+    # community algos
+    "cliquecounting": "clique_counting",
+    "k1coloring": "k1_coloring",
+    "kcore": "k_core_decomposition",
+    "maxkcut": "max_k_cut",
+    # embedding algos
+    "fastrp": "fast_rp",
+    "graphSage": "graphsage",
+    "hashgnn": "hash_gnn",
+    # pathfinding algos
+    "kspanning_tree": "k_spanning_tree",
+    "prizesteiner_tree": "prize_steiner_tree",
+    "spanning_tree": "spanning_tree",
+    "steiner_tree": "steiner_tree",
+}
+
+
+def to_snake(camel: str) -> str:
+    # adjusted version of pydantic.alias_generators.to_snake (without digit handling)
+
+    # Handle the sequence of uppercase letters followed by a lowercase letter
+    snake = re.sub(r"([A-Z]+)([A-Z][a-z])", lambda m: f"{m.group(1)}_{m.group(2)}", camel)
+    # Insert an underscore between a lowercase letter and an uppercase letter
+    snake = re.sub(r"([a-z])([A-Z])", lambda m: f"{m.group(1)}_{m.group(2)}", snake)
+    # Replace hyphens with underscores to handle kebab-case
+    snake = snake.replace("-", "_")
+    return snake.lower()
+
+
+def pythonic_endpoint_name(endpoint: str) -> str:
+    endpoint = endpoint.removeprefix("gds.")  # endpoints are called on an object called `gds`
+    endpoint_parts = endpoint.split(".")
+    endpoint_parts = [to_snake(part) for part in endpoint_parts]
+    endpoint_parts = [ENDPOINT_MAPPINGS.get(part, part) for part in endpoint_parts]
+
+    return ".".join(endpoint_parts)
+
+
+def resolve_callable_object(endpoints: SessionV2Endpoints, endpoint: str) -> Any | None:
+    """Check if an algorithm is available through gds.v2 interface"""
+    endpoint_parts = endpoint.split(".")
+
+    callable_object = endpoints
+    for endpoint_part in endpoint_parts:
+        # Get the algorithm endpoint
+        if not hasattr(callable_object, endpoint_part):
+            return None
+
+        callable_object = getattr(callable_object, endpoint_part)
+
+    if not callable(callable_object):
+        raise ValueError(f"Resolved object {callable_object} for endpoint {endpoint} is not callable")
+
+    return callable_object
+
+
+def test_api_spec_coverage(gds_api_spec: list[EndpointSpec]) -> None:
+    endpoints = SessionV2Endpoints(mock.Mock(spec=AuthenticatedArrowClient), db_client=None, show_progress=False)
+
+    missing_endpoints: set[str] = set()
+    available_endpoints: set[str] = set()
+
+    for endpoint_spec in gds_api_spec:
+        if "alpha." in endpoint_spec.name or "beta." in endpoint_spec.name:
+            # skip alpha/beta endpoints
+            continue
+
+        endpoint_names = [pythonic_endpoint_name(e) for e in endpoint_spec.callable_modes()]
+
+        for endpoint_name in endpoint_names:
+            callable_object = resolve_callable_object(
+                endpoints,
+                endpoint_name,
+            )
+            if not callable_object:
+                missing_endpoints.add(endpoint_name)
+            else:
+                # TODO verify against gds-api spec
+                # returnFields = callable_object.__
+                available_endpoints.add(endpoint_name)
+
+    # Print summary
+    print("\nGDS API Spec Coverage Summary:")
+    print(f"Total endpoint specs found: {len(available_endpoints) + len(missing_endpoints)}")
+    print(f"Available through gds.v2: {len(available_endpoints)}")
+
+    # check if any previously missing algos are now available
+    newly_available_endpoints = available_endpoints.intersection(MISSING_ENDPOINTS)
+    assert not newly_available_endpoints, "Endpoints now available, please remove from MISSING_ENDPOINTS"
+
+    # check missing endpoints against known missing algos
+    missing_endpoints = missing_endpoints.difference(MISSING_ENDPOINTS)
+    assert not missing_endpoints, f"Unexpectedly missing endpoints {len(missing_endpoints)}"

From 768f0a6fe1f533c04d96bc469c28f58de11e5ce7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florentin=20D=C3=B6rre?=
Date: Thu, 6 Nov 2025 15:38:58 +0100
Subject: [PATCH 3/6] Add stats endpoint to BellmanFord

---
 .../all_shortest_path_endpoints.py             |  9 ---
 .../single_source_bellman_ford_endpoints.py    | 56 +++++++++++++++
 .../all_shortest_path_arrow_endpoints.py       |  7 ---
 ...gle_source_bellman_ford_arrow_endpoints.py  | 31 ++++++++
 ...le_source_bellman_ford_cypher_endpoints.py  | 32 +++++++++
 .../session/session_v2_endpoints.py            | 10 ++++
 ...gle_source_bellman_ford_arrow_endpoints.py  | 14 +++++
 ...le_source_bellman_ford_cypher_endpoints.py  | 14 +++++
 8 files changed, 157 insertions(+), 16 deletions(-)

diff --git a/graphdatascience/procedure_surface/api/pathfinding/all_shortest_path_endpoints.py b/graphdatascience/procedure_surface/api/pathfinding/all_shortest_path_endpoints.py
index 55ff5c8ff..485bcce80 100644
--- a/graphdatascience/procedure_surface/api/pathfinding/all_shortest_path_endpoints.py
+++ b/graphdatascience/procedure_surface/api/pathfinding/all_shortest_path_endpoints.py
@@ -2,9 +2,6 @@
 
 from abc import ABC, abstractmethod
 
-from graphdatascience.procedure_surface.api.pathfinding.single_source_bellman_ford_endpoints import (
-    SingleSourceBellmanFordEndpoints,
-)
 from graphdatascience.procedure_surface.api.pathfinding.single_source_delta_endpoints import SingleSourceDeltaEndpoints
 from graphdatascience.procedure_surface.api.pathfinding.single_source_dijkstra_endpoints import (
     SingleSourceDijkstraEndpoints,
 )
@@ -28,9 +25,3 @@ def delta(self) -> SingleSourceDeltaEndpoints:
     def dijkstra(self) -> SingleSourceDijkstraEndpoints:
         """Access to Dijkstra shortest path algorithm endpoints."""
         ...
-
-    @property
-    @abstractmethod
-    def bellman_ford(self) -> SingleSourceBellmanFordEndpoints:
-        """Access to Bellman-Ford shortest path algorithm endpoints."""
-        ...
diff --git a/graphdatascience/procedure_surface/api/pathfinding/single_source_bellman_ford_endpoints.py b/graphdatascience/procedure_surface/api/pathfinding/single_source_bellman_ford_endpoints.py
index 51c674331..fc2f95625 100644
--- a/graphdatascience/procedure_surface/api/pathfinding/single_source_bellman_ford_endpoints.py
+++ b/graphdatascience/procedure_surface/api/pathfinding/single_source_bellman_ford_endpoints.py
@@ -10,6 +10,14 @@
 from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
 
 
+class BellmanFordStatsResult(BaseResult):
+    pre_processing_millis: int
+    compute_millis: int
+    post_processing_millis: int
+    contains_negative_cycle: bool
+    configuration: dict[str, Any]
+
+
 class BellmanFordWriteResult(BaseResult):
     pre_processing_millis: int
     compute_millis: int
@@ -80,6 +88,54 @@ def stream(
             totalCost, nodeIds, costs, index, and isNegativeCycle.
         """
 
+    @abstractmethod
+    def stats(
+        self,
+        G: GraphV2,
+        source_node: int,
+        relationship_weight_property: str | None = None,
+        relationship_types: list[str] | None = None,
+        node_labels: list[str] | None = None,
+        sudo: bool = False,
+        log_progress: bool = True,
+        username: str | None = None,
+        concurrency: int | None = None,
+        job_id: str | None = None,
+    ) -> BellmanFordStatsResult:
+        """
+        Runs the Bellman-Ford shortest path algorithm and returns statistics about the execution.
+
+        The Bellman-Ford algorithm can detect negative cycles in the graph.
+
+        Parameters
+        ----------
+        G : GraphV2
+            The graph to run the algorithm on.
+        source_node : int
+            The source node for the shortest path computation.
+        relationship_weight_property : str | None, default=None
+            The relationship property to use as weights.
+        relationship_types : list[str] | None, default=None
+            Filter on relationship types.
+        node_labels : list[str] | None, default=None
+            Filter on node labels.
+        sudo : bool, default=False
+            Run the algorithm with elevated privileges.
+        log_progress : bool, default=True
+            Whether to log progress.
+        username : str | None, default=None
+            Username for the operation.
+        concurrency : int | None, default=None
+            Concurrency configuration.
+        job_id : str | None, default=None
+            Job ID for the operation.
+
+        Returns
+        -------
+        BellmanFordStatsResult
+            Object containing statistics from the execution, including whether negative cycles were detected.
+        """
+
     @abstractmethod
     def mutate(
         self,
diff --git a/graphdatascience/procedure_surface/arrow/pathfinding/all_shortest_path_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/pathfinding/all_shortest_path_arrow_endpoints.py
index 673c691f9..9ec3125c9 100644
--- a/graphdatascience/procedure_surface/arrow/pathfinding/all_shortest_path_arrow_endpoints.py
+++ b/graphdatascience/procedure_surface/arrow/pathfinding/all_shortest_path_arrow_endpoints.py
@@ -3,9 +3,6 @@
 from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
 from graphdatascience.arrow_client.v2.remote_write_back_client import RemoteWriteBackClient
 from graphdatascience.procedure_surface.api.pathfinding.all_shortest_path_endpoints import AllShortestPathEndpoints
-from graphdatascience.procedure_surface.api.pathfinding.single_source_bellman_ford_endpoints import (
-    SingleSourceBellmanFordEndpoints,
-)
 from graphdatascience.procedure_surface.api.pathfinding.single_source_delta_endpoints import SingleSourceDeltaEndpoints
 from graphdatascience.procedure_surface.api.pathfinding.single_source_dijkstra_endpoints import (
     SingleSourceDijkstraEndpoints,
 )
@@ -39,7 +36,3 @@ def delta(self) -> SingleSourceDeltaEndpoints:
     @property
     def dijkstra(self) -> SingleSourceDijkstraEndpoints:
         return self._dijkstra
-
-    @property
-    def bellman_ford(self) -> SingleSourceBellmanFordEndpoints:
-        return self._bellman_ford
diff --git a/graphdatascience/procedure_surface/arrow/pathfinding/single_source_bellman_ford_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/pathfinding/single_source_bellman_ford_arrow_endpoints.py
index d5437458b..5277e4f5f 100644
--- a/graphdatascience/procedure_surface/arrow/pathfinding/single_source_bellman_ford_arrow_endpoints.py
+++ b/graphdatascience/procedure_surface/arrow/pathfinding/single_source_bellman_ford_arrow_endpoints.py
@@ -10,6 +10,7 @@
 from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
 from graphdatascience.procedure_surface.api.pathfinding.single_source_bellman_ford_endpoints import (
     BellmanFordMutateResult,
+    BellmanFordStatsResult,
     BellmanFordWriteResult,
     SingleSourceBellmanFordEndpoints,
 )
@@ -61,6 +62,36 @@ def stream(
 
         return result
 
+    def stats(
+        self,
+        G: GraphV2,
+        source_node: int,
+        relationship_weight_property: str | None = None,
+        relationship_types: list[str] | None = None,
+        node_labels: list[str] | None = None,
+        sudo: bool = False,
+        log_progress: bool = True,
+        username: str | None = None,
+        concurrency: int | None = None,
+        job_id: str | None = None,
+    ) -> BellmanFordStatsResult:
+        config = self._endpoints_helper.create_base_config(
+            G,
+            sourceNode=source_node,
+            relationshipWeightProperty=relationship_weight_property,
+            relationshipTypes=relationship_types,
+            nodeLabels=node_labels,
+            sudo=sudo,
+            logProgress=log_progress,
+            username=username,
+            concurrency=concurrency,
+            jobId=job_id,
+        )
+
+        result = self._endpoints_helper.run_job_and_get_summary("v2/pathfinding.singleSource.bellmanFord", config)
+
+        return BellmanFordStatsResult(**result)
+
     def mutate(
         self,
         G: GraphV2,
diff --git a/graphdatascience/procedure_surface/cypher/pathfinding/single_source_bellman_ford_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/pathfinding/single_source_bellman_ford_cypher_endpoints.py
index 7101d69d6..1730b0d93 100644
--- a/graphdatascience/procedure_surface/cypher/pathfinding/single_source_bellman_ford_cypher_endpoints.py
+++ b/graphdatascience/procedure_surface/cypher/pathfinding/single_source_bellman_ford_cypher_endpoints.py
@@ -9,6 +9,7 @@
 from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
 from graphdatascience.procedure_surface.api.pathfinding.single_source_bellman_ford_endpoints import (
     BellmanFordMutateResult,
+    BellmanFordStatsResult,
     BellmanFordWriteResult,
     SingleSourceBellmanFordEndpoints,
 )
@@ -63,6 +64,37 @@ def stream(
             ],  # skip reoute column
         )
 
+    def stats(
+        self,
+        G: GraphV2,
+        source_node: int,
+        relationship_weight_property: str | None = None,
+        relationship_types: list[str] | None = None,
+        node_labels: list[str] | None = None,
+        sudo: bool = False,
+        log_progress: bool = True,
+        username: str | None = None,
+        concurrency: int | None = None,
+        job_id: str | None = None,
+    ) -> BellmanFordStatsResult:
+        config = ConfigConverter.convert_to_gds_config(
+            sourceNode=source_node,
+            relationshipWeightProperty=relationship_weight_property,
+            relationshipTypes=relationship_types,
+            nodeLabels=node_labels,
+            sudo=sudo,
+            logProgress=log_progress,
+            username=username,
+            concurrency=concurrency,
+            jobId=job_id,
+        )
+        params = CallParameters(graph_name=G.name(), config=config)
+        params.ensure_job_id_in_config()
+
+        result = self._query_runner.call_procedure("gds.bellmanFord.stats", params=params, logging=log_progress).iloc[0]
+
+        return BellmanFordStatsResult(**result.to_dict())
+
     def mutate(
         self,
         G: GraphV2,
diff --git a/graphdatascience/session/session_v2_endpoints.py b/graphdatascience/session/session_v2_endpoints.py
index 785698ed7..7ea0813be 100644
--- a/graphdatascience/session/session_v2_endpoints.py
+++ b/graphdatascience/session/session_v2_endpoints.py
@@ -36,6 +36,9 @@
 from graphdatascience.procedure_surface.api.pathfinding.k_spanning_tree_endpoints import KSpanningTreeEndpoints
 from graphdatascience.procedure_surface.api.pathfinding.prize_steiner_tree_endpoints import PrizeSteinerTreeEndpoints
 from graphdatascience.procedure_surface.api.pathfinding.shortest_path_endpoints import ShortestPathEndpoints
+from graphdatascience.procedure_surface.api.pathfinding.single_source_bellman_ford_endpoints import (
+    SingleSourceBellmanFordEndpoints,
+)
 from graphdatascience.procedure_surface.api.pathfinding.spanning_tree_endpoints import SpanningTreeEndpoints
 from graphdatascience.procedure_surface.api.pathfinding.steiner_tree_endpoints import SteinerTreeEndpoints
 from graphdatascience.procedure_surface.api.similarity.knn_endpoints import KnnEndpoints
@@ -100,6 +103,9 @@
 from graphdatascience.procedure_surface.arrow.pathfinding.shortest_path_arrow_endpoints import (
     ShortestPathArrowEndpoints,
 )
+from graphdatascience.procedure_surface.arrow.pathfinding.single_source_bellman_ford_arrow_endpoints import (
+    BellmanFordArrowEndpoints,
+)
 from graphdatascience.procedure_surface.arrow.pathfinding.spanning_tree_arrow_endpoints import (
     SpanningTreeArrowEndpoints,
 )
@@ -155,6 +161,10 @@ def articulation_points(self) -> ArticulationPointsEndpoints:
     def betweenness_centrality(self) -> BetweennessEndpoints:
         return BetweennessArrowEndpoints(self._arrow_client, self._write_back_client, show_progress=self._show_progress)
 
+    @property
+    def bellman_ford(self) -> SingleSourceBellmanFordEndpoints:
+        return BellmanFordArrowEndpoints(self._arrow_client, self._write_back_client, show_progress=self._show_progress)
+
     @property
     def clique_counting(self) -> CliqueCountingEndpoints:
         return CliqueCountingArrowEndpoints(
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_bellman_ford_arrow_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_bellman_ford_arrow_endpoints.py
index 24c6c1eb6..1b5dc21ae 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_bellman_ford_arrow_endpoints.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/arrow/pathfinding/test_single_source_bellman_ford_arrow_endpoints.py
@@ -84,6 +84,20 @@ def test_bellman_ford_stream(bellman_ford_endpoints: BellmanFordArrowEndpoints,
     assert len(result_df) == 5
 
 
+def test_bellman_ford_stats(bellman_ford_endpoints: BellmanFordArrowEndpoints, sample_graph: GraphV2) -> None:
+    result = bellman_ford_endpoints.stats(
+        G=sample_graph,
+        source_node=0,
+        relationship_weight_property="cost",
+    )
+
+    assert result.pre_processing_millis >= 0
+    assert result.compute_millis >= 0
+    assert result.post_processing_millis >= 0
+    assert result.contains_negative_cycle is False
+    assert "sourceNode" in result.configuration
+
+
 def test_bellman_ford_mutate(bellman_ford_endpoints: BellmanFordArrowEndpoints, sample_graph: GraphV2) -> None:
     result = bellman_ford_endpoints.mutate(
         G=sample_graph,
diff --git a/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_bellman_ford_cypher_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_bellman_ford_cypher_endpoints.py
index 9984fffc2..deb715406 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_bellman_ford_cypher_endpoints.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/cypher/pathfinding/test_single_source_bellman_ford_cypher_endpoints.py
@@ -71,6 +71,20 @@ def test_bellman_ford_stream(bellman_ford_endpoints: BellmanFordCypherEndpoints,
     assert len(result_df) > 0
 
 
+def test_bellman_ford_stats(bellman_ford_endpoints: BellmanFordCypherEndpoints, sample_graph: GraphV2) -> None:
+    result = bellman_ford_endpoints.stats(
+        G=sample_graph,
+        source_node=find_node_by_name(bellman_ford_endpoints._query_runner, "A"),
+        relationship_weight_property="cost",
+    )
+
+    assert result.pre_processing_millis >= 0
+    assert result.compute_millis >= 0
+    assert result.post_processing_millis >= 0
+    assert result.contains_negative_cycle is False
+    assert "sourceNode" in result.configuration
+
+
 def test_bellman_ford_mutate(bellman_ford_endpoints: BellmanFordCypherEndpoints, sample_graph: GraphV2) -> None:
     result = bellman_ford_endpoints.mutate(
         G=sample_graph,

From e14676045bbaf638daef6c8ea75735a47f52a239 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florentin=20D=C3=B6rre?=
Date: Thu, 6 Nov 2025 16:36:20 +0100
Subject: [PATCH 4/6] Add missing endpoints from gds-api-spec

---
 .../session/test_session_api_spec_coverage.py | 99 +++++++++++++------
 1 file changed, 68 insertions(+), 31 deletions(-)

diff --git a/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py b/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py
index fb7fbd899..c6c5a0ec7 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_api_spec_coverage.py
@@ -1,4 +1,5 @@
 import re
+from collections import OrderedDict
 from typing import Any
 from unittest import mock
@@ -8,36 +9,69 @@
     EndpointSpec,
 )
 
-MISSING_ENDPOINTS: set[str] = set(
-    "all_shortest_path.stream",
-)
-
-# mapping of the snake-cased version of endpoint parts to the actual attribute names in SessionV2Endpoints
-ENDPOINT_MAPPINGS = {
-    # centrality algos
-    "betweenness": "betweenness_centrality",
-    "celf": "influence_maximization_celf",
-    "closeness": "closeness_centrality",
-    "degree": "degree_centrality",
-    "eigenvector": "eigenvector_centrality",
-    "harmonic": "harmonic_centrality",
-    # community algos
-    "cliquecounting": "clique_counting",
-    "k1coloring": "k1_coloring",
-    "kcore": "k_core_decomposition",
-    "maxkcut": "max_k_cut",
-    # embedding algos
-    "fastrp": "fast_rp",
-    "graphSage": "graphsage",
-    "hashgnn": "hash_gnn",
-    # pathfinding algos
-    "kspanning_tree": "k_spanning_tree",
-    "prizesteiner_tree": "prize_steiner_tree",
-    "spanning_tree": "spanning_tree",
-    "steiner_tree": "steiner_tree",
+MISSING_ENDPOINTS: set[str] = {
+    "all_shortest_paths.stream",
+    "bfs.stream",
+    "bfs.mutate",
+    "bfs.stats",
+    "bfs.write",
+    "bridges.stream",
+    "conductance.stream",
+    "dag.longest_path.stream",
+    "dag.topological_sort.stream",
+    "dfs.mutate",
+    "dfs.stream",
+    "hits.mutate",
+    "hits.stream",
+    "hits.stats",
+    "hits.write",
+    "max_flow.stream",
+    "max_flow.write",
+    "max_flow.stats",
+    "max_flow.mutate",
+    "modularity.stats",
+    "modularity.stream",
+    "random_walk.stats",
+    "random_walk.stream",
+    "random_walk.mutate",
+    "ml.kge.predict.mutate",
+    "ml.kge.predict.stream",
+    "ml.kge.predict.write",
+    "scale_properties.mutate",
+    "scale_properties.stats",
+    "scale_properties.stream",
+    "scale_properties.write",
 }
 
+ENDPOINT_MAPPINGS = OrderedDict(
+    [
+        # centrality algos
+        ("closeness.harmonic", "harmonic_centrality"),
+        ("closeness", "closeness_centrality"),
+        ("betweenness", "betweenness_centrality"),
+        ("degree", "degree_centrality"),
+        ("eigenvector", "eigenvector_centrality"),
+        ("influenceMaximization.celf", "influence_maximization_celf"),
+        # community algos
+        ("cliquecounting", "clique_counting"),
+        ("k1coloring", "k1_coloring"),
+        ("kcore", "k_core_decomposition"),
+        ("maxkcut", "max_k_cut"),
+        # embedding algos
+        ("fastrp", "fast_rp"),
+        ("graphSage", "graphsage"),
+        ("hashgnn", "hash_gnn"),
+        # pathfinding algos
+        ("astar", "a_star"),
+        ("kspanning_tree", "k_spanning_tree"),
+        ("prizesteiner_tree", "prize_steiner_tree"),
+        ("spanning_tree", "spanning_tree"),
+        ("steiner_tree", "steiner_tree"),
+    ]
+)
+
+
 def to_snake(camel: str) -> str:
     # adjusted version of pydantic.alias_generators.to_snake (without digit handling)
 
@@ -52,9 +86,13 @@ def to_snake(camel: str) -> str:
 
 def pythonic_endpoint_name(endpoint: str) -> str:
     endpoint = endpoint.removeprefix("gds.")  # endpoints are called on an object called `gds`
+
+    for old, new in ENDPOINT_MAPPINGS.items():
+        if old in endpoint:
+            endpoint = endpoint.replace(old, new)
+
     endpoint_parts = endpoint.split(".")
     endpoint_parts = [to_snake(part) for part in endpoint_parts]
-    endpoint_parts = [ENDPOINT_MAPPINGS.get(part, part) for part in endpoint_parts]
 
     return ".".join(endpoint_parts)
 
@@ -95,9 +133,9 @@ def test_api_spec_coverage(gds_api_spec: list[EndpointSpec]) -> None:
                 endpoints,
                 endpoint_name,
             )
-            if not callable_object:
+            if not callable_object and endpoint_name not in MISSING_ENDPOINTS:
                 missing_endpoints.add(endpoint_name)
-            else:
+            elif callable_object:
                 # TODO verify against gds-api spec
                 # returnFields = callable_object.__
                 available_endpoints.add(endpoint_name)
@@ -112,5 +150,4 @@ def test_api_spec_coverage(gds_api_spec: list[EndpointSpec]) -> None:
     assert not newly_available_endpoints, "Endpoints now available, please remove from MISSING_ENDPOINTS"
 
     # check missing endpoints against known missing algos
-    missing_endpoints = missing_endpoints.difference(MISSING_ENDPOINTS)
     assert not missing_endpoints, f"Unexpectedly missing endpoints {len(missing_endpoints)}"

From 6f33699cdc00e43ba4d83f667ff8508cc04775cb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florentin=20D=C3=B6rre?=
Date: Thu, 6 Nov 2025 16:51:40 +0100
Subject: [PATCH 5/6] Adjust endpoint mappings for arrow endpoints due to previous changes to align with gds-api-spec

---
 ...e.py => test_session_arrow_endpoint_coverage.py} | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
 rename graphdatascience/tests/integrationV2/procedure_surface/session/{test_session_endpoint_coverage.py => test_session_arrow_endpoint_coverage.py} (93%)

diff --git a/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_endpoint_coverage.py b/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_arrow_endpoint_coverage.py
similarity index 93%
rename from graphdatascience/tests/integrationV2/procedure_surface/session/test_session_endpoint_coverage.py
rename to graphdatascience/tests/integrationV2/procedure_surface/session/test_session_arrow_endpoint_coverage.py
index 34a874f7f..6b7223db0 100644
--- a/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_endpoint_coverage.py
+++ b/graphdatascience/tests/integrationV2/procedure_surface/session/test_session_arrow_endpoint_coverage.py
@@ -22,12 +22,12 @@
     "maxkcut": "max_k_cut",
     # embedding algos
     "fastrp": "fast_rp",
-    "graphSage": "graphsage",
    "hashgnn": "hash_gnn",
     # pathfinding algos
-    "source_target": "shortest_path",
-    "single_source": "all_shortest_path",
-    "delta_stepping": "delta",
+    "sourceTarget": "shortest_path",
+    "singleSource.bellmanFord": "bellman_ford",
+    "singleSource": "all_shortest_paths",
+    "deltaStepping": "delta",
     "kspanning_tree": "k_spanning_tree",
     "prizesteiner_tree": "prize_steiner_tree",
     "spanning_tree": "spanning_tree",
     "steiner_tree": "steiner_tree",
@@ -55,9 +55,12 @@ def to_snake(camel: str) -> str:
 
 def check_gds_v2_availability(endpoints: SessionV2Endpoints, algo: str) -> bool:
     """Check if an algorithm is available through gds.v2 interface"""
+    for old, new in ENDPOINT_MAPPINGS.items():
+        if old in algo:
+            algo = algo.replace(old, new)
+
     algo_parts = algo.split(".")
     algo_parts = [to_snake(part) for part in algo_parts]
-    algo_parts = [ENDPOINT_MAPPINGS.get(part, part) for part in algo_parts]
 
     callable_object = endpoints
     for algo_part in algo_parts:

From 17acc8761be87e681ab7835cecec6804f7811bc3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florentin=20D=C3=B6rre?=
Date: Thu, 6 Nov 2025 17:04:35 +0100
Subject: [PATCH 6/6] Allow forwarding spec_file and gh token through tox target

---
 tox.ini | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tox.ini b/tox.ini
index bbfa1c22f..c8b02661a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -119,5 +119,8 @@ passenv =
     NEO4J_DATABASE_IMAGE
     BUILD_NUMBER
     GDS_LICENSE_KEY
+
+    GDS_API_SPEC_FILE
+    GITHUB_TOKEN
 commands =
     pytest graphdatascience/tests/integrationV2 --include-integration-v2