diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 5fb07431b17..c1dd7b01362 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -283,6 +283,7 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), + false, )), logger, keys, diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 600335b5083..4b725e3eb19 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -597,6 +597,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc) { Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let network = Network::Bitcoin; diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 547a27b70ee..d01e5632025 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index bb99d65e6b5..00a7acc57e9 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,6 +1120,10 @@ where let mut futures = Joiner::new(); + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); @@ -1317,6 +1321,10 @@ where res?; } + // Flush monitor writes that were pending before we persisted. New updates that + // arrived after are left for the next iteration. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1373,6 +1381,7 @@ where // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1381,6 +1390,10 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush monitor writes that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store .write( @@ -1571,7 +1584,8 @@ impl BackgroundProcessor { ) -> Self where L::Target: 'static + Logger, - M::Target: AChainMonitor::Signer, Logger = L>, + M::Target: + 'static + AChainMonitor::Signer, Logger = L>, CM::Target: AChannelManager, OM::Target: AOnionMessenger, PM::Target: APeerManager, @@ -1684,6 +1698,11 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. 
+ let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1695,6 +1714,10 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush monitor writes that were pending before we persisted. New updates + // that arrived after are left for the next iteration. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1809,12 +1832,17 @@ impl BackgroundProcessor { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush monitor writes that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1896,9 +1924,10 @@ mod tests { use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::{Amount, ScriptBuf, Txid}; use core::sync::atomic::{AtomicBool, Ordering}; + use lightning::chain::chainmonitor::ChainMonitor as LdkChainMonitor; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; - use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; + use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::{Event, PathFailure, ReplayEvent}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ @@ -2008,7 +2037,7 @@ mod tests { Arc, >; - type ChainMonitor = chainmonitor::ChainMonitor< + type ChainMonitor = LdkChainMonitor< InMemorySigner, Arc, Arc, @@ -2436,7 +2465,7 @@ mod tests { let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true)); - let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new( + let chain_monitor = Arc::new(LdkChainMonitor::new( Some(Arc::clone(&chain_source)), Arc::clone(&tx_broadcaster), Arc::clone(&logger), @@ -2444,6 +2473,7 @@ mod tests { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + true, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -2580,12 +2610,20 @@ mod tests { tx.clone(), ) .unwrap(); + // Flush deferred monitor operations so messages aren't held back + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); let msg_a = get_event_msg!( $node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id() ); $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); + // Flush node_b's monitor so it releases the FundingSigned message + $node_b + .chain_monitor + .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger); get_event!($node_b, Event::ChannelPending); let msg_b = get_event_msg!( $node_b, @@ -2593,6 
+2631,10 @@ mod tests { $node_a.node.get_our_node_id() ); $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); + // Flush node_a's monitor for the final update + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); get_event!($node_a, Event::ChannelPending); tx }}; @@ -3039,11 +3081,23 @@ mod tests { .node .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) .unwrap(); + // Flush node_0's deferred monitor operations so the FundingCreated message is released + nodes[0] + .chain_monitor + .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger); let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); nodes[1].node.handle_funding_created(node_0_id, &msg_0); + // Flush node_1's deferred monitor operations so events and FundingSigned are released + nodes[1] + .chain_monitor + .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger); get_event!(nodes[1], Event::ChannelPending); let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); nodes[0].node.handle_funding_signed(node_1_id, &msg_1); + // Flush node_0's monitor for the funding_signed update + nodes[0] + .chain_monitor + .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger); channel_pending_recv .recv_timeout(EVENT_DEADLINE) .expect("ChannelPending not handled within deadline"); @@ -3104,6 +3158,10 @@ mod tests { error_message.to_string(), ) .unwrap(); + // Flush the monitor update triggered by force close so the commitment tx is broadcasted + nodes[0] + .chain_monitor + .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 17693f8ca7a..1017664132c 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -60,12 +60,21 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; +use alloc::collections::VecDeque; use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. +enum PendingMonitorOp { + /// A new monitor to insert and persist. + NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor, update_id: u64 }, + /// An update to apply and persist. + Update { channel_id: ChannelId, update: ChannelMonitorUpdate }, +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// @@ -371,6 +380,33 @@ pub struct ChainMonitor< #[cfg(peer_storage)] our_peerstorage_encryption_key: PeerStorageKey, + + /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. + /// The queued operations can be flushed with [`Self::flush`]. + deferred: bool, + /// Queued monitor operations awaiting flush. Empty when `deferred` is `false`. + /// + /// # Locking order with `monitors` + /// + /// The consistent lock order is `pending_ops` **before** `monitors`. 
+ /// + /// [`watch_channel`] holds `pending_ops.lock()` **then** `monitors.read()` to atomically + /// check for duplicate channel IDs in both the pending queue and the flushed set. + /// + /// [`flush`] holds `pending_ops.lock()` across [`watch_channel_internal`] (which acquires + /// `monitors.write()`) for [`NewMonitor`] ops, ensuring mutual exclusion with + /// `watch_channel`: both acquire `pending_ops` first, so they serialize on that lock. + /// + /// For [`Update`] ops, `pending_ops` is released before calling + /// [`update_channel_internal`] so that concurrent `update_channel` queuing is not blocked. + /// + /// [`watch_channel`]: chain::Watch::watch_channel + /// [`flush`]: Self::flush + /// [`watch_channel_internal`]: Self::watch_channel_internal + /// [`update_channel_internal`]: Self::update_channel_internal + /// [`NewMonitor`]: PendingMonitorOp::NewMonitor + /// [`Update`]: PendingMonitorOp::Update + pending_ops: Mutex>>, } impl< @@ -397,7 +433,7 @@ where pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, - _our_peerstorage_encryption_key: PeerStorageKey, + _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { let event_notifier = Arc::new(Notifier::new()); Self { @@ -414,6 +450,8 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, + pending_ops: Mutex::new(VecDeque::new()), } } } @@ -603,7 +641,7 @@ where /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub fn new( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, - _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, + _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { Self { monitors: RwLock::new(new_hash_map()), @@ -619,6 +657,8 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, + pending_ops: Mutex::new(VecDeque::new()), } } @@ -1058,6 +1098,258 @@ where Ok(ChannelMonitorUpdateStatus::Completed) } + + fn watch_channel_internal( + &self, channel_id: ChannelId, monitor: ChannelMonitor, + ) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(channel_id) { + hash_map::Entry::Occupied(_) => { + log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + return Err(()); + }, + hash_map::Entry::Vacant(e) => e, + }; + log_trace!(logger, "Got new ChannelMonitor"); + let update_id = monitor.get_latest_update_id(); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(logger, "Persistence of new ChannelMonitor in progress",); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(logger, "Persistence of new ChannelMonitor completed",); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + if let Some(ref chain_source) = self.chain_source { + monitor.load_outputs_to_watch(chain_source, &self.logger); + } + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + }); + Ok(persist_res) + } + + fn update_channel_internal( + &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + debug_assert_eq!(update.channel_id.unwrap(), channel_id); + // Update the monitor that watches the channel referred to by the given outpoint. + let monitors = self.monitors.read().unwrap(); + match monitors.get(&channel_id) { + None => { + let logger = WithContext::from(&self.logger, None, Some(channel_id), None); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); + + // We should never ever trigger this from within ChannelManager. Technically a + // user could use this object with some proxying in between which makes this + // possible, but in tests and fuzzing, this should be a panic. + #[cfg(debug_assertions)] + panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress + }, + Some(monitor_state) => { + let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); + + // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we + // have well-ordered updates from the users' point of view. See the + // `pending_monitor_updates` docs for more. + let mut pending_monitor_updates = + monitor_state.pending_monitor_updates.lock().unwrap(); + let update_res = monitor.update_monitor( + update, + &self.broadcaster, + &self.fee_estimator, + &self.logger, + ); + + let update_id = update.update_id; + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); + self.persister.update_persisted_channel( + monitor.persistence_key(), + None, + monitor, + ) + } else { + self.persister.update_persisted_channel( + monitor.persistence_key(), + Some(update), + monitor, + ) + }; + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + pending_monitor_updates.push(update_id); + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} in progress", + update_id, + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} completed", + update_id, + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. 
+ core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + + // We may need to start monitoring for any alternative funding transactions. + if let Some(ref chain_source) = self.chain_source { + for (funding_outpoint, funding_script) in + update.internal_renegotiated_funding_data() + { + log_trace!( + logger, + "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", + funding_outpoint + ); + chain_source.register_tx(&funding_outpoint.txid, &funding_script); + chain_source.register_output(WatchedOutput { + block_hash: None, + outpoint: funding_outpoint, + script_pubkey: funding_script, + }); + } + } + + if update_res.is_err() { + ChannelMonitorUpdateStatus::InProgress + } else { + persist_res + } + }, + } + } + + /// Returns the number of pending monitor operations queued for later execution. + /// + /// When the `ChainMonitor` is not in deferred mode, this always returns 0. + pub fn pending_operation_count(&self) -> usize { + self.pending_ops.lock().unwrap().len() + } + + /// Flushes up to `count` pending monitor operations. + /// + /// For both [`NewMonitor`] and [`Update`] variants, the operation is forwarded to the + /// internal watch/update methods. If the result is [`Completed`], + /// [`ChainMonitor::channel_monitor_updated`] is called immediately so the + /// [`ChannelManager`] can release any held messages. For [`InProgress`], + /// `channel_monitor_updated` is not called — the async persister will signal completion + /// later. [`UnrecoverableError`] panics. + /// + /// For [`NewMonitor`], an `Err(())` from `watch_channel_internal` (e.g. duplicate + /// channel) stops processing. The monitor is consumed and cannot be retried, but + /// remaining operations are left in the queue. + /// + /// Returns early if the queue empties before `count` operations have been processed. + /// + /// [`NewMonitor`]: PendingMonitorOp::NewMonitor + /// [`Update`]: PendingMonitorOp::Update + /// [`Completed`]: ChannelMonitorUpdateStatus::Completed + /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress + /// [`UnrecoverableError`]: ChannelMonitorUpdateStatus::UnrecoverableError + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + /// # Locking strategy + /// + /// For [`NewMonitor`] operations, `pending_ops` is held across the + /// `watch_channel_internal` call (which acquires `monitors` write lock). + /// [`watch_channel`] acquires `pending_ops` then `monitors.read()` for its + /// duplicate check — the same `pending_ops`-first order — so the two + /// serialize on `pending_ops`, eliminating any race on duplicate detection. + /// + /// For [`Update`] operations, `pending_ops` is released before calling + /// `update_channel_internal`, so that concurrent `update_channel` calls (the + /// hot path) are not blocked by ongoing persistence. 
+ /// + /// [`watch_channel`]: chain::Watch::watch_channel + pub fn flush(&self, count: usize, logger: &L) { + if count > 0 { + log_trace!(logger, "Flushing up to {} monitor writes", count); + } + for _ in 0..count { + let mut queue = self.pending_ops.lock().unwrap(); + let op = match queue.pop_front() { + Some(op) => op, + None => return, + }; + + let (channel_id, update_id, status) = match op { + PendingMonitorOp::NewMonitor { channel_id, monitor, update_id } => { + // Hold `pending_ops` across the internal call so that + // `watch_channel` (which checks `monitors` + `pending_ops` + // atomically) cannot race with this insertion. + match self.watch_channel_internal(channel_id, monitor) { + Ok(status) => { + drop(queue); + (channel_id, update_id, status) + }, + Err(()) => { + drop(queue); + log_error!(logger, "watch_channel failed for channel {}", channel_id); + return; + }, + } + }, + PendingMonitorOp::Update { channel_id, update } => { + // Release `pending_ops` before the internal call so that + // concurrent `update_channel` queuing is not blocked. + drop(queue); + let update_id = update.update_id; + let status = self.update_channel_internal(channel_id, &update); + (channel_id, update_id, status) + }, + }; + + match status { + ChannelMonitorUpdateStatus::Completed => { + if let Err(e) = self.channel_monitor_updated(channel_id, update_id) { + log_error!( + logger, + "channel_monitor_updated failed for channel {}: {:?}", + channel_id, + e + ); + } + }, + ChannelMonitorUpdateStatus::InProgress => {}, + ChannelMonitorUpdateStatus::UnrecoverableError => { + panic!("UnrecoverableError during monitor operation"); + }, + } + } + } } impl< @@ -1272,154 +1564,42 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - let mut monitors = self.monitors.write().unwrap(); - let entry = match monitors.entry(channel_id) { - hash_map::Entry::Occupied(_) => { - log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + if self.deferred { + let update_id = monitor.get_latest_update_id(); + // Hold both `pending_ops` and `monitors` (read) to atomically check for + // duplicates in both the pending queue and the flushed monitor set. We + // acquire `pending_ops` first to match the lock order in `flush`, which + // holds `pending_ops` across `watch_channel_internal` (acquiring + // `monitors` write lock). See the `flush` and `pending_ops` documentation + // for details. + let mut pending_ops = self.pending_ops.lock().unwrap(); + let monitors = self.monitors.read().unwrap(); + if monitors.contains_key(&channel_id) { return Err(()); - }, - hash_map::Entry::Vacant(e) => e, - }; - log_trace!(logger, "Got new ChannelMonitor"); - let update_id = monitor.get_latest_update_id(); - let mut pending_monitor_updates = Vec::new(); - let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor in progress",); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { - log_info!(logger, "Persistence of new ChannelMonitor completed",); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source, &self.logger); + } + let already_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, + _ => false, + }); + if already_pending { + return Err(()); + } + pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor, update_id }); + Ok(ChannelMonitorUpdateStatus::InProgress) + } else { + self.watch_channel_internal(channel_id, monitor) } - entry.insert(MonitorHolder { - monitor, - pending_monitor_updates: Mutex::new(pending_monitor_updates), - }); - Ok(persist_res) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those - // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. - debug_assert_eq!(update.channel_id.unwrap(), channel_id); - // Update the monitor that watches the channel referred to by the given outpoint. - let monitors = self.monitors.read().unwrap(); - match monitors.get(&channel_id) { - None => { - let logger = WithContext::from(&self.logger, None, Some(channel_id), None); - log_error!(logger, "Failed to update channel monitor: no such monitor registered"); - - // We should never ever trigger this from within ChannelManager. Technically a - // user could use this object with some proxying in between which makes this - // possible, but in tests and fuzzing, this should be a panic. - #[cfg(debug_assertions)] - panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress - }, - Some(monitor_state) => { - let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); - - // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we - // have well-ordered updates from the users' point of view. See the - // `pending_monitor_updates` docs for more. - let mut pending_monitor_updates = - monitor_state.pending_monitor_updates.lock().unwrap(); - let update_res = monitor.update_monitor( - update, - &self.broadcaster, - &self.fee_estimator, - &self.logger, - ); - - let update_id = update.update_id; - let persist_res = if update_res.is_err() { - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. Therefore, we should persist the updated monitor despite the error. - // We don't want to persist a `monitor_update` which results in a failure to apply later - // while reading `channel_monitor` with updates from storage. Instead, we should persist - // the entire `channel_monitor` here. - log_warn!(logger, "Failed to update ChannelMonitor. 
Going ahead and persisting the entire ChannelMonitor"); - self.persister.update_persisted_channel( - monitor.persistence_key(), - None, - monitor, - ) - } else { - self.persister.update_persisted_channel( - monitor.persistence_key(), - Some(update), - monitor, - ) - }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} in progress", - update_id, - ); - }, - ChannelMonitorUpdateStatus::Completed => { - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} completed", - update_id, - ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - - // We may need to start monitoring for any alternative funding transactions. - if let Some(ref chain_source) = self.chain_source { - for (funding_outpoint, funding_script) in - update.internal_renegotiated_funding_data() - { - log_trace!( - logger, - "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", - funding_outpoint - ); - chain_source.register_tx(&funding_outpoint.txid, &funding_script); - chain_source.register_output(WatchedOutput { - block_hash: None, - outpoint: funding_outpoint, - script_pubkey: funding_script, - }); - } - } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } - }, + if self.deferred { + let mut pending_ops = self.pending_ops.lock().unwrap(); + pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); + ChannelMonitorUpdateStatus::InProgress + } else { + self.update_channel_internal(channel_id, update) } } @@ -1550,12 +1730,16 @@ where #[cfg(test)] mod tests { + use super::ChainMonitor; use crate::chain::channelmonitor::ANTI_REORG_DELAY; use crate::chain::{ChannelMonitorUpdateStatus, Watch}; use crate::events::{ClosureReason, Event}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent}; + use crate::ln::types::ChannelId; use crate::{expect_payment_path_successful, get_event_msg}; + use bitcoin::hash_types::Txid; + use bitcoin::secp256k1::PublicKey; const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5; @@ -1808,4 +1992,339 @@ mod tests { }) .is_err()); } + + // ==================== Deferred mode tests ==================== + + use crate::chain::channelmonitor::ChannelMonitorUpdate; + use crate::chain::transaction::OutPoint; + use crate::ln::chan_utils::{ + ChannelTransactionParameters, CounterpartyChannelTransactionParameters, + HolderCommitmentTransaction, + }; + use crate::ln::channel_keys::{DelayedPaymentBasepoint, HtlcBasepoint, RevocationBasepoint}; + use crate::ln::script::ShutdownScript; + use crate::sign::{ChannelSigner, InMemorySigner, NodeSigner}; + use crate::types::features::ChannelTypeFeatures; + use crate::util::dyn_signer::DynSigner; + use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{ + TestBroadcaster, TestChainSource, TestFeeEstimator, TestKeysInterface, TestLogger, + TestPersister, 
+ }; + use bitcoin::hashes::Hash; + use bitcoin::script::ScriptBuf; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; + use bitcoin::Network; + + /// Concrete `ChainMonitor` type wired to the standard test utilities in deferred mode. + type TestDeferredChainMonitor<'a> = ChainMonitor< + TestChannelSigner, + &'a TestChainSource, + &'a TestBroadcaster, + &'a TestFeeEstimator, + &'a TestLogger, + &'a TestPersister, + &'a TestKeysInterface, + >; + + /// Creates a minimal `ChannelMonitorUpdate` with no actual update steps. + fn dummy_update(update_id: u64, channel_id: ChannelId) -> ChannelMonitorUpdate { + ChannelMonitorUpdate { updates: vec![], update_id, channel_id: Some(channel_id) } + } + + /// Creates a minimal `ChannelMonitor` for the given `channel_id`. + fn dummy_monitor( + channel_id: ChannelId, + ) -> crate::chain::channelmonitor::ChannelMonitor { + let secp_ctx = Secp256k1::new(); + let dummy_key = + PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let keys = InMemorySigner::new( + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + true, + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + [41; 32], + [0; 32], + [0; 32], + ); + let counterparty_pubkeys = crate::ln::chan_utils::ChannelPublicKeys { + funding_pubkey: dummy_key, + revocation_basepoint: RevocationBasepoint::from(dummy_key), + payment_point: dummy_key, + delayed_payment_basepoint: DelayedPaymentBasepoint::from(dummy_key), + htlc_basepoint: HtlcBasepoint::from(dummy_key), + }; + let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys(&secp_ctx), + holder_selected_contest_delay: 66, + is_outbound_from_holder: true, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: counterparty_pubkeys, + selected_contest_delay: 67, + }), + funding_outpoint: Some(funding_outpoint), + splice_parent_funding_txid: None, + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_value_satoshis: 0, + }; + let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(dummy_key); + let best_block = crate::chain::BestBlock::from_network(Network::Testnet); + let signer = TestChannelSigner::new(DynSigner::new(keys)); + crate::chain::channelmonitor::ChannelMonitor::new( + secp_ctx, + signer, + Some(shutdown_script.into_inner()), + 0, + &ScriptBuf::new(), + &channel_parameters, + true, + 0, + HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), + best_block, + dummy_key, + channel_id, + false, + ) + } + + fn create_deferred_chain_monitor<'a>( + chain_source: &'a TestChainSource, broadcaster: &'a TestBroadcaster, + logger: &'a TestLogger, fee_est: &'a TestFeeEstimator, persister: &'a TestPersister, + keys: &'a TestKeysInterface, + ) -> TestDeferredChainMonitor<'a> { + ChainMonitor::new( + Some(chain_source), + broadcaster, + logger, + fee_est, + persister, + keys, + keys.get_peer_storage_key(), + true, + ) + } + + /// Tests queueing and flushing of both `watch_channel` and `update_channel` operations + /// when `ChainMonitor` is in deferred mode, verifying that operations flow through to + /// `Persist` and that `channel_monitor_updated` is called on `Completed` status. 
+ #[test] + fn test_queue_and_flush() { + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_est = TestFeeEstimator::new(253); + let logger = TestLogger::new(); + let persister = TestPersister::new(); + let chain_source = TestChainSource::new(Network::Testnet); + let keys = TestKeysInterface::new(&[0; 32], Network::Testnet); + let deferred = create_deferred_chain_monitor( + &chain_source, + &broadcaster, + &logger, + &fee_est, + &persister, + &keys, + ); + + // Queue starts empty. + assert_eq!(deferred.pending_operation_count(), 0); + + // Queue a watch_channel, verifying InProgress status. + let chan = ChannelId::from_bytes([1u8; 32]); + let status = Watch::watch_channel(&deferred, chan, dummy_monitor(chan)); + assert_eq!(status, Ok(ChannelMonitorUpdateStatus::InProgress)); + assert_eq!(deferred.pending_operation_count(), 1); + + // Nothing persisted yet — operations are only queued. + assert!(persister.new_channel_persistences.lock().unwrap().is_empty()); + + // Queue two updates after the watch. Update IDs must be sequential (starting + // from 1 since the initial monitor has update_id 0). + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(1, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(2, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!(deferred.pending_operation_count(), 3); + + // Flush 2 of 3: persist_new_channel returns Completed (triggers + // channel_monitor_updated), update_persisted_channel returns InProgress (does not). + persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + deferred.flush(2, &&logger); + + assert_eq!(deferred.pending_operation_count(), 1); + + // persist_new_channel was called for the watch. + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), 1); + + // Because persist_new_channel returned Completed, channel_monitor_updated was called, + // so update_id 0 should no longer be pending. + let pending = deferred.list_pending_monitor_updates(); + let pending_for_chan = pending.get(&chan).unwrap(); + assert!(!pending_for_chan.contains(&0)); + + // update_persisted_channel was called for update_id 1, and because it returned + // InProgress, update_id 1 remains pending. + let monitor_name = deferred.get_monitor(chan).unwrap().persistence_key(); + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&1)); + assert!(pending_for_chan.contains(&1)); + + // Flush remaining: update_persisted_channel returns Completed (default), triggers + // channel_monitor_updated. + deferred.flush(1, &&logger); + assert_eq!(deferred.pending_operation_count(), 0); + + // update_persisted_channel was called for update_id 2. + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&2)); + + // update_id 1 is still pending from the InProgress earlier, but update_id 2 was + // completed in this flush so it is no longer pending. + let pending = deferred.list_pending_monitor_updates(); + let pending_for_chan = pending.get(&chan).unwrap(); + assert!(pending_for_chan.contains(&1)); + assert!(!pending_for_chan.contains(&2)); + + // Flushing an empty queue is a no-op. 
+ let persist_count_before = persister.new_channel_persistences.lock().unwrap().len(); + deferred.flush(5, &&logger); + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), persist_count_before); + } + + /// Tests that `ChainMonitor` in deferred mode properly defers `watch_channel` and + /// `update_channel` operations until `flush()` is called, using standard test + /// infrastructure and a complete channel open + payment flow. + #[test] + fn test_deferred_monitor_payment() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let node_a_id = nodes[0].node.get_our_node_id(); + let node_b_id = nodes[1].node.get_our_node_id(); + let chain_monitor_a = &nodes[0].chain_monitor.chain_monitor; + let chain_monitor_b = &nodes[1].chain_monitor.chain_monitor; + + // ===== Open unannounced channel with manual funding to test deferred watch_channel ===== + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + + let mut no_announce_cfg = nodes[0].node.get_current_config(); + no_announce_cfg.channel_handshake_config.announce_for_forwarding = false; + nodes[0] + .node + .create_channel(node_b_id, 100_000, 10_001, 42, None, Some(no_announce_cfg)) + .unwrap(); + let open_channel = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, node_b_id); + nodes[1].node.handle_open_channel(node_a_id, &open_channel); + let accept_channel = + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, node_a_id); + nodes[0].node.handle_accept_channel(node_b_id, &accept_channel); + let (temporary_channel_id, tx, _funding_outpoint) = + create_funding_transaction(&nodes[0], &node_b_id, 100_000, 42); + + nodes[0] + .node + .funding_transaction_generated(temporary_channel_id, node_b_id, tx.clone()) + .unwrap(); + check_added_monitors(&nodes[0], 0); + + let funding_created = + get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_b_id); + nodes[1].node.handle_funding_created(node_a_id, &funding_created); + check_added_monitors(&nodes[1], 1); + + // Node 1's watch_channel should be queued (deferred). + assert_eq!( + chain_monitor_b.pending_operation_count(), + 1, + "node 1 watch_channel should be queued" + ); + + // Flush so node 1's monitor is persisted and the ChannelManager can advance. + nodes[1].chain_monitor.flush_all(); + assert_eq!(chain_monitor_b.pending_operation_count(), 0); + + expect_channel_pending_event(&nodes[1], &node_a_id); + let funding_signed = + get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_a_id); + nodes[0].node.handle_funding_signed(node_b_id, &funding_signed); + check_added_monitors(&nodes[0], 1); + + // Node 0's watch_channel should be queued (deferred). + assert_eq!( + chain_monitor_a.pending_operation_count(), + 1, + "node 0 watch_channel should be queued" + ); + assert!( + chain_monitor_a.list_monitors().is_empty(), + "Monitor should not be in ChainMonitor before flush" + ); + + // Flush node 0's watch_channel. + nodes[0].chain_monitor.flush_all(); + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + assert_eq!( + chain_monitor_a.list_monitors().len(), + 1, + "Monitor should be in ChainMonitor after flush" + ); + + expect_channel_pending_event(&nodes[0], &node_b_id); + + // Drain any leftover messages (e.g. channel_ready, tx_broadcast). 
+ nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear(); + let _ = nodes[0].node.get_and_clear_pending_msg_events(); + + // ===== Confirm funding and exchange channel_ready ===== + let conf_height = + core::cmp::max(nodes[0].best_block_info().1 + 1, nodes[1].best_block_info().1 + 1); + confirm_transaction_at(&nodes[0], &tx, conf_height); + connect_blocks(&nodes[0], CHAN_CONFIRM_DEPTH - 1); + confirm_transaction_at(&nodes[1], &tx, conf_height); + connect_blocks(&nodes[1], CHAN_CONFIRM_DEPTH - 1); + + let as_channel_ready = + get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_b_id); + let bs_channel_ready = + get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_a_id); + nodes[0].node.handle_channel_ready(node_b_id, &bs_channel_ready); + expect_channel_ready_event(&nodes[0], &node_b_id); + let _as_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_b_id); + nodes[1].node.handle_channel_ready(node_a_id, &as_channel_ready); + expect_channel_ready_event(&nodes[1], &node_a_id); + let _bs_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_a_id); + + assert!(!nodes[0].node.list_usable_channels().is_empty(), "Channel should be usable"); + + // ===== Send and claim a payment using standard helpers ===== + // The auto-flush in TestChainMonitor::release_pending_monitor_events ensures + // deferred update_channel operations are flushed before the ChannelManager + // processes monitor completion events. + let (preimage, _hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 10_000); + claim_payment(&nodes[0], &[&nodes[1]], preimage); + + // Both monitors should still be present after the payment flow. + assert_eq!(chain_monitor_a.list_monitors().len(), 1); + assert_eq!(chain_monitor_b.list_monitors().len(), 1); + } } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 3fa2073d5ba..6036d493752 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4916,6 +4916,7 @@ fn native_async_persist() { native_async_persister, Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, ); // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed` diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index bbede9589db..1ee4aee7ff3 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -20600,7 +20600,7 @@ pub mod bench { let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42, true); - let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key(), false); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &message_router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), @@ -20610,7 +20610,7 @@ pub mod bench { let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42, true); - let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, 
keys_manager_b.get_peer_storage_key()); + let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key(), false); let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &message_router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index e8965752331..3645de5d86f 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -4447,6 +4447,26 @@ fn create_node_cfgs_internal<'a, F>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, ) -> Vec> +where + F: Fn( + Arc>, + &'a TestKeysInterface, + ) -> test_utils::TestMessageRouter<'a>, +{ + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + message_router_constructor, + false, + ) +} + +fn create_node_cfgs_internal_deferred<'a, F>( + node_count: usize, chanmon_cfgs: &'a Vec, + persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, + deferred: bool, +) -> Vec> where F: Fn( Arc>, @@ -4458,14 +4478,25 @@ where for i in 0..node_count { let cfg = &chanmon_cfgs[i]; let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &cfg.logger)); - let chain_monitor = test_utils::TestChainMonitor::new( - Some(&cfg.chain_source), - &cfg.tx_broadcaster, - &cfg.logger, - &cfg.fee_estimator, - persisters[i], - &cfg.keys_manager, - ); + let chain_monitor = if deferred { + test_utils::TestChainMonitor::new_deferred( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + } else { + test_utils::TestChainMonitor::new( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + }; let seed = [i as u8; 32]; nodes.push(NodeCfg { @@ -4505,6 +4536,19 @@ pub fn create_node_cfgs<'a>( ) } +pub fn create_node_cfgs_deferred<'a>( + node_count: usize, chanmon_cfgs: &'a Vec, +) -> Vec> { + let persisters = chanmon_cfgs.iter().map(|c| &c.persister).collect(); + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + test_utils::TestMessageRouter::new_default, + true, + ) +} + pub fn create_node_cfgs_with_persisters<'a>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..c993c0c4978 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -509,6 +509,7 @@ pub struct TestChainMonitor<'a> { &'a TestKeysInterface, >, pub keys_manager: &'a TestKeysInterface, + pub logger: &'a TestLogger, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given /// boolean. 
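The background-processor hunks above snapshot `pending_operation_count()` before persisting the `ChannelManager` and then call `flush()` with that snapshot, so monitor writes queued after the persist are deliberately left for the next iteration. Below is a minimal standalone sketch of that count-bounded drain using only `std` types; the `DeferredQueue` and `PendingOp` names are illustrative stand-ins, not part of this patch or of the LDK API.

use std::collections::VecDeque;
use std::sync::Mutex;

/// Stand-in for a queued monitor write (the real patch uses `PendingMonitorOp`).
struct PendingOp(u64);

/// Toy queue illustrating the count-bounded flush: the caller snapshots the queue
/// length before persisting and only drains that many entries afterward, so writes
/// queued later wait for the next iteration.
struct DeferredQueue {
    pending_ops: Mutex<VecDeque<PendingOp>>,
}

impl DeferredQueue {
    fn pending_operation_count(&self) -> usize {
        self.pending_ops.lock().unwrap().len()
    }

    fn flush(&self, count: usize) {
        for _ in 0..count {
            // Re-take the lock each iteration so concurrent producers can keep
            // queueing while we flush.
            let op = match self.pending_ops.lock().unwrap().pop_front() {
                Some(op) => op,
                None => return, // queue drained early
            };
            // The real code forwards to watch_channel_internal /
            // update_channel_internal and then calls channel_monitor_updated.
            println!("persisting op {}", op.0);
        }
    }
}

fn main() {
    let queue = DeferredQueue { pending_ops: Mutex::new(VecDeque::new()) };
    queue.pending_ops.lock().unwrap().extend([PendingOp(0), PendingOp(1)]);

    // Snapshot before the (elided) ChannelManager persist...
    let to_flush = queue.pending_operation_count();
    queue.pending_ops.lock().unwrap().push_back(PendingOp(2)); // arrives "during" persist
    // ...then flush only the snapshotted count; op 2 stays queued.
    queue.flush(to_flush);
    assert_eq!(queue.pending_operation_count(), 1);
}

Releasing and re-taking the queue lock per iteration mirrors the `Update` branch of `flush`, which drops `pending_ops` before `update_channel_internal` so that concurrent `update_channel` queueing is never blocked on persistence.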
@@ -524,6 +525,38 @@ impl<'a> TestChainMonitor<'a> { chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + false, + ) + } + + pub fn new_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + true, + ) + } + + fn with_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, deferred: bool, ) -> Self { Self { added_monitors: Mutex::new(Vec::new()), @@ -537,8 +570,10 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), + deferred, ), keys_manager, + logger, expect_channel_force_closed: Mutex::new(None), expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] @@ -546,6 +581,15 @@ impl<'a> TestChainMonitor<'a> { } } + pub fn pending_operation_count(&self) -> usize { + self.chain_monitor.pending_operation_count() + } + + pub fn flush_all(&self) { + let count = self.pending_operation_count(); + self.chain_monitor.flush(count, &self.logger); + } + pub fn complete_sole_pending_chan_update(&self, channel_id: &ChannelId) { let (_, latest_update) = self.latest_monitor_update_id.lock().unwrap().get(channel_id).unwrap().clone(); @@ -671,6 +715,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + // Auto-flush pending operations so that the ChannelManager can pick up monitor + // completion events. This is a no-op when not in deferred mode. It ensures standard + // test helpers (route_payment, etc.) work with deferred chain monitors. + self.flush_all(); return self.chain_monitor.release_pending_monitor_events(); } } @@ -830,6 +878,8 @@ pub struct TestPersister { /// The queue of update statuses we'll return. If none are queued, ::Completed will always be /// returned. pub update_rets: Mutex>, + /// When we get a persist_new_channel call, we push the monitor name here. + pub new_channel_persistences: Mutex>, /// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the /// [`ChannelMonitor::get_latest_update_id`] here. pub offchain_monitor_updates: Mutex>>, @@ -840,9 +890,15 @@ pub struct TestPersister { impl TestPersister { pub fn new() -> Self { let update_rets = Mutex::new(VecDeque::new()); + let new_channel_persistences = Mutex::new(Vec::new()); let offchain_monitor_updates = Mutex::new(new_hash_map()); let chain_sync_monitor_persistences = Mutex::new(VecDeque::new()); - Self { update_rets, offchain_monitor_updates, chain_sync_monitor_persistences } + Self { + update_rets, + new_channel_persistences, + offchain_monitor_updates, + chain_sync_monitor_persistences, + } } /// Queue an update status to return. 
@@ -852,8 +908,9 @@ impl TestPersister { } impl Persist for TestPersister { fn persist_new_channel( - &self, _monitor_name: MonitorName, _data: &ChannelMonitor, + &self, monitor_name: MonitorName, _data: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.new_channel_persistences.lock().unwrap().push(monitor_name); if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() { return update_ret; }