From 190660afbb80ba97b7e92619e2ddd78cbd35b23c Mon Sep 17 00:00:00 2001
From: jason
Date: Wed, 5 Nov 2025 20:33:07 +0800
Subject: [PATCH 01/10] scheduler change after node leave

---
 src/parallax/p2p/server.py         |  6 ++-
 src/scheduling/layer_allocation.py | 75 ++++++++++++++++++++----------
 2 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py
index b7be6173..38ee533d 100644
--- a/src/parallax/p2p/server.py
+++ b/src/parallax/p2p/server.py
@@ -569,6 +569,7 @@ def _announcer_thread():
     try:
         while not self.stop_event.is_set():
             # Announce the range ID
+            should_sleep = True
             try:
                 if self.scheduler_peer_id is not None:
                     response_future = self.scheduler_stub.node_update(
@@ -615,6 +616,8 @@
                                 "Layer allocation updated. Executor will reload on next check. "
                                 "Status set to INITIALIZING to prevent new requests."
                             )
+                            # Skip sleep to immediately send next heartbeat with new status
+                            should_sleep = False
                         else:
                             logger.warning(f"Heartbeat response: {response}")
                     else:
@@ -636,7 +639,8 @@
                     f"Failed to announce {self.prefix_id}_{self.lattica.peer_id()}: {e}"
                 )

-            time.sleep(10)
+            if should_sleep:
+                time.sleep(10)

    except Exception as e:
        logger.exception(f"Module announcer thread error: {e}")

diff --git a/src/scheduling/layer_allocation.py b/src/scheduling/layer_allocation.py
index 91a8e127..d4095cf4 100644
--- a/src/scheduling/layer_allocation.py
+++ b/src/scheduling/layer_allocation.py
@@ -529,39 +529,64 @@ def _adjust_end_layer_for_tail(self, node: Node, proposed_start_layer: int) -> i

         return end_layer

-    def has_full_pipeline(self) -> bool:
-        """Return True if there exists at least one pipeline covering [0, num_total_layers).
+    def _check_pipeline_exists(self, active_only: bool = False) -> bool:
+        """Check if there exists at least one pipeline covering [0, num_total_layers).

-        Checks whether we can chain contiguous node allocations starting at 0 to reach L.
+        Args:
+            active_only: If True, only consider active nodes.
+
+        Returns:
+            True if a complete pipeline exists, False otherwise.
         """
         total_layers = self.num_total_layers
-        layer_count: Dict[int, int] = {}
-        for _, (s, e) in self.node_allocation.items():
+
+        # Build index of nodes by start_layer
+        start_to_nodes: Dict[int, List[Node]] = {}
+        for node_id, (s, e) in self.node_allocation.items():
             if s is None or e is None:
                 continue
-            for layer in range(s, e):
-                layer_count[layer] = layer_count.get(layer, 0) + 1
+            node = self.node_id_to_node.get(node_id)
+            if node is None or (active_only and not node.is_active):
+                continue
+            start_to_nodes.setdefault(s, []).append(node)

-        for layer in range(total_layers):
-            if layer not in layer_count or layer_count[layer] == 0:
-                return False
-        return True
+        # Must have at least one node starting at layer 0
+        if not start_to_nodes.get(0):
+            return False
+
+        # DFS to check if we can reach total_layers from any head node
+        def can_reach_target(current_end: int) -> bool:
+            if current_end >= total_layers:
+                return current_end == total_layers
+
+            for nxt in start_to_nodes.get(current_end, []):
+                if nxt.end_layer and nxt.end_layer > current_end:
+                    if can_reach_target(nxt.end_layer):
+                        return True
+            return False
+
+        return any(
+            head.end_layer and can_reach_target(head.end_layer)
+            for head in start_to_nodes.get(0, [])
+        )
+
+    def has_full_pipeline(self) -> bool:
+        """Return True if there exists at least one pipeline covering [0, num_total_layers).
+
+        Checks whether we can chain contiguous node allocations starting at 0 to reach L.
+        This requires that there exists at least one node starting at layer 0 and a chain
+        of contiguous node ranges that reaches num_total_layers.
+        """
+        return self._check_pipeline_exists(active_only=False)

     def has_full_active_pipeline(self) -> bool:
-        """Return True if there exists at least one active pipeline covering [0, num_total_layers)."""
-        total_layers = self.num_total_layers
-        layer_count: Dict[int, int] = {}
-        for node_id, (s, e) in self.node_allocation.items():
-            if self.node_id_to_node[node_id].is_active is False:
-                continue
-            if s is None or e is None:
-                continue
-            for layer in range(s, e):
-                layer_count[layer] = layer_count.get(layer, 0) + 1
-        for layer in range(total_layers):
-            if layer not in layer_count or layer_count[layer] == 0:
-                return False
-        return True
+        """Return True if there exists at least one active pipeline covering [0, num_total_layers).
+
+        Checks whether we can chain contiguous active node allocations starting at 0 to reach L.
+        This requires that there exists at least one active node starting at layer 0 and a chain
+        of contiguous node ranges that reaches num_total_layers.
+        """
+        return self._check_pipeline_exists(active_only=True)

     def layer_replication_stats(self) -> Tuple[int, int, float]:
         """Return (min, max, avg) number of nodes hosting each layer.
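The new check in PATCH 01 treats the allocation table as a graph: nodes are indexed by start_layer and a chain of contiguous ranges must run from layer 0 to num_total_layers. The old per-layer count accepted allocations that cover every layer but cannot actually be chained into a pipeline. Below is a minimal standalone sketch of that reachability idea over plain (start, end) tuples; the helper name and the memo set are illustrative only and not part of the patch.

from collections import defaultdict
from typing import Dict, List, Tuple


def covers_full_pipeline(allocations: List[Tuple[int, int]], total_layers: int) -> bool:
    """True if some chain of contiguous [start, end) ranges spans [0, total_layers)."""
    # Index ranges by their start layer, mirroring start_to_nodes in the patch.
    start_to_ends: Dict[int, List[int]] = defaultdict(list)
    for start, end in allocations:
        start_to_ends[start].append(end)

    visited = set()

    def can_reach(current_end: int) -> bool:
        if current_end >= total_layers:
            return current_end == total_layers
        if current_end in visited:  # don't re-explore the same frontier
            return False
        visited.add(current_end)
        return any(
            can_reach(end) for end in start_to_ends.get(current_end, []) if end > current_end
        )

    return any(can_reach(end) for end in start_to_ends.get(0, []))


if __name__ == "__main__":
    # Every layer of [0, 28) is hosted here, yet nothing starts at layer 10,
    # so the old per-layer count would pass while the chained check fails.
    print(covers_full_pipeline([(0, 10), (5, 28)], 28))            # False
    # A contiguous chain 0 -> 14 -> 28 exists, so the check passes.
    print(covers_full_pipeline([(0, 22), (0, 14), (14, 28)], 28))  # True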
From 2dd38db47e55258299d184810890cad86b31e6ee Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 14:16:14 +0800
Subject: [PATCH 02/10] fix: bootstrapped true after node leave

---
 src/scheduling/layer_allocation.py |  1 +
 src/scheduling/model_info.py       | 12 ++++++------
 src/scheduling/node.py             | 10 +++++-----
 src/scheduling/scheduler.py        |  8 +++++++-
 4 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/src/scheduling/layer_allocation.py b/src/scheduling/layer_allocation.py
index d4095cf4..242c7a6d 100644
--- a/src/scheduling/layer_allocation.py
+++ b/src/scheduling/layer_allocation.py
@@ -216,6 +216,7 @@ def join(self, node: Node) -> None:
         logger.debug("Joining node dynamically: %s", node.node_id)
         self.declare(node)
         lightest_layer = self.get_lightest_layer()
+        logger.debug("Lightest layer: %s", lightest_layer)
         if lightest_layer is None:
             raise ValueError("No layers to assign")

diff --git a/src/scheduling/model_info.py b/src/scheduling/model_info.py
index b79d9e45..9bc7f74a 100644
--- a/src/scheduling/model_info.py
+++ b/src/scheduling/model_info.py
@@ -184,12 +184,12 @@ def decoder_layer_io_bytes(
             ffn_params *= self.num_local_experts
             kv_cache_size = 0

-        logger.debug(
-            "Model Info ffn_params=%d, kv_cache_size=%d, attention_params=%d",
-            ffn_params,
-            kv_cache_size,
-            attention_params,
-        )
+        # logger.debug(
+        #     "Model Info ffn_params=%d, kv_cache_size=%d, attention_params=%d",
+        #     ffn_params,
+        #     kv_cache_size,
+        #     attention_params,
+        # )
         return round(ffn_params + kv_cache_size + attention_params)

     def lm_head_flops(self, target_seq_len: int = 1) -> int:

diff --git a/src/scheduling/node.py b/src/scheduling/node.py
index 33080349..68f56418 100644
--- a/src/scheduling/node.py
+++ b/src/scheduling/node.py
@@ -280,11 +280,11 @@ def get_decoder_layer_capacity(
         if not (include_input_embed and self.model_info.tie_embedding):
             available_memory_bytes -= self.model_info.embedding_io_bytes

-        logger.debug(
-            "Node available_memory_bytes=%d, decoder_layer_io_bytes=%d",
-            available_memory_bytes,
-            self.model_info.decoder_layer_io_bytes(roofline=False),
-        )
+        # logger.debug(
+        #     "Node available_memory_bytes=%d, decoder_layer_io_bytes=%d",
+        #     available_memory_bytes,
+        #     self.model_info.decoder_layer_io_bytes(roofline=False),
+        # )
         if self.hardware.device == "mlx":
             # For mlx, consider mlx bit factor
             return floor(

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index af43c55e..c9da673f 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -309,7 +309,13 @@ def leave(self, node_id: str) -> None:
             for n in self.nodes:
                 if n.start_layer is not None and n.end_layer is not None:
                     self.layer_allocator.deallocate(n)
-            self.layer_allocator.global_allocation()
+            success = self.layer_allocator.global_allocation()
+            if not success:
+                logger.warning("Global rebalance failed to produce a full pipeline")
+            else:
+                logger.debug("Global rebalance completed successfully")
+                self._bootstrapped = True
+                self._bootstrapped_event.set()
         with self._node_count_cv:
             self._node_count_cv.notify_all()

From 639be4d30bb7c314f5f41ad80d10616951dde1d1 Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 15:02:52 +0800
Subject: [PATCH 03/10] merge main

---
 src/scheduling/scheduler.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index 35faea5b..8d9ab237 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -322,7 +322,13 @@ def leave(self, node_id: str) -> None:
             for n in self.nodes:
                 if n.start_layer is not None and n.end_layer is not None:
                     self.layer_allocator.deallocate(n)
-            self.layer_allocator.global_allocation()
+            success = self.layer_allocator.global_allocation()
+            if not success:
+                logger.warning("Global rebalance failed to produce a full pipeline")
+            else:
+                logger.debug("Global rebalance completed successfully")
+                self._bootstrapped = True
+                self._bootstrapped_event.set()
         with self._node_count_cv:
             self._node_count_cv.notify_all()

From 6802db78ac77ce2666640295ac5f39252a0a95a9 Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 20:40:34 +0800
Subject: [PATCH 04/10] no global_allocation when left nodes can cover

---
 src/scheduling/node.py      |  2 +-
 src/scheduling/scheduler.py | 87 +++++++++++++++++++++++++++++++------
 2 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/src/scheduling/node.py b/src/scheduling/node.py
index 68f56418..fd4e6f98 100644
--- a/src/scheduling/node.py
+++ b/src/scheduling/node.py
@@ -185,7 +185,7 @@ class Node:
     current_requests: int = 0

     # todo upload is_active
-    is_active: bool = True
+    is_active: bool = False
     last_heartbeat: float = 0.0  # Will be updated by node broadcasting

     # otherwise, use roofline performance model to estimate

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index 8d9ab237..dafde06e 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -151,7 +151,60 @@ def list_node_allocations(self) -> List[Tuple[str, int, int]]:
         """List the allocations of all nodes."""
         return self.layer_allocator.list_node_allocations()

-    # Warm-up and re-shard
+    def _try_adjust_allocations_to_form_pipeline(self) -> bool:
+        """Try to form a full pipeline by adjusting existing node allocations, avoiding global reallocation.
+
+        Applicable when: after a node leaves, remaining nodes have overlapping coverage but non-contiguous allocations.
+        Example: node-2: [0,22), node-1: [14,28) -> can be adjusted to node-2: [0,14), node-1: [14,28)
+        """
+        # Collect and sort nodes with allocations
+        nodes = sorted(
+            [n for n in self.nodes if n.start_layer is not None and n.end_layer is not None],
+            key=lambda n: n.start_layer,
+        )
+        if not nodes or nodes[0].start_layer != 0:
+            return False
+
+        num_layers = self.model_info.num_layers
+        current_pos = 0
+        adjustments = []
+
+        # Build continuous path and compute adjustments
+        for i, node in enumerate(nodes):
+            if current_pos >= num_layers:
+                break
+            if node.start_layer > current_pos:
+                return False  # Gap exists, cannot recover
+
+            # Skip nodes that don't cover current_pos
+            if node.end_layer <= current_pos:
+                continue
+
+            new_end = min(
+                node.end_layer,
+                nodes[i + 1].start_layer if i + 1 < len(nodes) else num_layers,
+                num_layers,
+            )
+            if new_end > current_pos:
+                adjustments.append((node, current_pos, new_end))
+                current_pos = new_end
+
+        if current_pos < num_layers:
+            return False
+
+        # Apply adjustments
+        if adjustments:
+            logger.debug("Adjusting allocations to form continuous pipeline")
+            for node, new_start, new_end in adjustments:
+                if node.start_layer != new_start or node.end_layer != new_end:
+                    logger.debug(
+                        f"  {node.node_id}: [{node.start_layer}, {node.end_layer}) -> [{new_start}, {new_end})"
+                    )
+                self.layer_allocator.deallocate(node)
+                self.layer_allocator.allocate(node, new_start, new_end)
+
+        return self.layer_allocator.has_full_pipeline()
+
     def _run_warmup_and_truncate(self) -> None:
         """Run a brief warm-up to detect truncation points and shrink shards.
@@ -316,19 +369,27 @@ def leave(self, node_id: str) -> None:
                     f"Mixed assignment detected ({manual_count} manual, {total_count - manual_count} automatic); skipping rebalance"
                 )
             else:
-                # All nodes are automatic, proceed with rebalance
-                self._bootstrapped = False
-                self._bootstrapped_event.clear()
-                for n in self.nodes:
-                    if n.start_layer is not None and n.end_layer is not None:
-                        self.layer_allocator.deallocate(n)
-                success = self.layer_allocator.global_allocation()
-                if not success:
-                    logger.warning("Global rebalance failed to produce a full pipeline")
+                # All nodes are automatic, try to recover pipeline through adjustment first
+                # This avoids unnecessary global reallocation when nodes have overlapping coverage
+                if self._try_adjust_allocations_to_form_pipeline():
+                    logger.info(
+                        "Pipeline recovered through allocation adjustment, skipping global rebalance"
+                    )
                 else:
-                    logger.debug("Global rebalance completed successfully")
-                    self._bootstrapped = True
-                    self._bootstrapped_event.set()
+                    # If adjustment failed, proceed with full global rebalance
+                    logger.debug("Allocation adjustment failed, proceeding with global rebalance")
+                    self._bootstrapped = False
+                    self._bootstrapped_event.clear()
+                    for n in self.nodes:
+                        if n.start_layer is not None and n.end_layer is not None:
+                            self.layer_allocator.deallocate(n)
+                    success = self.layer_allocator.global_allocation()
+                    if not success:
+                        logger.warning("Global rebalance failed to produce a full pipeline")
+                    else:
+                        logger.debug("Global rebalance completed successfully")
+                        self._bootstrapped = True
+                        self._bootstrapped_event.set()
         with self._node_count_cv:
             self._node_count_cv.notify_all()
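PATCH 04 tries to repair the pipeline after a leave by trimming overlapping shards instead of rerunning global allocation. The sketch below reproduces that trimming loop as a pure function over (node_id, start, end) tuples so the example from the docstring can be traced; the real method then deallocates and reallocates through layer_allocator and re-checks has_full_pipeline(). The names and tuple representation here are illustrative only.

from typing import List, Optional, Tuple

Alloc = Tuple[str, int, int]  # (node_id, start_layer, end_layer), end exclusive


def trim_to_contiguous(allocs: List[Alloc], num_layers: int) -> Optional[List[Alloc]]:
    """Shrink overlapping ranges so they chain from layer 0 to num_layers; None if impossible."""
    ordered = sorted(allocs, key=lambda a: a[1])
    if not ordered or ordered[0][1] != 0:
        return None

    trimmed: List[Alloc] = []
    current = 0
    for i, (node_id, start, end) in enumerate(ordered):
        if current >= num_layers:
            break
        if start > current:
            return None  # gap: no remaining node hosts layer `current`
        if end <= current:
            continue  # this range is fully shadowed by earlier nodes
        next_start = ordered[i + 1][1] if i + 1 < len(ordered) else num_layers
        new_end = min(end, next_start, num_layers)
        if new_end > current:
            trimmed.append((node_id, current, new_end))
            current = new_end

    return trimmed if current >= num_layers else None


if __name__ == "__main__":
    # The docstring example: node-2 [0, 22) overlaps node-1 [14, 28).
    print(trim_to_contiguous([("node-2", 0, 22), ("node-1", 14, 28)], 28))
    # -> [('node-2', 0, 14), ('node-1', 14, 28)]
    print(trim_to_contiguous([("node-2", 0, 10), ("node-1", 14, 28)], 28))
    # -> None: layers 10-13 are not hosted by anyone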
From d6bb0aab782c1c27dcb9b4c65524877cc55df2ad Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 20:51:21 +0800
Subject: [PATCH 05/10] remove has_full_active_pipeline

---
 src/backend/server/scheduler_manage.py |  2 +-
 src/scheduling/layer_allocation.py     | 30 +++++---------------------
 src/scheduling/scheduler.py            |  2 +-
 3 files changed, 7 insertions(+), 27 deletions(-)

diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py
index b2fd52bc..99784f4d 100644
--- a/src/backend/server/scheduler_manage.py
+++ b/src/backend/server/scheduler_manage.py
@@ -247,7 +247,7 @@ def get_schedule_status(self):
         # todo rebalance status
         status = (
             NODE_STATUS_AVAILABLE
-            if self.scheduler.layer_allocator.has_full_active_pipeline()
+            if self.scheduler.layer_allocator.has_full_pipeline(active_only=True)
             else NODE_STATUS_WAITING
         )
         logger.debug(f"SchedulerManage status queried: {status}")

diff --git a/src/scheduling/layer_allocation.py b/src/scheduling/layer_allocation.py
index 242c7a6d..07939787 100644
--- a/src/scheduling/layer_allocation.py
+++ b/src/scheduling/layer_allocation.py
@@ -530,14 +530,12 @@ def _adjust_end_layer_for_tail(self, node: Node, proposed_start_layer: int) -> i

         return end_layer

-    def _check_pipeline_exists(self, active_only: bool = False) -> bool:
-        """Check if there exists at least one pipeline covering [0, num_total_layers).
-
-        Args:
-            active_only: If True, only consider active nodes.
+    def has_full_pipeline(self, active_only: bool = False) -> bool:
+        """Return True if there exists at least one pipeline covering [0, num_total_layers).

-        Returns:
-            True if a complete pipeline exists, False otherwise.
+        Checks whether we can chain contiguous node allocations starting at 0 to reach L.
+        This requires that there exists at least one node starting at layer 0 and a chain
+        of contiguous node ranges that reaches num_total_layers.
         """
         total_layers = self.num_total_layers

@@ -571,24 +569,6 @@ def can_reach_target(current_end: int) -> bool:
             for head in start_to_nodes.get(0, [])
         )

-    def has_full_pipeline(self) -> bool:
-        """Return True if there exists at least one pipeline covering [0, num_total_layers).
-
-        Checks whether we can chain contiguous node allocations starting at 0 to reach L.
-        This requires that there exists at least one node starting at layer 0 and a chain
-        of contiguous node ranges that reaches num_total_layers.
-        """
-        return self._check_pipeline_exists(active_only=False)
-
-    def has_full_active_pipeline(self) -> bool:
-        """Return True if there exists at least one active pipeline covering [0, num_total_layers).
-
-        Checks whether we can chain contiguous active node allocations starting at 0 to reach L.
-        This requires that there exists at least one active node starting at layer 0 and a chain
-        of contiguous node ranges that reaches num_total_layers.
-        """
-        return self._check_pipeline_exists(active_only=True)
-
     def layer_replication_stats(self) -> Tuple[int, int, float]:
         """Return (min, max, avg) number of nodes hosting each layer.

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index dafde06e..5342c1e3 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -354,7 +354,7 @@ def leave(self, node_id: str) -> None:
         )
         self.layer_allocator.leave(node_id)
         if self.layer_allocator.should_global_rebalance():
-            logger.debug("Global rebalance triggered due to node leave")
+            logger.debug("Reallocation needed due to node leave, attempting adjustment first")

             # Count manual vs automatic nodes
             manual_count = sum(1 for n in self.nodes if n.manual_layer_assignment)

From ec9f8de6fbdac428bd6ff10fec1cb11ad7144d06 Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 21:01:10 +0800
Subject: [PATCH 06/10] remove no global allocation fix, for pytest error

---
 src/scheduling/scheduler.py | 86 ++++++-------------------------------
 1 file changed, 12 insertions(+), 74 deletions(-)

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index 5342c1e3..a06e5243 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -151,60 +151,6 @@ def list_node_allocations(self) -> List[Tuple[str, int, int]]:
         """List the allocations of all nodes."""
         return self.layer_allocator.list_node_allocations()

-    def _try_adjust_allocations_to_form_pipeline(self) -> bool:
-        """Try to form a full pipeline by adjusting existing node allocations, avoiding global reallocation.
-
-        Applicable when: after a node leaves, remaining nodes have overlapping coverage but non-contiguous allocations.
-        Example: node-2: [0,22), node-1: [14,28) -> can be adjusted to node-2: [0,14), node-1: [14,28)
-        """
-        # Collect and sort nodes with allocations
-        nodes = sorted(
-            [n for n in self.nodes if n.start_layer is not None and n.end_layer is not None],
-            key=lambda n: n.start_layer,
-        )
-        if not nodes or nodes[0].start_layer != 0:
-            return False
-
-        num_layers = self.model_info.num_layers
-        current_pos = 0
-        adjustments = []
-
-        # Build continuous path and compute adjustments
-        for i, node in enumerate(nodes):
-            if current_pos >= num_layers:
-                break
-            if node.start_layer > current_pos:
-                return False  # Gap exists, cannot recover
-
-            # Skip nodes that don't cover current_pos
-            if node.end_layer <= current_pos:
-                continue
-
-            new_end = min(
-                node.end_layer,
-                nodes[i + 1].start_layer if i + 1 < len(nodes) else num_layers,
-                num_layers,
-            )
-            if new_end > current_pos:
-                adjustments.append((node, current_pos, new_end))
-                current_pos = new_end
-
-        if current_pos < num_layers:
-            return False
-
-        # Apply adjustments
-        if adjustments:
-            logger.debug("Adjusting allocations to form continuous pipeline")
-            for node, new_start, new_end in adjustments:
-                if node.start_layer != new_start or node.end_layer != new_end:
-                    logger.debug(
-                        f"  {node.node_id}: [{node.start_layer}, {node.end_layer}) -> [{new_start}, {new_end})"
-                    )
-                self.layer_allocator.deallocate(node)
-                self.layer_allocator.allocate(node, new_start, new_end)
-
-        return self.layer_allocator.has_full_pipeline()
-
     def _run_warmup_and_truncate(self) -> None:
         """Run a brief warm-up to detect truncation points and shrink shards.
@@ -369,27 +315,19 @@ def leave(self, node_id: str) -> None:
                     f"Mixed assignment detected ({manual_count} manual, {total_count - manual_count} automatic); skipping rebalance"
                 )
             else:
-                # All nodes are automatic, try to recover pipeline through adjustment first
-                # This avoids unnecessary global reallocation when nodes have overlapping coverage
-                if self._try_adjust_allocations_to_form_pipeline():
-                    logger.info(
-                        "Pipeline recovered through allocation adjustment, skipping global rebalance"
-                    )
+                # All nodes are automatic, proceed with rebalance
+                self._bootstrapped = False
+                self._bootstrapped_event.clear()
+                for n in self.nodes:
+                    if n.start_layer is not None and n.end_layer is not None:
+                        self.layer_allocator.deallocate(n)
+                success = self.layer_allocator.global_allocation()
+                if not success:
+                    logger.warning("Global rebalance failed to produce a full pipeline")
                 else:
-                    # If adjustment failed, proceed with full global rebalance
-                    logger.debug("Allocation adjustment failed, proceeding with global rebalance")
-                    self._bootstrapped = False
-                    self._bootstrapped_event.clear()
-                    for n in self.nodes:
-                        if n.start_layer is not None and n.end_layer is not None:
-                            self.layer_allocator.deallocate(n)
-                    success = self.layer_allocator.global_allocation()
-                    if not success:
-                        logger.warning("Global rebalance failed to produce a full pipeline")
-                    else:
-                        logger.debug("Global rebalance completed successfully")
-                        self._bootstrapped = True
-                        self._bootstrapped_event.set()
+                    logger.debug("Global rebalance completed successfully")
+                    self._bootstrapped = True
+                    self._bootstrapped_event.set()
         with self._node_count_cv:
             self._node_count_cv.notify_all()

From f6de9eb5b3421b7e186bc5e6159e0746ed7b07ab Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 21:06:56 +0800
Subject: [PATCH 07/10] is_active: bool = True

---
 src/scheduling/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/scheduling/node.py b/src/scheduling/node.py
index fd4e6f98..68f56418 100644
--- a/src/scheduling/node.py
+++ b/src/scheduling/node.py
@@ -185,7 +185,7 @@ class Node:
     current_requests: int = 0

     # todo upload is_active
-    is_active: bool = False
+    is_active: bool = True
     last_heartbeat: float = 0.0  # Will be updated by node broadcasting

     # otherwise, use roofline performance model to estimate
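The leave() context lines above show the rebalance being gated on how layers were assigned: a global rebalance only runs when every remaining node was placed automatically. The sketch below isolates that guard with stand-in types; the empty and all-manual branches are assumptions, since only the mixed-assignment message and the automatic branch are visible in the diff.

from dataclasses import dataclass
from typing import List


@dataclass
class NodeInfo:
    node_id: str
    manual_layer_assignment: bool


def rebalance_action(nodes: List[NodeInfo]) -> str:
    """Decide what leave() should do, based on how the remaining nodes were assigned."""
    if not nodes:
        return "skip: no nodes left"  # assumed edge case, not visible in the diff
    manual = sum(1 for n in nodes if n.manual_layer_assignment)
    if manual == len(nodes):
        return "skip: all nodes pinned manually"  # assumed; only the mixed message is shown
    if manual > 0:
        return "skip: mixed manual/automatic assignment"
    return "rebalance: all nodes automatic"


if __name__ == "__main__":
    print(rebalance_action([NodeInfo("a", False), NodeInfo("b", False)]))  # rebalance
    print(rebalance_action([NodeInfo("a", True), NodeInfo("b", False)]))   # skip (mixed)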
From 11b74f9f21dbf2851ceea25b79480b3b946f065f Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 21:21:05 +0800
Subject: [PATCH 08/10] reback: no global_allocation when left nodes can cover

---
 src/scheduling/scheduler.py | 85 +++++++++++++++++++++++++++++++------
 1 file changed, 73 insertions(+), 12 deletions(-)

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index a06e5243..8ac1e414 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -151,6 +151,59 @@ def list_node_allocations(self) -> List[Tuple[str, int, int]]:
         """List the allocations of all nodes."""
         return self.layer_allocator.list_node_allocations()

+    def _try_adjust_allocations_to_form_pipeline(self) -> bool:
+        """Try to form a full pipeline by adjusting existing node allocations, avoiding global reallocation.
+        Applicable when: after a node leaves, remaining nodes have overlapping coverage but non-contiguous allocations.
+        Example: node-2: [0,22), node-1: [14,28) -> can be adjusted to node-2: [0,14), node-1: [14,28)
+        """
+        # Collect and sort nodes with allocations
+        nodes = sorted(
+            [n for n in self.nodes if n.start_layer is not None and n.end_layer is not None],
+            key=lambda n: n.start_layer,
+        )
+        if not nodes or nodes[0].start_layer != 0:
+            return False
+
+        num_layers = self.model_info.num_layers
+        current_pos = 0
+        adjustments = []
+
+        # Build continuous path and compute adjustments
+        for i, node in enumerate(nodes):
+            if current_pos >= num_layers:
+                break
+            if node.start_layer > current_pos:
+                return False  # Gap exists, cannot recover
+
+            # Skip nodes that don't cover current_pos
+            if node.end_layer <= current_pos:
+                continue
+
+            new_end = min(
+                node.end_layer,
+                nodes[i + 1].start_layer if i + 1 < len(nodes) else num_layers,
+                num_layers,
+            )
+            if new_end > current_pos:
+                adjustments.append((node, current_pos, new_end))
+                current_pos = new_end
+
+        if current_pos < num_layers:
+            return False
+
+        # Apply adjustments
+        if adjustments:
+            logger.debug("Adjusting allocations to form continuous pipeline")
+            for node, new_start, new_end in adjustments:
+                if node.start_layer != new_start or node.end_layer != new_end:
+                    logger.debug(
+                        f"  {node.node_id}: [{node.start_layer}, {node.end_layer}) -> [{new_start}, {new_end})"
+                    )
+                self.layer_allocator.deallocate(node)
+                self.layer_allocator.allocate(node, new_start, new_end)
+
+        return self.layer_allocator.has_full_pipeline()
+
     def _run_warmup_and_truncate(self) -> None:
         """Run a brief warm-up to detect truncation points and shrink shards.
@@ -315,19 +368,27 @@ def leave(self, node_id: str) -> None:
                     f"Mixed assignment detected ({manual_count} manual, {total_count - manual_count} automatic); skipping rebalance"
                 )
             else:
-                # All nodes are automatic, proceed with rebalance
-                self._bootstrapped = False
-                self._bootstrapped_event.clear()
-                for n in self.nodes:
-                    if n.start_layer is not None and n.end_layer is not None:
-                        self.layer_allocator.deallocate(n)
-                success = self.layer_allocator.global_allocation()
-                if not success:
-                    logger.warning("Global rebalance failed to produce a full pipeline")
+                # All nodes are automatic, try to recover pipeline through adjustment first
+                # This avoids unnecessary global reallocation when nodes have overlapping coverage
+                if self._try_adjust_allocations_to_form_pipeline():
+                    logger.info(
+                        "Pipeline recovered through allocation adjustment, skipping global rebalance"
+                    )
                 else:
-                    logger.debug("Global rebalance completed successfully")
-                    self._bootstrapped = True
-                    self._bootstrapped_event.set()
+                    # If adjustment failed, proceed with full global rebalance
+                    logger.debug("Allocation adjustment failed, proceeding with global rebalance")
+                    self._bootstrapped = False
+                    self._bootstrapped_event.clear()
+                    for n in self.nodes:
+                        if n.start_layer is not None and n.end_layer is not None:
+                            self.layer_allocator.deallocate(n)
+                    success = self.layer_allocator.global_allocation()
+                    if not success:
+                        logger.warning("Global rebalance failed to produce a full pipeline")
+                    else:
+                        logger.debug("Global rebalance completed successfully")
+                        self._bootstrapped = True
+                        self._bootstrapped_event.set()
         with self._node_count_cv:
             self._node_count_cv.notify_all()

From b77a59d3b8dc041dddad33ca13bc5e0238106e2b Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 21:48:15 +0800
Subject: [PATCH 09/10] small change after gufeng review

---
 src/parallax/p2p/server.py         | 6 +-----
 src/scheduling/layer_allocation.py | 1 +
 src/scheduling/model_info.py       | 6 ------
 src/scheduling/node.py             | 5 -----
 src/scheduling/scheduler.py        | 1 +
 5 files changed, 3 insertions(+), 16 deletions(-)

diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py
index e839b533..85e5d091 100644
--- a/src/parallax/p2p/server.py
+++ b/src/parallax/p2p/server.py
@@ -570,7 +570,6 @@ def _announcer_thread():
     try:
         while not self.stop_event.is_set():
             # Announce the range ID
-            should_sleep = True
             try:
                 if self.scheduler_peer_id is not None:
                     response_future = self.scheduler_stub.node_update(
@@ -617,8 +616,6 @@
                                 "Layer allocation updated. Executor will reload on next check. "
                                 "Status set to INITIALIZING to prevent new requests."
                             )
-                            # Skip sleep to immediately send next heartbeat with new status
-                            should_sleep = False
                         else:
                             logger.warning(f"Heartbeat response: {response}")
                     else:
@@ -640,8 +637,7 @@
                     f"Failed to announce {self.prefix_id}_{self.lattica.peer_id()}: {e}"
                 )

-            if should_sleep:
-                time.sleep(10)
+            time.sleep(10)

    except Exception as e:
        logger.exception(f"Module announcer thread error: {e}")

diff --git a/src/scheduling/layer_allocation.py b/src/scheduling/layer_allocation.py
index 07939787..cee8799c 100644
--- a/src/scheduling/layer_allocation.py
+++ b/src/scheduling/layer_allocation.py
@@ -200,6 +200,7 @@ def deallocate(self, node: Node) -> None:
             if layer_id in self.layer_to_load:
                 self.layer_to_load[layer_id].remove_node(node)
         node.clear_layer_allocation()
+        node.is_active = False
        self._update_layer_loads_heap()

diff --git a/src/scheduling/model_info.py b/src/scheduling/model_info.py
index 9bc7f74a..a19c645e 100644
--- a/src/scheduling/model_info.py
+++ b/src/scheduling/model_info.py
@@ -184,12 +184,6 @@ def decoder_layer_io_bytes(
             ffn_params *= self.num_local_experts
             kv_cache_size = 0

-        # logger.debug(
-        #     "Model Info ffn_params=%d, kv_cache_size=%d, attention_params=%d",
-        #     ffn_params,
-        #     kv_cache_size,
-        #     attention_params,
-        # )
         return round(ffn_params + kv_cache_size + attention_params)

     def lm_head_flops(self, target_seq_len: int = 1) -> int:

diff --git a/src/scheduling/node.py b/src/scheduling/node.py
index 68f56418..6603ed53 100644
--- a/src/scheduling/node.py
+++ b/src/scheduling/node.py
@@ -280,11 +280,6 @@ def get_decoder_layer_capacity(
         if not (include_input_embed and self.model_info.tie_embedding):
             available_memory_bytes -= self.model_info.embedding_io_bytes

-        # logger.debug(
-        #     "Node available_memory_bytes=%d, decoder_layer_io_bytes=%d",
-        #     available_memory_bytes,
-        #     self.model_info.decoder_layer_io_bytes(roofline=False),
-        # )
         if self.hardware.device == "mlx":
             # For mlx, consider mlx bit factor
             return floor(

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index 8ac1e414..4a1170ce 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -204,6 +204,7 @@ def _try_adjust_allocations_to_form_pipeline(self) -> bool:

         return self.layer_allocator.has_full_pipeline()

+    # Warm-up and re-shard
     def _run_warmup_and_truncate(self) -> None:
         """Run a brief warm-up to detect truncation points and shrink shards.
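PATCH 09 also reverts the PATCH 01 heartbeat tweak, going back to a fixed 10-second announce interval instead of skipping the sleep right after a layer reallocation. The loop below sketches both behaviours behind a flag; the heartbeat callable, its boolean return value, and the stop-event wiring are illustrative stand-ins for the real announcer thread.

import threading
import time
from typing import Callable


def announcer_loop(
    heartbeat: Callable[[], bool],
    stop_event: threading.Event,
    interval: float = 10.0,
    skip_sleep_after_realloc: bool = False,
) -> None:
    """Send a heartbeat every `interval` seconds; optionally re-send immediately
    when a heartbeat reports that the layer allocation just changed."""
    while not stop_event.is_set():
        should_sleep = True
        try:
            allocation_changed = heartbeat()
            if allocation_changed and skip_sleep_after_realloc:
                # PATCH 01 behaviour: push the INITIALIZING status out right away.
                should_sleep = False
        except Exception as exc:  # broad catch, mirroring the announcer thread
            print(f"announce failed: {exc}")
        if should_sleep:
            # PATCH 09 reverts to this fixed cadence unconditionally.
            stop_event.wait(interval)


if __name__ == "__main__":
    stop = threading.Event()
    beats = []

    def fake_heartbeat() -> bool:
        beats.append(time.monotonic())
        if len(beats) >= 3:
            stop.set()
        return len(beats) == 1  # pretend the first response changed the allocation

    announcer_loop(fake_heartbeat, stop, interval=0.05, skip_sleep_after_realloc=True)
    print(f"sent {len(beats)} heartbeats")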
From af8a00cded5a71ec1313de4d2ebe9d673c01a38c Mon Sep 17 00:00:00 2001
From: jason
Date: Thu, 6 Nov 2025 23:27:45 +0800
Subject: [PATCH 10/10] remove _try_adjust_allocations_to_form_pipeline

---
 src/scheduling/scheduler.py | 87 ++++++------------------------------
 1 file changed, 13 insertions(+), 74 deletions(-)

diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py
index 4a1170ce..8d9ab237 100644
--- a/src/scheduling/scheduler.py
+++ b/src/scheduling/scheduler.py
@@ -151,59 +151,6 @@ def list_node_allocations(self) -> List[Tuple[str, int, int]]:
         """List the allocations of all nodes."""
         return self.layer_allocator.list_node_allocations()

-    def _try_adjust_allocations_to_form_pipeline(self) -> bool:
-        """Try to form a full pipeline by adjusting existing node allocations, avoiding global reallocation.
-        Applicable when: after a node leaves, remaining nodes have overlapping coverage but non-contiguous allocations.
-        Example: node-2: [0,22), node-1: [14,28) -> can be adjusted to node-2: [0,14), node-1: [14,28)
-        """
-        # Collect and sort nodes with allocations
-        nodes = sorted(
-            [n for n in self.nodes if n.start_layer is not None and n.end_layer is not None],
-            key=lambda n: n.start_layer,
-        )
-        if not nodes or nodes[0].start_layer != 0:
-            return False
-
-        num_layers = self.model_info.num_layers
-        current_pos = 0
-        adjustments = []
-
-        # Build continuous path and compute adjustments
-        for i, node in enumerate(nodes):
-            if current_pos >= num_layers:
-                break
-            if node.start_layer > current_pos:
-                return False  # Gap exists, cannot recover
-
-            # Skip nodes that don't cover current_pos
-            if node.end_layer <= current_pos:
-                continue
-
-            new_end = min(
-                node.end_layer,
-                nodes[i + 1].start_layer if i + 1 < len(nodes) else num_layers,
-                num_layers,
-            )
-            if new_end > current_pos:
-                adjustments.append((node, current_pos, new_end))
-                current_pos = new_end
-
-        if current_pos < num_layers:
-            return False
-
-        # Apply adjustments
-        if adjustments:
-            logger.debug("Adjusting allocations to form continuous pipeline")
-            for node, new_start, new_end in adjustments:
-                if node.start_layer != new_start or node.end_layer != new_end:
-                    logger.debug(
-                        f"  {node.node_id}: [{node.start_layer}, {node.end_layer}) -> [{new_start}, {new_end})"
-                    )
-                self.layer_allocator.deallocate(node)
-                self.layer_allocator.allocate(node, new_start, new_end)
-
-        return self.layer_allocator.has_full_pipeline()
-
     # Warm-up and re-shard
     def _run_warmup_and_truncate(self) -> None:
         """Run a brief warm-up to detect truncation points and shrink shards.
@@ -354,7 +301,7 @@ def leave(self, node_id: str) -> None:
         )
         self.layer_allocator.leave(node_id)
         if self.layer_allocator.should_global_rebalance():
-            logger.debug("Reallocation needed due to node leave, attempting adjustment first")
+            logger.debug("Global rebalance triggered due to node leave")

             # Count manual vs automatic nodes
             manual_count = sum(1 for n in self.nodes if n.manual_layer_assignment)
@@ -369,27 +316,19 @@ def leave(self, node_id: str) -> None:
                     f"Mixed assignment detected ({manual_count} manual, {total_count - manual_count} automatic); skipping rebalance"
                 )
             else:
-                # All nodes are automatic, try to recover pipeline through adjustment first
-                # This avoids unnecessary global reallocation when nodes have overlapping coverage
-                if self._try_adjust_allocations_to_form_pipeline():
-                    logger.info(
-                        "Pipeline recovered through allocation adjustment, skipping global rebalance"
-                    )
+                # All nodes are automatic, proceed with rebalance
+                self._bootstrapped = False
+                self._bootstrapped_event.clear()
+                for n in self.nodes:
+                    if n.start_layer is not None and n.end_layer is not None:
+                        self.layer_allocator.deallocate(n)
+                success = self.layer_allocator.global_allocation()
+                if not success:
+                    logger.warning("Global rebalance failed to produce a full pipeline")
                 else:
-                    # If adjustment failed, proceed with full global rebalance
-                    logger.debug("Allocation adjustment failed, proceeding with global rebalance")
-                    self._bootstrapped = False
-                    self._bootstrapped_event.clear()
-                    for n in self.nodes:
-                        if n.start_layer is not None and n.end_layer is not None:
-                            self.layer_allocator.deallocate(n)
-                    success = self.layer_allocator.global_allocation()
-                    if not success:
-                        logger.warning("Global rebalance failed to produce a full pipeline")
-                    else:
-                        logger.debug("Global rebalance completed successfully")
-                        self._bootstrapped = True
-                        self._bootstrapped_event.set()
+                    logger.debug("Global rebalance completed successfully")
+                    self._bootstrapped = True
+                    self._bootstrapped_event.set()
         with self._node_count_cv:
             self._node_count_cv.notify_all()
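After PATCH 10 the series settles on the simple recovery path: when a leave requires rebalancing and all nodes are automatic, clear every remaining shard, rerun global allocation, and re-signal readiness only when the rebalance succeeds (the PATCH 02 fix). The sketch below condenses that flow with stub types as a self-contained illustration; it is not the real Scheduler or LayerAllocator API.

import threading
from typing import Dict, List, Tuple


class StubAllocator:
    """Stand-in allocator that splits the layer range evenly across node ids."""

    def __init__(self, total_layers: int) -> None:
        self.total_layers = total_layers
        self.placements: Dict[str, Tuple[int, int]] = {}

    def deallocate(self, node_id: str) -> None:
        self.placements.pop(node_id, None)

    def global_allocation(self, node_ids: List[str]) -> bool:
        # The real allocator balances by memory and load; an even split is enough here.
        if not node_ids:
            return False
        step = self.total_layers // len(node_ids)
        for i, node_id in enumerate(node_ids):
            end = self.total_layers if i == len(node_ids) - 1 else (i + 1) * step
            self.placements[node_id] = (i * step, end)
        return True


def handle_leave(allocator: StubAllocator, remaining: List[str], bootstrapped: threading.Event) -> bool:
    """Condensed mirror of the automatic-rebalance branch in leave()."""
    bootstrapped.clear()
    for node_id in remaining:
        allocator.deallocate(node_id)
    ok = allocator.global_allocation(remaining)
    if ok:
        bootstrapped.set()  # the PATCH 02 fix: readiness is re-signalled after a successful rebalance
    return ok


if __name__ == "__main__":
    allocator = StubAllocator(total_layers=28)
    ready = threading.Event()
    ok = handle_leave(allocator, ["node-1", "node-3"], ready)
    print(ok, allocator.placements, ready.is_set())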