@@ -36,7 +36,7 @@ use crate::ln::types::ChannelId;
3636use crate :: sign:: { ecdsa:: EcdsaChannelSigner , EntropySource , SignerProvider } ;
3737use crate :: sync:: Mutex ;
3838use crate :: util:: async_poll:: {
39- dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture ,
39+ dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture , TwoFutureJoiner ,
4040} ;
4141use crate :: util:: logger:: Logger ;
4242use crate :: util:: native_async:: FutureSpawner ;
@@ -493,15 +493,6 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
493493/// list channel monitors themselves and load channels individually using
494494/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
495495///
496- /// ## EXTREMELY IMPORTANT
497- ///
498- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
499- /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
500- /// that circumstance (not when there is really a permissions error, for example). This is because
501- /// neither channel monitor reading function lists updates. Instead, either reads the monitor, and
502- /// using its stored `update_id`, synthesizes update storage keys, and tries them in sequence until
503- /// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
504- ///
505496/// # Pruning stale channel updates
506497///
507498/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
@@ -569,10 +560,6 @@ where
569560 }
570561
571562 /// Reads all stored channel monitors, along with any stored updates for them.
572- ///
573- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
574- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
575- /// documentation for [`MonitorUpdatingPersister`].
576563 pub fn read_all_channel_monitors_with_updates (
577564 & self ,
578565 ) -> Result <
@@ -584,10 +571,6 @@ where
584571
585572 /// Read a single channel monitor, along with any stored updates for it.
586573 ///
587- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
588- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
589- /// documentation for [`MonitorUpdatingPersister`].
590- ///
591574 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
592575 /// underscore `_` between txid and index for v1 channels. For example, given:
593576 ///
@@ -781,10 +764,6 @@ where
781764 /// While the reads themselves are performend in parallel, deserializing the
782765 /// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
783766 /// this may substantially limit the parallelism of this method.
784- ///
785- /// It is extremely important that your [`KVStore::read`] implementation uses the
786- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
787- /// documentation for [`MonitorUpdatingPersister`].
788767 pub async fn read_all_channel_monitors_with_updates (
789768 & self ,
790769 ) -> Result <
@@ -819,10 +798,6 @@ where
819798 /// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
820799 /// and other multi-threaded runtime requirements), this method requires that `self` be an
821800 /// `Arc` that can live for `'static` and be sent and accessed across threads.
822- ///
823- /// It is extremely important that your [`KVStore::read`] implementation uses the
824- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
825- /// documentation for [`MonitorUpdatingPersister`].
826801 pub async fn read_all_channel_monitors_with_updates_parallel (
827802 self : & Arc < Self > ,
828803 ) -> Result <
@@ -862,10 +837,6 @@ where
862837
863838 /// Read a single channel monitor, along with any stored updates for it.
864839 ///
865- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
866- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
867- /// documentation for [`MonitorUpdatingPersister`].
868- ///
869840 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
870841 /// underscore `_` between txid and index for v1 channels. For example, given:
871842 ///
@@ -1011,40 +982,37 @@ where
1011982 io:: Error ,
1012983 > {
1013984 let monitor_name = MonitorName :: from_str ( monitor_key) ?;
1014- let read_res = self . maybe_read_monitor ( & monitor_name, monitor_key) . await ?;
1015- let ( block_hash, monitor) = match read_res {
985+ // TODO: After an MSRV bump we should be able to use the pin macro rather than Box::pin
986+ let read_future = Box :: pin ( self . maybe_read_monitor ( & monitor_name, monitor_key) ) ;
987+ let list_future =
988+ Box :: pin ( self . kv_store . list ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key) ) ;
989+ let ( read_res, list_res) = TwoFutureJoiner :: new ( read_future, list_future) . await ;
990+ let ( block_hash, monitor) = match read_res? {
1016991 Some ( res) => res,
1017992 None => return Ok ( None ) ,
1018993 } ;
1019994 let mut current_update_id = monitor. get_latest_update_id ( ) ;
1020- // TODO: Parallelize this loop by speculatively reading a batch of updates
1021- loop {
1022- current_update_id = match current_update_id. checked_add ( 1 ) {
1023- Some ( next_update_id) => next_update_id,
1024- None => break ,
1025- } ;
1026- let update_name = UpdateName :: from ( current_update_id) ;
1027- let update = match self . read_monitor_update ( monitor_key, & update_name) . await {
1028- Ok ( update) => update,
1029- Err ( err) if err. kind ( ) == io:: ErrorKind :: NotFound => {
1030- // We can't find any more updates, so we are done.
1031- break ;
1032- } ,
1033- Err ( err) => return Err ( err) ,
1034- } ;
1035-
1036- monitor
1037- . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1038- . map_err ( |e| {
1039- log_error ! (
1040- self . logger,
1041- "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1042- monitor_key,
1043- update_name. as_str( ) ,
1044- e
1045- ) ;
1046- io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1047- } ) ?;
995+ let updates: Result < Vec < _ > , _ > =
996+ list_res?. into_iter ( ) . map ( |name| UpdateName :: new ( name) ) . collect ( ) ;
997+ let mut updates = updates?;
998+ updates. sort_unstable ( ) ;
999+ // TODO: Parallelize this loop
1000+ for update_name in updates {
1001+ if update_name. 0 > current_update_id {
1002+ let update = self . read_monitor_update ( monitor_key, & update_name) . await ?;
1003+ monitor
1004+ . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1005+ . map_err ( |e| {
1006+ log_error ! (
1007+ self . logger,
1008+ "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1009+ monitor_key,
1010+ update_name. as_str( ) ,
1011+ e
1012+ ) ;
1013+ io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1014+ } ) ?;
1015+ }
10481016 }
10491017 Ok ( Some ( ( block_hash, monitor) ) )
10501018 }
@@ -1416,7 +1384,7 @@ impl core::fmt::Display for MonitorName {
14161384/// let monitor_name = "some_monitor_name";
14171385/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
14181386/// ```
1419- #[ derive( Debug ) ]
1387+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Ord ) ]
14201388pub struct UpdateName ( pub u64 , String ) ;
14211389
14221390impl UpdateName {
0 commit comments