From c7ed07ccb1529c04e12aed878ded9855d96d6366 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 13:55:24 +0100 Subject: [PATCH 1/3] Extract watch_channel_internal/update_channel_internal from Watch impl Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into pub(crate) methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/chainmonitor.rs | 300 +++++++++++++++------------- 1 file changed, 156 insertions(+), 144 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 17693f8ca7a..cc1ef92bd35 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1058,6 +1058,160 @@ where Ok(ChannelMonitorUpdateStatus::Completed) } + + fn watch_channel_internal( + &self, channel_id: ChannelId, monitor: ChannelMonitor, + ) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(channel_id) { + hash_map::Entry::Occupied(_) => { + log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + return Err(()); + }, + hash_map::Entry::Vacant(e) => e, + }; + log_trace!(logger, "Got new ChannelMonitor"); + let update_id = monitor.get_latest_update_id(); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(logger, "Persistence of new ChannelMonitor in progress",); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(logger, "Persistence of new ChannelMonitor completed",); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + if let Some(ref chain_source) = self.chain_source { + monitor.load_outputs_to_watch(chain_source, &self.logger); + } + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + }); + Ok(persist_res) + } + + fn update_channel_internal( + &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + debug_assert_eq!(update.channel_id.unwrap(), channel_id); + // Update the monitor that watches the channel referred to by the given outpoint. + let monitors = self.monitors.read().unwrap(); + match monitors.get(&channel_id) { + None => { + let logger = WithContext::from(&self.logger, None, Some(channel_id), None); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); + + // We should never ever trigger this from within ChannelManager. Technically a + // user could use this object with some proxying in between which makes this + // possible, but in tests and fuzzing, this should be a panic. 
+ #[cfg(debug_assertions)] + panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress + }, + Some(monitor_state) => { + let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); + + // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we + // have well-ordered updates from the users' point of view. See the + // `pending_monitor_updates` docs for more. + let mut pending_monitor_updates = + monitor_state.pending_monitor_updates.lock().unwrap(); + let update_res = monitor.update_monitor( + update, + &self.broadcaster, + &self.fee_estimator, + &self.logger, + ); + + let update_id = update.update_id; + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); + self.persister.update_persisted_channel( + monitor.persistence_key(), + None, + monitor, + ) + } else { + self.persister.update_persisted_channel( + monitor.persistence_key(), + Some(update), + monitor, + ) + }; + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + pending_monitor_updates.push(update_id); + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} in progress", + update_id, + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} completed", + update_id, + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + + // We may need to start monitoring for any alternative funding transactions. 
+ if let Some(ref chain_source) = self.chain_source { + for (funding_outpoint, funding_script) in + update.internal_renegotiated_funding_data() + { + log_trace!( + logger, + "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", + funding_outpoint + ); + chain_source.register_tx(&funding_outpoint.txid, &funding_script); + chain_source.register_output(WatchedOutput { + block_hash: None, + outpoint: funding_outpoint, + script_pubkey: funding_script, + }); + } + } + + if update_res.is_err() { + ChannelMonitorUpdateStatus::InProgress + } else { + persist_res + } + }, + } + } } impl< @@ -1272,155 +1426,13 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - let mut monitors = self.monitors.write().unwrap(); - let entry = match monitors.entry(channel_id) { - hash_map::Entry::Occupied(_) => { - log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); - return Err(()); - }, - hash_map::Entry::Vacant(e) => e, - }; - log_trace!(logger, "Got new ChannelMonitor"); - let update_id = monitor.get_latest_update_id(); - let mut pending_monitor_updates = Vec::new(); - let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor in progress",); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { - log_info!(logger, "Persistence of new ChannelMonitor completed",); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source, &self.logger); - } - entry.insert(MonitorHolder { - monitor, - pending_monitor_updates: Mutex::new(pending_monitor_updates), - }); - Ok(persist_res) + self.watch_channel_internal(channel_id, monitor) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those - // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. - debug_assert_eq!(update.channel_id.unwrap(), channel_id); - // Update the monitor that watches the channel referred to by the given outpoint. - let monitors = self.monitors.read().unwrap(); - match monitors.get(&channel_id) { - None => { - let logger = WithContext::from(&self.logger, None, Some(channel_id), None); - log_error!(logger, "Failed to update channel monitor: no such monitor registered"); - - // We should never ever trigger this from within ChannelManager. Technically a - // user could use this object with some proxying in between which makes this - // possible, but in tests and fuzzing, this should be a panic. 
- #[cfg(debug_assertions)] - panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress - }, - Some(monitor_state) => { - let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); - - // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we - // have well-ordered updates from the users' point of view. See the - // `pending_monitor_updates` docs for more. - let mut pending_monitor_updates = - monitor_state.pending_monitor_updates.lock().unwrap(); - let update_res = monitor.update_monitor( - update, - &self.broadcaster, - &self.fee_estimator, - &self.logger, - ); - - let update_id = update.update_id; - let persist_res = if update_res.is_err() { - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. Therefore, we should persist the updated monitor despite the error. - // We don't want to persist a `monitor_update` which results in a failure to apply later - // while reading `channel_monitor` with updates from storage. Instead, we should persist - // the entire `channel_monitor` here. - log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); - self.persister.update_persisted_channel( - monitor.persistence_key(), - None, - monitor, - ) - } else { - self.persister.update_persisted_channel( - monitor.persistence_key(), - Some(update), - monitor, - ) - }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} in progress", - update_id, - ); - }, - ChannelMonitorUpdateStatus::Completed => { - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} completed", - update_id, - ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - - // We may need to start monitoring for any alternative funding transactions. 
- if let Some(ref chain_source) = self.chain_source { - for (funding_outpoint, funding_script) in - update.internal_renegotiated_funding_data() - { - log_trace!( - logger, - "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", - funding_outpoint - ); - chain_source.register_tx(&funding_outpoint.txid, &funding_script); - chain_source.register_output(WatchedOutput { - block_hash: None, - outpoint: funding_outpoint, - script_pubkey: funding_script, - }); - } - } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } - }, - } + self.update_channel_internal(channel_id, update) } fn release_pending_monitor_events( From 77380ac85cf091ec360095a7835927c61e2cea55 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:46:35 +0100 Subject: [PATCH 2/3] Add deferred bool to ChainMonitor Add a `deferred` parameter to `ChainMonitor::new` and `ChainMonitor::new_async_beta`. When set to true, the Watch trait methods (watch_channel and update_channel) will unimplemented!() for now. All existing callers pass false to preserve current behavior. Co-Authored-By: Claude Opus 4.6 --- fuzz/src/chanmon_consistency.rs | 1 + fuzz/src/full_stack.rs | 1 + fuzz/src/lsps_message.rs | 1 + lightning/src/chain/chainmonitor.rs | 15 +++++++++++++-- lightning/src/ln/chanmon_update_fail_tests.rs | 1 + lightning/src/ln/channelmanager.rs | 4 ++-- lightning/src/util/test_utils.rs | 1 + 7 files changed, 20 insertions(+), 4 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 202488d9777..a04f5f69a49 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -285,6 +285,7 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), + false, )), logger, keys, diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 8c887ed623a..aacaef443ce 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -599,6 +599,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc) { Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let network = Network::Bitcoin; diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 547a27b70ee..d01e5632025 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index cc1ef92bd35..b009e682c7a 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -371,6 +371,9 @@ pub struct ChainMonitor< #[cfg(peer_storage)] our_peerstorage_encryption_key: PeerStorageKey, + + /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. 
+ deferred: bool, } impl< @@ -397,7 +400,7 @@ where pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, - _our_peerstorage_encryption_key: PeerStorageKey, + _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { let event_notifier = Arc::new(Notifier::new()); Self { @@ -414,6 +417,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } } @@ -603,7 +607,7 @@ where /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub fn new( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, - _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, + _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { Self { monitors: RwLock::new(new_hash_map()), @@ -619,6 +623,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } @@ -1426,12 +1431,18 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { + if self.deferred { + unimplemented!(); + } self.watch_channel_internal(channel_id, monitor) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { + if self.deferred { + unimplemented!(); + } self.update_channel_internal(channel_id, update) } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 33507e4c933..6e80e8642dc 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4913,6 +4913,7 @@ fn native_async_persist() { native_async_persister, Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, ); // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed` diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 8a84de69cfc..4dc9f94677f 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -20690,7 +20690,7 @@ pub mod bench { let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42, true); - let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key(), false); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &message_router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), @@ -20700,7 +20700,7 @@ pub mod bench { let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42, true); - let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key()); + let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key(), false); let node_b = 
ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &message_router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bcf39fde482..667cc2ae850 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -537,6 +537,7 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), + false, ), keys_manager, expect_channel_force_closed: Mutex::new(None), From 7eb382ce50c3ce0a4542fd70ba2d814b163a7382 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:48:31 +0100 Subject: [PATCH 3/3] Implement deferred monitor write queueing and flushing Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 --- lightning-background-processor/src/lib.rs | 66 ++- lightning/src/chain/chainmonitor.rs | 492 +++++++++++++++++++++- lightning/src/ln/functional_test_utils.rs | 60 ++- lightning/src/util/test_utils.rs | 63 ++- 4 files changed, 662 insertions(+), 19 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index dce803e6dea..bf35d7490c1 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,6 +1120,10 @@ where let mut futures = Joiner::new(); + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); @@ -1317,6 +1321,10 @@ where res?; } + // Flush monitor writes that were pending before we persisted. New updates that + // arrived after are left for the next iteration. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1373,6 +1381,7 @@ where // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. 
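+	// Capture the pending monitor-write count before the final ChannelManager write; only
+	// operations queued up to this point are flushed once that write completes.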
+ let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1381,6 +1390,10 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush monitor writes that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store .write( @@ -1571,7 +1584,8 @@ impl BackgroundProcessor { ) -> Self where L::Target: 'static + Logger, - M::Target: AChainMonitor::Signer, Logger = L>, + M::Target: + 'static + AChainMonitor::Signer, Logger = L>, CM::Target: AChannelManager, OM::Target: AOnionMessenger, PM::Target: APeerManager, @@ -1684,6 +1698,11 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1695,6 +1714,10 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush monitor writes that were pending before we persisted. New updates + // that arrived after are left for the next iteration. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1809,12 +1832,17 @@ impl BackgroundProcessor { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush monitor writes that were pending before final persistence. 
+ chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1896,9 +1924,10 @@ mod tests { use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::{Amount, ScriptBuf, Txid}; use core::sync::atomic::{AtomicBool, Ordering}; + use lightning::chain::chainmonitor::ChainMonitor as LdkChainMonitor; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; - use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; + use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::{Event, PathFailure, ReplayEvent}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ @@ -2008,7 +2037,7 @@ mod tests { Arc, >; - type ChainMonitor = chainmonitor::ChainMonitor< + type ChainMonitor = LdkChainMonitor< InMemorySigner, Arc, Arc, @@ -2436,7 +2465,7 @@ mod tests { let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true)); - let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new( + let chain_monitor = Arc::new(LdkChainMonitor::new( Some(Arc::clone(&chain_source)), Arc::clone(&tx_broadcaster), Arc::clone(&logger), @@ -2444,6 +2473,7 @@ mod tests { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + true, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -2580,12 +2610,20 @@ mod tests { tx.clone(), ) .unwrap(); + // Flush deferred monitor operations so messages aren't held back + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); let msg_a = get_event_msg!( $node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id() ); $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); + // Flush node_b's monitor so it releases the FundingSigned message + $node_b + .chain_monitor + .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger); get_event!($node_b, Event::ChannelPending); let msg_b = get_event_msg!( $node_b, @@ -2593,6 +2631,10 @@ mod tests { $node_a.node.get_our_node_id() ); $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); + // Flush node_a's monitor for the final update + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); get_event!($node_a, Event::ChannelPending); tx }}; @@ -3039,11 +3081,23 @@ mod tests { .node .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) .unwrap(); + // Flush node_0's deferred monitor operations so the FundingCreated message is released + nodes[0] + .chain_monitor + .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger); let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); nodes[1].node.handle_funding_created(node_0_id, &msg_0); + // Flush node_1's deferred monitor operations so events and FundingSigned are released + nodes[1] + .chain_monitor + .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger); get_event!(nodes[1], Event::ChannelPending); let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); nodes[0].node.handle_funding_signed(node_1_id, &msg_1); + // Flush node_0's monitor for the funding_signed update + nodes[0] + .chain_monitor + 
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger); channel_pending_recv .recv_timeout(EVENT_DEADLINE) .expect("ChannelPending not handled within deadline"); @@ -3104,6 +3158,10 @@ mod tests { error_message.to_string(), ) .unwrap(); + // Flush the monitor update triggered by force close so the commitment tx is broadcasted + nodes[0] + .chain_monitor + .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index b009e682c7a..8b2501a9df6 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -60,12 +60,21 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; +use alloc::collections::VecDeque; use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. +enum PendingMonitorOp { + /// A new monitor to insert and persist. + NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor, update_id: u64 }, + /// An update to apply and persist. + Update { channel_id: ChannelId, update: ChannelMonitorUpdate }, +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// @@ -374,6 +383,29 @@ pub struct ChainMonitor< /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. deferred: bool, + /// Queued monitor operations awaiting flush. Unused when `deferred` is `false`. + /// + /// # Locking order with `monitors` + /// + /// The consistent lock order is `pending_ops` **before** `monitors`. + /// + /// [`watch_channel`] holds `pending_ops.lock()` **then** `monitors.read()` to atomically + /// check for duplicate channel IDs in both the pending queue and the flushed set. + /// + /// [`flush`] holds `pending_ops.lock()` across [`watch_channel_internal`] (which acquires + /// `monitors.write()`) for [`NewMonitor`] ops, ensuring mutual exclusion with + /// `watch_channel`: both acquire `pending_ops` first, so they serialize on that lock. + /// + /// For [`Update`] ops, `pending_ops` is released before calling + /// [`update_channel_internal`] so that concurrent `update_channel` queuing is not blocked. + /// + /// [`watch_channel`]: chain::Watch::watch_channel + /// [`flush`]: Self::flush + /// [`watch_channel_internal`]: Self::watch_channel_internal + /// [`update_channel_internal`]: Self::update_channel_internal + /// [`NewMonitor`]: PendingMonitorOp::NewMonitor + /// [`Update`]: PendingMonitorOp::Update + pending_ops: Mutex>>, } impl< @@ -418,6 +450,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } } @@ -624,6 +657,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } @@ -1217,6 +1251,90 @@ where }, } } + + /// Returns the number of pending monitor operations queued for later execution. 
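+	///
+	/// Callers in deferred mode typically snapshot this value before persisting the
+	/// `ChannelManager` and later pass it to [`Self::flush`], so that only operations which
+	/// were already queued at that point are written.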
+ pub fn pending_operation_count(&self) -> usize { + self.pending_ops.lock().unwrap().len() + } + + /// Flushes up to `count` pending monitor operations. + /// + /// For both `NewMonitor` and `Update` variants, the operation is forwarded to the + /// internal watch/update methods. If the result is [`Completed`], + /// [`ChainMonitor::channel_monitor_updated`] is called immediately so the + /// [`ChannelManager`] can release any held messages. For [`InProgress`], + /// `channel_monitor_updated` is not called — the async persister will signal completion + /// later. [`UnrecoverableError`] panics. + /// + /// For `NewMonitor`, an `Err(())` from `watch_channel_internal` (e.g. duplicate + /// channel) stops processing. The monitor is consumed and cannot be retried, but + /// remaining operations are left in the queue. Note that any subsequent `Update` ops + /// for the same channel will also fail when flushed, since no monitor was inserted. + /// + /// Returns early if the queue empties before `count` operations have been processed. + /// + /// See the `pending_ops` field documentation for lock ordering details. + /// + /// [`Completed`]: ChannelMonitorUpdateStatus::Completed + /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress + /// [`UnrecoverableError`]: ChannelMonitorUpdateStatus::UnrecoverableError + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + pub fn flush(&self, count: usize, logger: &L) { + if count > 0 { + log_trace!(logger, "Flushing up to {} monitor writes", count); + } + for _ in 0..count { + let mut queue = self.pending_ops.lock().unwrap(); + let op = match queue.pop_front() { + Some(op) => op, + None => return, + }; + + let (channel_id, update_id, status) = match op { + PendingMonitorOp::NewMonitor { channel_id, monitor, update_id } => { + // Hold `pending_ops` across the internal call so that + // `watch_channel` (which checks `monitors` + `pending_ops` + // atomically) cannot race with this insertion. + match self.watch_channel_internal(channel_id, monitor) { + Ok(status) => { + drop(queue); + (channel_id, update_id, status) + }, + Err(()) => { + drop(queue); + log_error!(logger, "watch_channel failed for channel {}", channel_id); + return; + }, + } + }, + PendingMonitorOp::Update { channel_id, update } => { + // Release `pending_ops` before the internal call so that + // concurrent `update_channel` queuing is not blocked. + drop(queue); + let update_id = update.update_id; + let status = self.update_channel_internal(channel_id, &update); + (channel_id, update_id, status) + }, + }; + + match status { + ChannelMonitorUpdateStatus::Completed => { + if let Err(e) = self.channel_monitor_updated(channel_id, update_id) { + log_error!( + logger, + "channel_monitor_updated failed for channel {}: {:?}", + channel_id, + e + ); + } + }, + ChannelMonitorUpdateStatus::InProgress => {}, + ChannelMonitorUpdateStatus::UnrecoverableError => { + panic!("UnrecoverableError during monitor operation"); + }, + } + } + } } impl< @@ -1432,18 +1550,39 @@ where &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { if self.deferred { - unimplemented!(); + let update_id = monitor.get_latest_update_id(); + // Atomically check for duplicates in both the pending queue and the + // flushed monitor set. Lock order: `pending_ops` before `monitors` + // (see `pending_ops` field doc). 
+ let mut pending_ops = self.pending_ops.lock().unwrap(); + let monitors = self.monitors.read().unwrap(); + if monitors.contains_key(&channel_id) { + return Err(()); + } + let already_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, + _ => false, + }); + if already_pending { + return Err(()); + } + pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor, update_id }); + Ok(ChannelMonitorUpdateStatus::InProgress) + } else { + self.watch_channel_internal(channel_id, monitor) } - self.watch_channel_internal(channel_id, monitor) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { if self.deferred { - unimplemented!(); + let mut pending_ops = self.pending_ops.lock().unwrap(); + pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); + ChannelMonitorUpdateStatus::InProgress + } else { + self.update_channel_internal(channel_id, update) } - self.update_channel_internal(channel_id, update) } fn release_pending_monitor_events( @@ -1573,12 +1712,16 @@ where #[cfg(test)] mod tests { + use super::ChainMonitor; use crate::chain::channelmonitor::ANTI_REORG_DELAY; use crate::chain::{ChannelMonitorUpdateStatus, Watch}; use crate::events::{ClosureReason, Event}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent}; + use crate::ln::types::ChannelId; use crate::{expect_payment_path_successful, get_event_msg}; + use bitcoin::hash_types::Txid; + use bitcoin::secp256k1::PublicKey; const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5; @@ -1831,4 +1974,345 @@ mod tests { }) .is_err()); } + + // ==================== Deferred mode tests ==================== + + use crate::chain::channelmonitor::ChannelMonitorUpdate; + use crate::chain::transaction::OutPoint; + use crate::ln::chan_utils::{ + ChannelTransactionParameters, CounterpartyChannelTransactionParameters, + HolderCommitmentTransaction, + }; + use crate::ln::channel_keys::{DelayedPaymentBasepoint, HtlcBasepoint, RevocationBasepoint}; + use crate::ln::script::ShutdownScript; + use crate::sign::{ChannelSigner, InMemorySigner, NodeSigner}; + use crate::types::features::ChannelTypeFeatures; + use crate::util::dyn_signer::DynSigner; + use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{ + TestBroadcaster, TestChainSource, TestFeeEstimator, TestKeysInterface, TestLogger, + TestPersister, + }; + use bitcoin::hashes::Hash; + use bitcoin::script::ScriptBuf; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; + use bitcoin::Network; + + /// Concrete `ChainMonitor` type wired to the standard test utilities in deferred mode. + type TestDeferredChainMonitor<'a> = ChainMonitor< + TestChannelSigner, + &'a TestChainSource, + &'a TestBroadcaster, + &'a TestFeeEstimator, + &'a TestLogger, + &'a TestPersister, + &'a TestKeysInterface, + >; + + /// Creates a minimal `ChannelMonitorUpdate` with no actual update steps. + fn dummy_update(update_id: u64, channel_id: ChannelId) -> ChannelMonitorUpdate { + ChannelMonitorUpdate { updates: vec![], update_id, channel_id: Some(channel_id) } + } + + /// Creates a minimal `ChannelMonitor` for the given `channel_id`. 
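+	///
+	/// All keys, scripts and channel parameters are arbitrary placeholder values; the monitor
+	/// only needs to be valid enough to be queued, persisted and updated in these tests.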
+ fn dummy_monitor( + channel_id: ChannelId, + ) -> crate::chain::channelmonitor::ChannelMonitor { + let secp_ctx = Secp256k1::new(); + let dummy_key = + PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let keys = InMemorySigner::new( + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + true, + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + [41; 32], + [0; 32], + [0; 32], + ); + let counterparty_pubkeys = crate::ln::chan_utils::ChannelPublicKeys { + funding_pubkey: dummy_key, + revocation_basepoint: RevocationBasepoint::from(dummy_key), + payment_point: dummy_key, + delayed_payment_basepoint: DelayedPaymentBasepoint::from(dummy_key), + htlc_basepoint: HtlcBasepoint::from(dummy_key), + }; + let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys(&secp_ctx), + holder_selected_contest_delay: 66, + is_outbound_from_holder: true, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: counterparty_pubkeys, + selected_contest_delay: 67, + }), + funding_outpoint: Some(funding_outpoint), + splice_parent_funding_txid: None, + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_value_satoshis: 0, + }; + let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(dummy_key); + let best_block = crate::chain::BestBlock::from_network(Network::Testnet); + let signer = TestChannelSigner::new(DynSigner::new(keys)); + crate::chain::channelmonitor::ChannelMonitor::new( + secp_ctx, + signer, + Some(shutdown_script.into_inner()), + 0, + &ScriptBuf::new(), + &channel_parameters, + true, + 0, + HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), + best_block, + dummy_key, + channel_id, + false, + ) + } + + fn create_deferred_chain_monitor<'a>( + chain_source: &'a TestChainSource, broadcaster: &'a TestBroadcaster, + logger: &'a TestLogger, fee_est: &'a TestFeeEstimator, persister: &'a TestPersister, + keys: &'a TestKeysInterface, + ) -> TestDeferredChainMonitor<'a> { + ChainMonitor::new( + Some(chain_source), + broadcaster, + logger, + fee_est, + persister, + keys, + keys.get_peer_storage_key(), + true, + ) + } + + /// Tests queueing and flushing of both `watch_channel` and `update_channel` operations + /// when `ChainMonitor` is in deferred mode, verifying that operations flow through to + /// `Persist` and that `channel_monitor_updated` is called on `Completed` status. + #[test] + fn test_queue_and_flush() { + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_est = TestFeeEstimator::new(253); + let logger = TestLogger::new(); + let persister = TestPersister::new(); + let chain_source = TestChainSource::new(Network::Testnet); + let keys = TestKeysInterface::new(&[0; 32], Network::Testnet); + let deferred = create_deferred_chain_monitor( + &chain_source, + &broadcaster, + &logger, + &fee_est, + &persister, + &keys, + ); + + // Queue starts empty. + assert_eq!(deferred.pending_operation_count(), 0); + + // Queue a watch_channel, verifying InProgress status. 
+ let chan = ChannelId::from_bytes([1u8; 32]); + let status = Watch::watch_channel(&deferred, chan, dummy_monitor(chan)); + assert_eq!(status, Ok(ChannelMonitorUpdateStatus::InProgress)); + assert_eq!(deferred.pending_operation_count(), 1); + + // Nothing persisted yet — operations are only queued. + assert!(persister.new_channel_persistences.lock().unwrap().is_empty()); + + // Queue two updates after the watch. Update IDs must be sequential (starting + // from 1 since the initial monitor has update_id 0). + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(1, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(2, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!(deferred.pending_operation_count(), 3); + + // Flush 2 of 3: persist_new_channel returns Completed (triggers + // channel_monitor_updated), update_persisted_channel returns InProgress (does not). + persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + deferred.flush(2, &&logger); + + assert_eq!(deferred.pending_operation_count(), 1); + + // persist_new_channel was called for the watch. + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), 1); + + // Because persist_new_channel returned Completed, channel_monitor_updated was called, + // so update_id 0 should no longer be pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(!pending_for_chan.contains(&0)); + + // update_persisted_channel was called for update_id 1, and because it returned + // InProgress, update_id 1 remains pending. + let monitor_name = deferred.get_monitor(chan).unwrap().persistence_key(); + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&1)); + assert!(pending_for_chan.contains(&1)); + + // Flush remaining: update_persisted_channel returns Completed (default), triggers + // channel_monitor_updated. + deferred.flush(1, &&logger); + assert_eq!(deferred.pending_operation_count(), 0); + + // update_persisted_channel was called for update_id 2. + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&2)); + + // update_id 1 is still pending from the InProgress earlier, but update_id 2 was + // completed in this flush so it is no longer pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(pending_for_chan.contains(&1)); + assert!(!pending_for_chan.contains(&2)); + + // Flushing an empty queue is a no-op. + let persist_count_before = persister.new_channel_persistences.lock().unwrap().len(); + deferred.flush(5, &&logger); + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), persist_count_before); + } + + /// Tests that `ChainMonitor` in deferred mode properly defers `watch_channel` and + /// `update_channel` operations until `flush()` is called, using standard test + /// infrastructure and a complete channel open + payment flow. 
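+	///
+	/// Channel-open monitor writes are flushed explicitly via `flush_all`, while the payment
+	/// leg relies on the auto-flush in `TestChainMonitor::release_pending_monitor_events`.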
+ #[test] + fn test_deferred_monitor_payment() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let node_a_id = nodes[0].node.get_our_node_id(); + let node_b_id = nodes[1].node.get_our_node_id(); + let chain_monitor_a = &nodes[0].chain_monitor.chain_monitor; + let chain_monitor_b = &nodes[1].chain_monitor.chain_monitor; + + // ===== Open unannounced channel with manual funding to test deferred watch_channel ===== + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + + let mut no_announce_cfg = nodes[0].node.get_current_config(); + no_announce_cfg.channel_handshake_config.announce_for_forwarding = false; + nodes[0] + .node + .create_channel(node_b_id, 100_000, 10_001, 42, None, Some(no_announce_cfg)) + .unwrap(); + let open_channel = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, node_b_id); + nodes[1].node.handle_open_channel(node_a_id, &open_channel); + let accept_channel = + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, node_a_id); + nodes[0].node.handle_accept_channel(node_b_id, &accept_channel); + let (temporary_channel_id, tx, _funding_outpoint) = + create_funding_transaction(&nodes[0], &node_b_id, 100_000, 42); + + nodes[0] + .node + .funding_transaction_generated(temporary_channel_id, node_b_id, tx.clone()) + .unwrap(); + check_added_monitors(&nodes[0], 0); + + let funding_created = + get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_b_id); + nodes[1].node.handle_funding_created(node_a_id, &funding_created); + check_added_monitors(&nodes[1], 1); + + // Node 1's watch_channel should be queued (deferred). + assert_eq!( + chain_monitor_b.pending_operation_count(), + 1, + "node 1 watch_channel should be queued" + ); + + // Flush so node 1's monitor is persisted and the ChannelManager can advance. + nodes[1].chain_monitor.flush_all(); + assert_eq!(chain_monitor_b.pending_operation_count(), 0); + + expect_channel_pending_event(&nodes[1], &node_a_id); + let funding_signed = + get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_a_id); + nodes[0].node.handle_funding_signed(node_b_id, &funding_signed); + check_added_monitors(&nodes[0], 1); + + // Node 0's watch_channel should be queued (deferred). + assert_eq!( + chain_monitor_a.pending_operation_count(), + 1, + "node 0 watch_channel should be queued" + ); + assert!( + chain_monitor_a.list_monitors().is_empty(), + "Monitor should not be in ChainMonitor before flush" + ); + + // Flush node 0's watch_channel. + nodes[0].chain_monitor.flush_all(); + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + assert_eq!( + chain_monitor_a.list_monitors().len(), + 1, + "Monitor should be in ChainMonitor after flush" + ); + + expect_channel_pending_event(&nodes[0], &node_b_id); + + // Drain any leftover messages (e.g. channel_ready, tx_broadcast). 
+ nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear(); + let _ = nodes[0].node.get_and_clear_pending_msg_events(); + + // ===== Confirm funding and exchange channel_ready ===== + let conf_height = + core::cmp::max(nodes[0].best_block_info().1 + 1, nodes[1].best_block_info().1 + 1); + confirm_transaction_at(&nodes[0], &tx, conf_height); + connect_blocks(&nodes[0], CHAN_CONFIRM_DEPTH - 1); + confirm_transaction_at(&nodes[1], &tx, conf_height); + connect_blocks(&nodes[1], CHAN_CONFIRM_DEPTH - 1); + + let as_channel_ready = + get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_b_id); + let bs_channel_ready = + get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_a_id); + nodes[0].node.handle_channel_ready(node_b_id, &bs_channel_ready); + expect_channel_ready_event(&nodes[0], &node_b_id); + let _as_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_b_id); + nodes[1].node.handle_channel_ready(node_a_id, &as_channel_ready); + expect_channel_ready_event(&nodes[1], &node_a_id); + let _bs_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_a_id); + + assert!(!nodes[0].node.list_usable_channels().is_empty(), "Channel should be usable"); + + // ===== Send and claim a payment using standard helpers ===== + // The auto-flush in TestChainMonitor::release_pending_monitor_events ensures + // deferred update_channel operations are flushed before the ChannelManager + // processes monitor completion events. + let (preimage, _hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 10_000); + claim_payment(&nodes[0], &[&nodes[1]], preimage); + + // Both monitors should still be present after the payment flow. + assert_eq!(chain_monitor_a.list_monitors().len(), 1); + assert_eq!(chain_monitor_b.list_monitors().len(), 1); + } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 218779123f6..0cf126d6586 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -4446,6 +4446,26 @@ fn create_node_cfgs_internal<'a, F>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, ) -> Vec> +where + F: Fn( + Arc>, + &'a TestKeysInterface, + ) -> test_utils::TestMessageRouter<'a>, +{ + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + message_router_constructor, + false, + ) +} + +fn create_node_cfgs_internal_deferred<'a, F>( + node_count: usize, chanmon_cfgs: &'a Vec, + persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, + deferred: bool, +) -> Vec> where F: Fn( Arc>, @@ -4457,14 +4477,25 @@ where for i in 0..node_count { let cfg = &chanmon_cfgs[i]; let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &cfg.logger)); - let chain_monitor = test_utils::TestChainMonitor::new( - Some(&cfg.chain_source), - &cfg.tx_broadcaster, - &cfg.logger, - &cfg.fee_estimator, - persisters[i], - &cfg.keys_manager, - ); + let chain_monitor = if deferred { + test_utils::TestChainMonitor::new_deferred( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + } else { + test_utils::TestChainMonitor::new( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + }; let seed = [i as u8; 32]; nodes.push(NodeCfg { @@ -4504,6 +4535,19 @@ pub fn create_node_cfgs<'a>( ) } +pub fn 
create_node_cfgs_deferred<'a>( + node_count: usize, chanmon_cfgs: &'a Vec, +) -> Vec> { + let persisters = chanmon_cfgs.iter().map(|c| &c.persister).collect(); + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + test_utils::TestMessageRouter::new_default, + true, + ) +} + pub fn create_node_cfgs_with_persisters<'a>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 667cc2ae850..0fdc06e9efb 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -509,6 +509,7 @@ pub struct TestChainMonitor<'a> { &'a TestKeysInterface, >, pub keys_manager: &'a TestKeysInterface, + pub logger: &'a TestLogger, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given /// boolean. @@ -524,6 +525,38 @@ impl<'a> TestChainMonitor<'a> { chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + false, + ) + } + + pub fn new_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + true, + ) + } + + fn with_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, deferred: bool, ) -> Self { Self { added_monitors: Mutex::new(Vec::new()), @@ -537,9 +570,10 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), - false, + deferred, ), keys_manager, + logger, expect_channel_force_closed: Mutex::new(None), expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] @@ -547,6 +581,15 @@ impl<'a> TestChainMonitor<'a> { } } + pub fn pending_operation_count(&self) -> usize { + self.chain_monitor.pending_operation_count() + } + + pub fn flush_all(&self) { + let count = self.pending_operation_count(); + self.chain_monitor.flush(count, &self.logger); + } + pub fn complete_sole_pending_chan_update(&self, channel_id: &ChannelId) { let (_, latest_update) = self.latest_monitor_update_id.lock().unwrap().get(channel_id).unwrap().clone(); @@ -677,6 +720,11 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + // Auto-flush pending operations so that the ChannelManager can pick up monitor + // completion events. When not in deferred mode the queue is empty so this only + // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) + // work with deferred chain monitors. + self.flush_all(); return self.chain_monitor.release_pending_monitor_events(); } } @@ -836,6 +884,8 @@ pub struct TestPersister { /// The queue of update statuses we'll return. If none are queued, ::Completed will always be /// returned. 
	pub update_rets: Mutex<VecDeque<chain::ChannelMonitorUpdateStatus>>,
+	/// When we get a persist_new_channel call, we push the monitor name here.
+	pub new_channel_persistences: Mutex<Vec<MonitorName>>,
	/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
	/// [`ChannelMonitor::get_latest_update_id`] here.
	pub offchain_monitor_updates: Mutex<HashMap<MonitorName, HashSet<u64>>>,
@@ -846,9 +896,15 @@ impl TestPersister {
	pub fn new() -> Self {
		let update_rets = Mutex::new(VecDeque::new());
+		let new_channel_persistences = Mutex::new(Vec::new());
		let offchain_monitor_updates = Mutex::new(new_hash_map());
		let chain_sync_monitor_persistences = Mutex::new(VecDeque::new());
-		Self { update_rets, offchain_monitor_updates, chain_sync_monitor_persistences }
+		Self {
+			update_rets,
+			new_channel_persistences,
+			offchain_monitor_updates,
+			chain_sync_monitor_persistences,
+		}
	}
	/// Queue an update status to return.
@@ -858,8 +914,9 @@ impl TestPersister {
}
impl<Signer: sign::ecdsa::EcdsaChannelSigner> Persist<Signer> for TestPersister {
	fn persist_new_channel(
-		&self, _monitor_name: MonitorName, _data: &ChannelMonitor<Signer>,
+		&self, monitor_name: MonitorName, _data: &ChannelMonitor<Signer>,
	) -> chain::ChannelMonitorUpdateStatus {
+		self.new_channel_persistences.lock().unwrap().push(monitor_name);
		if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
			return update_ret;
		}