Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::peer_store::PeerStore;
use crate::probing::{HighCapacityStrategy, ProbingService, ProbingStrategy};
use crate::runtime::Runtime;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -129,6 +130,56 @@ struct LiquiditySourceConfig {
lsps2_service: Option<LSPS2ServiceConfig>,
}

#[derive(Clone, Debug)]
struct ProbingServiceConfig {
/// Time in seconds between consecutive probing attempts.
probing_interval_secs: u64,

/// Amount in milli-satoshis used for each probe.
probing_amount_msat: u64,

/// Configuration for the probing strategy as a shareable trait-object.
strategy: ProbingStrategyConfig,
}

pub enum ProbingStrategyConfig {
Custom { strategy: Arc<dyn ProbingStrategy + Send + Sync> },
HighCapacity { max_targets_per_cycle: usize, target_cache_reuse_limit: usize },
}

impl fmt::Debug for ProbingStrategyConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Custom { .. } => {
f.debug_struct("Custom").field("strategy", &"<ProbingStrategy>").finish()
},
Self::HighCapacity {
max_targets_per_cycle: max_targets,
target_cache_reuse_limit: max_reloads,
} => f
.debug_struct("HighCapacity")
.field("max_targets", max_targets)
.field("max_reloads", max_reloads)
.finish(),
}
}
}

impl Clone for ProbingStrategyConfig {
fn clone(&self) -> Self {
match self {
Self::Custom { strategy } => Self::Custom { strategy: Arc::clone(strategy) },
Self::HighCapacity {
max_targets_per_cycle: max_targets,
target_cache_reuse_limit: max_reloads,
} => Self::HighCapacity {
max_targets_per_cycle: *max_targets,
target_cache_reuse_limit: *max_reloads,
},
}
}
}

#[derive(Clone)]
enum LogWriterConfig {
File { log_file_path: Option<String>, max_log_level: Option<LogLevel> },
Expand Down Expand Up @@ -253,6 +304,7 @@ pub struct NodeBuilder {
async_payments_role: Option<AsyncPaymentsRole>,
runtime_handle: Option<tokio::runtime::Handle>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
probing_service_config: Option<ProbingServiceConfig>,
}

impl NodeBuilder {
Expand All @@ -271,6 +323,7 @@ impl NodeBuilder {
let log_writer_config = None;
let runtime_handle = None;
let pathfinding_scores_sync_config = None;
let probing_service_config = None;
Self {
config,
entropy_source_config,
Expand All @@ -281,6 +334,7 @@ impl NodeBuilder {
runtime_handle,
async_payments_role: None,
pathfinding_scores_sync_config,
probing_service_config,
}
}

Expand Down Expand Up @@ -488,6 +542,55 @@ impl NodeBuilder {
self
}

/// Configures the probing service with a custom target selection strategy.
///
/// This allows full control over how probing targets are selected by providing
/// a custom implementation of the [`ProbingStrategy`] trait.
///
/// # Parameters
/// * `probing_interval_secs` - Seconds between probing cycles
/// * `probing_amount_msat` - Amount in milli-satoshis per probe
/// * `strategy` - Custom [`ProbingStrategy`] implementation
///
/// [`ProbingStrategy`]: crate::probing::ProbingStrategy
pub fn set_probing_service_with_custom_strategy<T: ProbingStrategy + Send + Sync + 'static>(
&mut self, probing_interval_secs: u64, probing_amount_msat: u64, strategy: T,
) -> &mut Self {
self.probing_service_config = Some(ProbingServiceConfig {
probing_interval_secs,
probing_amount_msat,
strategy: ProbingStrategyConfig::Custom { strategy: Arc::new(strategy) },
});
self
}

/// Configures the probing service with the built-in high-capacity strategy.
///
/// Targets peers with the highest total channel capacity to assess liquidity
/// on the most significant network routes.
///
/// # Parameters
/// * `probing_interval_secs` - Seconds between probing cycles
/// * `probing_amount_msat` - Amount in milli-satoshis per probe
/// * `max_targets_per_cycle` - Maximum peers to probe each cycle
/// * `target_cache_reuse_limit` - Number of cycles to reuse targets before refreshing.
/// Acts as a cache: targets are reloaded from the network graph only after this many cycles,
/// reducing overhead while adapting to network changes.
pub fn set_probing_service_with_high_capacity_strategy(
&mut self, probing_interval_secs: u64, probing_amount_msat: u64,
max_targets_per_cycle: usize, target_cache_reuse_limit: usize,
) -> &mut Self {
self.probing_service_config = Some(ProbingServiceConfig {
probing_interval_secs,
probing_amount_msat,
strategy: ProbingStrategyConfig::HighCapacity {
max_targets_per_cycle,
target_cache_reuse_limit,
},
});
self
}

/// Sets the used storage directory path.
pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self {
self.config.storage_dir_path = storage_dir_path;
Expand Down Expand Up @@ -744,6 +847,7 @@ impl NodeBuilder {
runtime,
logger,
Arc::new(vss_store),
self.probing_service_config.as_ref(),
)
}

Expand Down Expand Up @@ -778,6 +882,7 @@ impl NodeBuilder {
runtime,
logger,
kv_store,
self.probing_service_config.as_ref(),
)
}
}
Expand Down Expand Up @@ -977,6 +1082,51 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config);
}

/// Configures the probing service with a custom target selection strategy.
///
/// This allows full control over how probing targets are selected by providing
/// a custom implementation of the [`ProbingStrategy`] trait.
///
/// # Parameters
/// * `probing_interval_secs` - Seconds between probing cycles
/// * `probing_amount_msat` - Amount in milli-satoshis per probe
/// * `strategy` - Custom [`ProbingStrategy`] implementation
///
/// [`ProbingStrategy`]: crate::probing::ProbingStrategy
pub fn set_probing_service_with_custom_strategy<T: ProbingStrategy + Send + Sync + 'static>(
&self, probing_interval_secs: u64, probing_amount_msat: u64, strategy: T,
) {
self.inner.write().unwrap().set_probing_service_with_custom_strategy(
probing_interval_secs,
probing_amount_msat,
strategy,
);
}

/// Configures the probing service with the built-in high-capacity strategy.
///
/// Targets peers with the highest total channel capacity to assess liquidity
/// on the most significant network routes.
///
/// # Parameters
/// * `probing_interval_secs` - Seconds between probing cycles
/// * `probing_amount_msat` - Amount in milli-satoshis per probe
/// * `max_targets_per_cycle` - Maximum peers to probe each cycle
/// * `target_cache_reuse_limit` - Number of cycles to reuse targets before refreshing.
/// Acts as a cache: targets are reloaded from the network graph only after this many cycles,
/// reducing overhead while adapting to network changes.
pub fn set_probing_service_with_high_capacity_strategy(
&self, probing_interval_secs: u64, probing_amount_msat: u64, max_targets_per_cycle: usize,
target_cache_reuse_limit: usize,
) {
self.inner.write().unwrap().set_probing_service_with_high_capacity_strategy(
probing_interval_secs,
probing_amount_msat,
max_targets_per_cycle,
target_cache_reuse_limit,
);
}

/// Sets the used storage directory path.
pub fn set_storage_dir_path(&self, storage_dir_path: String) {
self.inner.write().unwrap().set_storage_dir_path(storage_dir_path);
Expand Down Expand Up @@ -1142,6 +1292,7 @@ fn build_with_store_internal(
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
async_payments_role: Option<AsyncPaymentsRole>, seed_bytes: [u8; 64], runtime: Arc<Runtime>,
logger: Arc<Logger>, kv_store: Arc<DynStore>,
probing_service_config: Option<&ProbingServiceConfig>,
) -> Result<Node, BuildError> {
optionally_install_rustls_cryptoprovider();

Expand Down Expand Up @@ -1767,6 +1918,33 @@ fn build_with_store_internal(

let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());

let probing_service = if let Some(pro_ser) = probing_service_config {
let strategy: Arc<dyn ProbingStrategy + Send + Sync> = match &pro_ser.strategy {
ProbingStrategyConfig::Custom { strategy } => Arc::clone(strategy),
ProbingStrategyConfig::HighCapacity {
max_targets_per_cycle: max_targets,
target_cache_reuse_limit: max_reloads,
} => Arc::new(HighCapacityStrategy::new(
Arc::clone(&network_graph),
*max_targets,
*max_reloads,
)),
};
Some(Arc::new(ProbingService::new(
pro_ser.probing_interval_secs,
pro_ser.probing_amount_msat,
Arc::clone(&strategy),
Arc::clone(&config),
Arc::clone(&logger),
Arc::clone(&channel_manager),
Arc::clone(&keys_manager),
Arc::clone(&is_running),
Arc::clone(&payment_store),
)))
} else {
None
};

Ok(Node {
runtime,
stop_sender,
Expand Down Expand Up @@ -1797,6 +1975,7 @@ fn build_with_store_internal(
node_metrics,
om_mailbox,
async_payments_role,
probing_service,
})
}

Expand Down
35 changes: 35 additions & 0 deletions src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use lightning::routing::gossip::RoutingFees;
#[cfg(not(feature = "uniffi"))]
use lightning::routing::gossip::{ChannelInfo, NodeInfo};

use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

use crate::types::Graph;

/// Represents the network as nodes and channels between them.
Expand Down Expand Up @@ -48,6 +51,38 @@ impl NetworkGraph {
pub fn node(&self, node_id: &NodeId) -> Option<NodeInfo> {
self.inner.read_only().nodes().get(node_id).cloned().map(|n| n.into())
}

/// Selects nodes with the highest total channel capacity in the network.
pub fn select_highest_capacity_nodes(&self, quantity_nodes: usize) -> Vec<NodeId> {
// Calculate total capacity for each node by summing all their channel capacities
let node_capacities = self.inner.read_only().channels().unordered_iter().fold(
HashMap::new(),
|mut acc, (_, chan_info)| {
let cap = chan_info.capacity_sats.unwrap_or(0);
*acc.entry(chan_info.node_one).or_insert(0) += cap;
*acc.entry(chan_info.node_two).or_insert(0) += cap;
acc
},
);

// Use a min-heap to efficiently track the top N nodes by capacity
node_capacities
.into_iter()
.fold(BinaryHeap::with_capacity(quantity_nodes), |mut top_heap, (node_id, cap)| {
if top_heap.len() < quantity_nodes {
top_heap.push(Reverse((cap, node_id)));
} else if let Some(Reverse((min_cap, _))) = top_heap.peek() {
if cap > *min_cap {
top_heap.pop();
top_heap.push(Reverse((cap, node_id)));
}
}
top_heap
})
.into_iter()
.map(|Reverse((_, node_id))| node_id)
.collect()
}
}

/// Details about a channel (both directions).
Expand Down
30 changes: 30 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
mod probing;
mod runtime;
mod scoring;
mod tx_broadcaster;
Expand Down Expand Up @@ -149,6 +150,8 @@ use payment::{
UnifiedQrPayment,
};
use peer_store::{PeerInfo, PeerStore};
use probing::ProbingService;
pub use probing::ProbingStrategy;
use rand::Rng;
use runtime::Runtime;
use types::{
Expand Down Expand Up @@ -196,6 +199,7 @@ pub struct Node {
scorer: Arc<Mutex<Scorer>>,
peer_store: Arc<PeerStore<Arc<Logger>>>,
payment_store: Arc<PaymentStore>,
probing_service: Option<Arc<ProbingService>>,
is_running: Arc<RwLock<bool>>,
node_metrics: Arc<RwLock<NodeMetrics>>,
om_mailbox: Option<Arc<OnionMessageMailbox>>,
Expand Down Expand Up @@ -625,6 +629,32 @@ impl Node {
});
}

if let Some(probing_service) = self.probing_service.as_ref() {
let mut stop_probing_service = self.stop_sender.subscribe();
let probing_service = Arc::clone(probing_service);
let probing_logger = Arc::clone(&self.logger);

self.runtime.spawn_cancellable_background_task(async move {
let mut interval = tokio::time::interval(Duration::from_secs(
probing_service.probing_interval_secs,
));
loop {
tokio::select! {
_ = stop_probing_service.changed() => {
log_debug!(
probing_logger,
"Stopping probing service.",
);
return;
}
_ = interval.tick() => {
probing_service.handle_probing();
}
}
}
});
}

log_info!(self.logger, "Startup complete.");
*is_running_lock = true;
Ok(())
Expand Down
Loading
Loading