Skip to content

Commit 5b5f32c

Browse files
Add ProbingService: optional service for periodic probe payments to evaluate channel liquidity and identify reliable routes
- Allows clients to configure probing interval, max targets, and probe amount in sats - Targets peers with highest capacity for sending small, intentional payment failures - Non-mandatory feature to enhance routing decisions via network probing
1 parent 9d71d3a commit 5b5f32c

File tree

4 files changed

+227
-0
lines changed

4 files changed

+227
-0
lines changed

src/builder.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
6666
use crate::message_handler::NodeCustomMessageHandler;
6767
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
6868
use crate::peer_store::PeerStore;
69+
use crate::probing::ProbingService;
6970
use crate::runtime::Runtime;
7071
use crate::tx_broadcaster::TransactionBroadcaster;
7172
use crate::types::{
@@ -129,6 +130,16 @@ struct LiquiditySourceConfig {
129130
lsps2_service: Option<LSPS2ServiceConfig>,
130131
}
131132

133+
#[derive(Debug, Clone)]
134+
struct ProbingServiceConfig {
135+
/// Time in seconds between consecutive probing attempts.
136+
probing_interval_secs: u64,
137+
/// Maximum number of distinct targets to probe concurrently.
138+
max_probing_targets: usize,
139+
/// Amount in milli-satoshis used for each probe.
140+
probing_amount_msat: u64,
141+
}
142+
132143
#[derive(Clone)]
133144
enum LogWriterConfig {
134145
File { log_file_path: Option<String>, max_log_level: Option<LogLevel> },
@@ -253,6 +264,7 @@ pub struct NodeBuilder {
253264
async_payments_role: Option<AsyncPaymentsRole>,
254265
runtime_handle: Option<tokio::runtime::Handle>,
255266
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
267+
probing_service_config: Option<ProbingServiceConfig>,
256268
}
257269

258270
impl NodeBuilder {
@@ -271,6 +283,7 @@ impl NodeBuilder {
271283
let log_writer_config = None;
272284
let runtime_handle = None;
273285
let pathfinding_scores_sync_config = None;
286+
let probing_service_config = None;
274287
Self {
275288
config,
276289
entropy_source_config,
@@ -281,6 +294,7 @@ impl NodeBuilder {
281294
runtime_handle,
282295
async_payments_role: None,
283296
pathfinding_scores_sync_config,
297+
probing_service_config,
284298
}
285299
}
286300

@@ -488,6 +502,23 @@ impl NodeBuilder {
488502
self
489503
}
490504

505+
/// Configures the probing service used to evaluate channel liquidity by sending
506+
/// pre-flight probes to peers and routes.
507+
///
508+
/// * `probing_interval_secs` - Time in seconds between consecutive probing attempts.
509+
/// * `max_probing_targets` - Maximum number of distinct targets to probe concurrently.
510+
/// * `probing_amount_msat` - Amount in milli-satoshis used for each probe.
511+
pub fn set_probing_service_config(
512+
&mut self, probing_interval_secs: u64, max_probing_targets: usize, probing_amount_msat: u64,
513+
) -> &mut Self {
514+
self.probing_service_config = Some(ProbingServiceConfig {
515+
probing_interval_secs,
516+
max_probing_targets,
517+
probing_amount_msat,
518+
});
519+
self
520+
}
521+
491522
/// Sets the used storage directory path.
492523
pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self {
493524
self.config.storage_dir_path = storage_dir_path;
@@ -744,6 +775,7 @@ impl NodeBuilder {
744775
runtime,
745776
logger,
746777
Arc::new(vss_store),
778+
self.probing_service_config.as_ref(),
747779
)
748780
}
749781

@@ -778,6 +810,7 @@ impl NodeBuilder {
778810
runtime,
779811
logger,
780812
kv_store,
813+
self.probing_service_config.as_ref(),
781814
)
782815
}
783816
}
@@ -982,6 +1015,17 @@ impl ArcedNodeBuilder {
9821015
self.inner.write().unwrap().set_storage_dir_path(storage_dir_path);
9831016
}
9841017

1018+
// Sets the probing service used to evaluate channel liquidity by sending
1019+
pub fn set_probing_service_config(
1020+
&self, probing_interval_secs: u64, max_probing_targets: usize, probing_amount_msat: u64,
1021+
) {
1022+
self.inner.write().unwrap().set_probing_service_config(
1023+
probing_interval_secs,
1024+
max_probing_targets,
1025+
probing_amount_msat,
1026+
);
1027+
}
1028+
9851029
/// Configures the [`Node`] instance to write logs to the filesystem.
9861030
///
9871031
/// The `log_file_path` defaults to [`DEFAULT_LOG_FILENAME`] in the configured
@@ -1142,6 +1186,7 @@ fn build_with_store_internal(
11421186
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
11431187
async_payments_role: Option<AsyncPaymentsRole>, seed_bytes: [u8; 64], runtime: Arc<Runtime>,
11441188
logger: Arc<Logger>, kv_store: Arc<DynStore>,
1189+
probing_service_config: Option<&ProbingServiceConfig>,
11451190
) -> Result<Node, BuildError> {
11461191
optionally_install_rustls_cryptoprovider();
11471192

@@ -1760,6 +1805,23 @@ fn build_with_store_internal(
17601805

17611806
let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());
17621807

1808+
let probing_service = if let Some(pro_ser) = probing_service_config {
1809+
Some(Arc::new(ProbingService::new(
1810+
pro_ser.probing_interval_secs,
1811+
pro_ser.max_probing_targets,
1812+
pro_ser.probing_amount_msat,
1813+
Arc::clone(&config),
1814+
Arc::clone(&logger),
1815+
Arc::clone(&channel_manager),
1816+
Arc::clone(&keys_manager),
1817+
Arc::clone(&is_running),
1818+
Arc::clone(&payment_store),
1819+
Arc::clone(&network_graph),
1820+
)))
1821+
} else {
1822+
None
1823+
};
1824+
17631825
Ok(Node {
17641826
runtime,
17651827
stop_sender,
@@ -1790,6 +1852,7 @@ fn build_with_store_internal(
17901852
node_metrics,
17911853
om_mailbox,
17921854
async_payments_role,
1855+
probing_service,
17931856
})
17941857
}
17951858

src/graph.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ use lightning::routing::gossip::RoutingFees;
1717
#[cfg(not(feature = "uniffi"))]
1818
use lightning::routing::gossip::{ChannelInfo, NodeInfo};
1919

20+
use std::cmp::Reverse;
21+
use std::collections::{BinaryHeap, HashMap};
22+
2023
use crate::types::Graph;
2124

2225
/// Represents the network as nodes and channels between them.
@@ -48,6 +51,38 @@ impl NetworkGraph {
4851
pub fn node(&self, node_id: &NodeId) -> Option<NodeInfo> {
4952
self.inner.read_only().nodes().get(node_id).cloned().map(|n| n.into())
5053
}
54+
55+
/// Selects a set of nodes as targets for probing based on their total channel capacity.
56+
pub fn select_probing_targets(&self, max_targets: usize) -> Vec<NodeId> {
57+
// Compute the total capacity for each node
58+
let node_capacities = self.inner.read_only().channels().unordered_iter().fold(
59+
HashMap::new(),
60+
|mut acc, (_, chan_info)| {
61+
let cap = chan_info.capacity_sats.unwrap_or(0);
62+
*acc.entry(chan_info.node_one).or_insert(0) += cap;
63+
*acc.entry(chan_info.node_two).or_insert(0) += cap;
64+
acc
65+
},
66+
);
67+
68+
// Use a min-heap to keep track of the top `max_targets` nodes by capacity
69+
node_capacities
70+
.into_iter()
71+
.fold(BinaryHeap::with_capacity(max_targets), |mut top_heap, (node_id, cap)| {
72+
if top_heap.len() < max_targets {
73+
top_heap.push(Reverse((cap, node_id)));
74+
} else if let Some(Reverse((min_cap, _))) = top_heap.peek() {
75+
if cap > *min_cap {
76+
top_heap.pop();
77+
top_heap.push(Reverse((cap, node_id)));
78+
}
79+
}
80+
top_heap
81+
})
82+
.into_iter()
83+
.map(|Reverse((_, node_id))| node_id)
84+
.collect()
85+
}
5186
}
5287

5388
/// Details about a channel (both directions).

src/lib.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub mod logger;
9696
mod message_handler;
9797
pub mod payment;
9898
mod peer_store;
99+
mod probing;
99100
mod runtime;
100101
mod scoring;
101102
mod tx_broadcaster;
@@ -149,6 +150,7 @@ use payment::{
149150
UnifiedQrPayment,
150151
};
151152
use peer_store::{PeerInfo, PeerStore};
153+
use probing::ProbingService;
152154
use rand::Rng;
153155
use runtime::Runtime;
154156
use types::{
@@ -196,6 +198,7 @@ pub struct Node {
196198
scorer: Arc<Mutex<Scorer>>,
197199
peer_store: Arc<PeerStore<Arc<Logger>>>,
198200
payment_store: Arc<PaymentStore>,
201+
probing_service: Option<Arc<ProbingService>>,
199202
is_running: Arc<RwLock<bool>>,
200203
node_metrics: Arc<RwLock<NodeMetrics>>,
201204
om_mailbox: Option<Arc<OnionMessageMailbox>>,
@@ -625,6 +628,32 @@ impl Node {
625628
});
626629
}
627630

631+
if let Some(probing_service) = self.probing_service.as_ref() {
632+
let mut stop_probing_service = self.stop_sender.subscribe();
633+
let probing_service = Arc::clone(probing_service);
634+
let probing_logger = Arc::clone(&self.logger);
635+
636+
self.runtime.spawn_cancellable_background_task(async move {
637+
let mut interval = tokio::time::interval(Duration::from_secs(
638+
probing_service.probing_interval_secs,
639+
));
640+
loop {
641+
tokio::select! {
642+
_ = stop_probing_service.changed() => {
643+
log_debug!(
644+
probing_logger,
645+
"Stopping probing service.",
646+
);
647+
return;
648+
}
649+
_ = interval.tick() => {
650+
probing_service.handle_probing();
651+
}
652+
}
653+
}
654+
});
655+
}
656+
628657
log_info!(self.logger, "Startup complete.");
629658
*is_running_lock = true;
630659
Ok(())

src/probing.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use std::sync::{Arc, RwLock};
2+
3+
use crate::{
4+
config::Config,
5+
graph::NetworkGraph,
6+
logger::{log_debug, log_error, LdkLogger, Logger},
7+
payment::SpontaneousPayment,
8+
types::{ChannelManager, Graph, KeysManager, PaymentStore},
9+
};
10+
11+
/// Configuration for the probing service used to evaluate channel liquidity by sending pre-flight
12+
/// probes to peers and routes.
13+
pub struct ProbingService {
14+
pub probing_interval_secs: u64,
15+
max_probing_targets: usize,
16+
probing_amount_msat: u64,
17+
config: Arc<Config>,
18+
logger: Arc<Logger>,
19+
channel_manager: Arc<ChannelManager>,
20+
keys_manager: Arc<KeysManager>,
21+
is_running: Arc<RwLock<bool>>,
22+
payment_store: Arc<PaymentStore>,
23+
network_graph: Arc<Graph>,
24+
}
25+
26+
impl ProbingService {
27+
/// Creates a new probing service with the given configuration and dependencies.
28+
pub fn new(
29+
probing_interval_secs: u64, max_probing_targets: usize, probing_amount_msat: u64,
30+
config: Arc<Config>, logger: Arc<Logger>, channel_manager: Arc<ChannelManager>,
31+
keys_manager: Arc<KeysManager>, is_running: Arc<RwLock<bool>>,
32+
payment_store: Arc<PaymentStore>, network_graph: Arc<Graph>,
33+
) -> Self {
34+
Self {
35+
probing_interval_secs,
36+
max_probing_targets,
37+
probing_amount_msat,
38+
config,
39+
logger,
40+
channel_manager,
41+
keys_manager,
42+
is_running,
43+
payment_store,
44+
network_graph,
45+
}
46+
}
47+
48+
pub fn handle_probing(&self) {
49+
let channels = self.channel_manager.list_channels().len();
50+
if channels == 0 {
51+
log_debug!(self.logger, "Probing service found no channels, skipping probing.");
52+
return;
53+
}
54+
55+
let network = self.network_graph();
56+
let spontaneous_payment = self.spontaneous_payment();
57+
58+
let targets = network.select_probing_targets(self.max_probing_targets);
59+
for target in targets {
60+
let public_key = match target.as_pubkey() {
61+
Ok(pk) => pk,
62+
Err(_) => {
63+
log_error!(
64+
self.logger,
65+
"Probing service failed to get target pubkey: {}",
66+
target
67+
);
68+
continue;
69+
},
70+
};
71+
72+
match spontaneous_payment.send_probes(self.probing_amount_msat, public_key) {
73+
Ok(_) => {
74+
log_debug!(self.logger, "Probing service sent probe to target: {}", public_key)
75+
},
76+
Err(e) => log_error!(
77+
self.logger,
78+
"Probing service failed to send probe to target {}: {}",
79+
public_key,
80+
e
81+
),
82+
}
83+
}
84+
}
85+
86+
fn network_graph(&self) -> NetworkGraph {
87+
NetworkGraph::new(Arc::clone(&self.network_graph))
88+
}
89+
90+
fn spontaneous_payment(&self) -> SpontaneousPayment {
91+
SpontaneousPayment::new(
92+
Arc::clone(&self.channel_manager),
93+
Arc::clone(&self.keys_manager),
94+
Arc::clone(&self.payment_store),
95+
Arc::clone(&self.config),
96+
Arc::clone(&self.is_running),
97+
Arc::clone(&self.logger),
98+
)
99+
}
100+
}

0 commit comments

Comments
 (0)