100 changes: 91 additions & 9 deletions lightning-background-processor/src/lib.rs
@@ -103,7 +103,7 @@ use alloc::vec::Vec;
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
/// * Calling [`ChannelManager::timer_tick_occurred`], [`lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims`]
/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
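A minimal sketch of the interval-driven loop described above, with `println!` calls standing in for the real `timer_tick_occurred` and `rebroadcast_pending_claims` calls; the constant names and values here are illustrative assumptions, not the background processor's actual timers:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

// Illustrative values; the real background processor defines its own timer constants.
const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
const REBROADCAST_TIMER: Duration = Duration::from_secs(30);

fn main() {
    let mut last_freshness_call = Instant::now();
    let mut last_rebroadcast_call = Instant::now();
    // In the real processor this loop runs until shutdown; a few rounds suffice here.
    for _ in 0..5 {
        sleep(Duration::from_millis(100));
        if last_freshness_call.elapsed() > FRESHNESS_TIMER {
            // Stand-in for channel_manager.timer_tick_occurred().
            println!("channel manager timer tick");
            last_freshness_call = Instant::now();
        }
        if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
            // Stand-in for chain_monitor.rebroadcast_pending_claims().
            println!("rebroadcast pending claims");
            last_rebroadcast_call = Instant::now();
        }
    }
}
```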
@@ -824,7 +824,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -963,7 +963,9 @@ pub async fn process_events_async<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>
+ chain::Watch<<CM::Target as AChannelManager>::Signer>
+ lightning::events::EventsProvider,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
@@ -1120,6 +1122,11 @@ where

let mut futures = Joiner::new();

// Capture the number of pending monitor writes before persisting the channel manager.
// We'll only flush this many writes after the manager is persisted, to avoid flushing
// monitor updates that arrived after the manager state was captured.
let pending_monitor_writes = chain_monitor.pending_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");

@@ -1317,6 +1324,15 @@ where
res?;
}

// Flush the monitor writes that were pending before we persisted the channel manager.
// Any writes that arrived after are left in the queue for the next iteration. There's
// no need to "chase the tail" by processing new updates that arrive during flushing -
// they'll be handled in the next round.
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes, &logger);
}

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
@@ -1381,6 +1397,14 @@ where
channel_manager.get_cm().encode(),
)
.await?;

// Flush all pending monitor writes after final channel manager persistence.
let pending_monitor_writes = chain_monitor.pending_operation_count();
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes, &logger);
}

if let Some(ref scorer) = scorer {
kv_store
.write(
@@ -1461,7 +1485,9 @@ pub async fn process_events_async_with_kv_store_sync<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>
+ chain::Watch<<CM::Target as AChannelManager>::Signer>
+ lightning::events::EventsProvider,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
@@ -1570,8 +1596,11 @@ impl BackgroundProcessor {
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
) -> Self
where
L::Target: 'static + Logger,
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
L::Target: 'static + Logger + Sized,
M::Target: 'static
+ AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>
+ chain::Watch<<CM::Target as AChannelManager>::Signer>
+ lightning::events::EventsProvider,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
@@ -1684,6 +1713,10 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}

// Capture the number of pending monitor writes before persisting the channel manager.
let pending_monitor_writes = chain_monitor.pending_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
@@ -1695,6 +1728,14 @@ impl BackgroundProcessor {
log_trace!(logger, "Done persisting ChannelManager.");
}

// Flush the monitor writes that were pending before we persisted the channel manager.
// There's no need to "chase the tail" by processing new updates that arrive during
// flushing - they'll be handled in the next round.
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes, &logger);
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1815,6 +1856,18 @@ impl BackgroundProcessor {
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;

// Flush all pending monitor writes after final channel manager persistence.
let pending_monitor_writes = chain_monitor.pending_operation_count();
if pending_monitor_writes > 0 {
log_trace!(
logger,
"Flushing {} monitor writes on shutdown",
pending_monitor_writes
);
chain_monitor.flush(pending_monitor_writes, &logger);
}

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1949,10 @@ mod tests {
use bitcoin::transaction::{Transaction, TxOut};
use bitcoin::{Amount, ScriptBuf, Txid};
use core::sync::atomic::{AtomicBool, Ordering};
use lightning::chain::chainmonitor::AChainMonitor;
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
use lightning::chain::{deferred, BestBlock, Confirm, Filter};
use lightning::events::{Event, PathFailure, ReplayEvent};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
@@ -2008,7 +2062,7 @@ mod tests {
Arc<test_utils::TestLogger>,
>;

type ChainMonitor = chainmonitor::ChainMonitor<
type ChainMonitor = deferred::DeferredChainMonitor<
InMemorySigner,
Arc<test_utils::TestChainSource>,
Arc<test_utils::TestBroadcaster>,
@@ -2436,7 +2490,7 @@ mod tests {
let now = Duration::from_secs(genesis_block.header.time as u64);
let keys_manager =
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
@@ -2580,19 +2634,31 @@ mod tests {
tx.clone(),
)
.unwrap();
// Flush deferred monitor operations so messages aren't held back
$node_a
.chain_monitor
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
let msg_a = get_event_msg!(
$node_a,
MessageSendEvent::SendFundingCreated,
$node_b.node.get_our_node_id()
);
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
// Flush node_b's monitor so it releases the FundingSigned message
$node_b
.chain_monitor
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
get_event!($node_b, Event::ChannelPending);
let msg_b = get_event_msg!(
$node_b,
MessageSendEvent::SendFundingSigned,
$node_a.node.get_our_node_id()
);
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
// Flush node_a's monitor for the final update
$node_a
.chain_monitor
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
get_event!($node_a, Event::ChannelPending);
tx
}};
@@ -3039,11 +3105,23 @@ mod tests {
.node
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
.unwrap();
// Flush node_0's deferred monitor operations so the FundingCreated message is released
nodes[0]
.chain_monitor
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
// Flush node_1's deferred monitor operations so events and FundingSigned are released
nodes[1]
.chain_monitor
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
get_event!(nodes[1], Event::ChannelPending);
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
// Flush node_0's monitor for the funding_signed update
nodes[0]
.chain_monitor
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
channel_pending_recv
.recv_timeout(EVENT_DEADLINE)
.expect("ChannelPending not handled within deadline");
@@ -3104,6 +3182,10 @@ mod tests {
error_message.to_string(),
)
.unwrap();
// Flush the monitor update triggered by force close so the commitment tx is broadcast
nodes[0]
.chain_monitor
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
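Every flush added in this file follows the same ordering: capture the pending-write count, persist the `ChannelManager`, then flush only that many monitor writes so anything queued later waits for the next round. A self-contained sketch of that invariant, using a toy queue in place of the real `DeferredChainMonitor`:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Toy stand-in for a deferred monitor-write queue; not the real LDK type.
struct DeferredQueue {
    pending: Mutex<VecDeque<String>>,
}

impl DeferredQueue {
    fn pending_operation_count(&self) -> usize {
        self.pending.lock().unwrap().len()
    }

    /// Flush at most `count` queued writes, oldest first, leaving newer ones queued.
    fn flush(&self, count: usize) {
        let mut pending = self.pending.lock().unwrap();
        for _ in 0..count.min(pending.len()) {
            let write = pending.pop_front().unwrap();
            println!("persisting {write}");
        }
    }
}

fn main() {
    let queue = DeferredQueue { pending: Mutex::new(VecDeque::new()) };
    queue.pending.lock().unwrap().push_back(String::from("monitor_update_1"));
    queue.pending.lock().unwrap().push_back(String::from("monitor_update_2"));

    // 1) Capture the count *before* persisting the manager snapshot.
    let pending_monitor_writes = queue.pending_operation_count();

    // 2) Persist the ChannelManager here (omitted in this sketch). Any monitor write
    //    that lands after this point is left for the next round; there's no need to
    //    "chase the tail".
    queue.pending.lock().unwrap().push_back(String::from("late_monitor_update"));

    // 3) Flush only the captured count; the late update waits for the next iteration.
    if pending_monitor_writes > 0 {
        queue.flush(pending_monitor_writes);
    }
    assert_eq!(queue.pending_operation_count(), 1);
}
```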

31 changes: 30 additions & 1 deletion lightning/src/chain/chainmonitor.rs
@@ -1488,10 +1488,12 @@ where
}
}

/// A trivial trait which describes any [`ChainMonitor`].
/// A trivial trait which describes any [`ChainMonitor`] or [`DeferredChainMonitor`].
///
/// This is not exported to bindings users as general cover traits aren't useful in other
/// languages.
///
/// [`DeferredChainMonitor`]: crate::chain::deferred::DeferredChainMonitor
pub trait AChainMonitor {
/// A type implementing [`EcdsaChannelSigner`].
type Signer: EcdsaChannelSigner + Sized;
@@ -1521,6 +1523,24 @@ pub trait AChainMonitor {
Self::Persister,
Self::EntropySource,
>;

/// Returns the number of pending monitor operations queued for later execution.
///
/// For monitors that process operations immediately (like [`ChainMonitor`]), this
/// always returns 0.
fn pending_operation_count(&self) -> usize;

/// Flushes pending monitor operations.
///
/// # Arguments
///
/// * `count` - The maximum number of operations to flush. If `count` is greater than
/// the number of pending operations, all pending operations are flushed.
/// * `logger` - Logger for error messages during flush operations.
///
/// For monitors that process operations immediately (like [`ChainMonitor`]), this
/// is a no-op.
fn flush(&self, count: usize, logger: &Self::Logger);
}

impl<
@@ -1546,6 +1566,15 @@
fn get_cm(&self) -> &ChainMonitor<ChannelSigner, C, T, F, L, P, ES> {
self
}

fn pending_operation_count(&self) -> usize {
// ChainMonitor processes operations immediately, so there are never any pending.
0
}

fn flush(&self, _count: usize, _logger: &L) {
// No-op: ChainMonitor processes operations immediately.
}
}

#[cfg(test)]
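For code that drives its own persistence loop rather than using `BackgroundProcessor`, the two new methods are meant to be used as a pair around manager persistence. A reduced sketch, with a hypothetical `MonitorFlush` trait standing in for `AChainMonitor` (the real trait also has associated types, `get_cm`, and a logger argument on `flush`):

```rust
/// Hypothetical two-method trait mirroring the additions to `AChainMonitor` above.
trait MonitorFlush {
    fn pending_operation_count(&self) -> usize;
    fn flush(&self, count: usize);
}

/// An immediate-mode monitor (like `ChainMonitor`) never queues anything, so the
/// count is always 0 and `flush` is a no-op, matching the impl in this diff.
struct ImmediateMonitor;

impl MonitorFlush for ImmediateMonitor {
    fn pending_operation_count(&self) -> usize {
        0
    }
    fn flush(&self, _count: usize) {}
}

/// One persistence round as the background processor performs it: capture the count,
/// persist the manager, then flush only what was captured.
fn persist_round<M: MonitorFlush>(monitor: &M) {
    let pending_monitor_writes = monitor.pending_operation_count();
    // ... write the ChannelManager snapshot to the KVStore here (omitted) ...
    if pending_monitor_writes > 0 {
        monitor.flush(pending_monitor_writes);
    }
}

fn main() {
    // With an immediate monitor the guard is simply never taken.
    persist_round(&ImmediateMonitor);
}
```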