From 6190ffba8a0546e2b50ab13be2e80cd813a2a1ed Mon Sep 17 00:00:00 2001 From: benthecarman Date: Mon, 26 Jan 2026 08:31:17 -0600 Subject: [PATCH 1/3] Add PaginatedKVStore traits upstreamed from ldk-server Allows for a paginated KV store for more efficient listing of keys so you don't need to fetch all at once. Uses monotonic counter or timestamp to track the order of keys and allow for pagination. The traits are largely just copy-pasted from ldk-server. Adds some basic tests that were generated using claude code. --- lightning/src/util/persist.rs | 265 ++++++++++++++++++++++++++++++- lightning/src/util/test_utils.rs | 119 +++++++++++++- 2 files changed, 382 insertions(+), 2 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 440d1d31331..12a0e4859c2 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -374,6 +374,195 @@ where } } +/// An opaque token used for paginated listing operations. +/// +/// This token should be treated as an opaque value by callers. Pass the token returned from +/// one `list_paginated` call to the next call to continue pagination. The internal format +/// is implementation-defined and may change between versions. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PageToken(pub String); + +/// Represents the response from a paginated `list` operation. +/// +/// Contains the list of keys and a token for retrieving the next page of results. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PaginatedListResponse { + /// A vector of keys, ordered from most recently created to least recently created. + pub keys: Vec, + + /// A token that can be passed to the next call to continue pagination. + /// + /// Is `None` if there are no more pages to retrieve. + pub next_page_token: Option, +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStoreSync`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For an asynchronous version of this trait, see [`PaginatedKVStore`]. 
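+///
+/// # Example
+///
+/// An illustrative sketch of draining every page from some `store` implementing this trait
+/// (the names here are placeholders, not part of the API):
+///
+/// ```ignore
+/// let mut all_keys = Vec::new();
+/// let mut page_token = None;
+/// loop {
+///     let resp = store.list_paginated("primary", "secondary", page_token)?;
+///     all_keys.extend(resp.keys);
+///     match resp.next_page_token {
+///         Some(token) => page_token = Some(token),
+///         None => break,
+///     }
+/// }
+/// ```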
+pub trait PaginatedKVStoreSync: KVStoreSync {
+ /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in
+ /// `primary_namespace`, ordered from most recently created to least recently created.
+ ///
+ /// Implementations must return keys in reverse creation order (newest first). How creation
+ /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an
+ /// incrementing ID, or another mechanism). Creation order (not last-updated order) is used
+ /// to prevent race conditions during pagination: if keys were ordered by update time, a key
+ /// updated mid-pagination could shift position, causing it to be skipped or returned twice
+ /// across pages.
+ ///
+ /// If `page_token` is provided, listing continues from where the previous page left off.
+ /// If `None`, listing starts from the most recently created entry. The `next_page_token`
+ /// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch
+ /// the next page.
+ ///
+ /// Implementations must generate a [`PageToken`] that encodes enough information to resume
+ /// listing from the correct position. The token should encode the creation timestamp (or
+ /// sequence number) and key name of the last returned entry. Tokens must remain valid across
+ /// multiple calls within a reasonable timeframe. If the entry referenced by a token has been
+ /// deleted, implementations should resume from the next valid position rather than failing.
+ /// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should
+ /// not be used across different namespace pairs.
+ ///
+ /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if
+ /// there are no more keys to return.
+ fn list_paginated(
+ &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+ ) -> Result<PaginatedListResponse, io::Error>;
+}
+
+/// A wrapper around a [`PaginatedKVStoreSync`] that implements the [`PaginatedKVStore`] trait.
+/// It is not necessary to use this type directly.
+#[derive(Clone)]
+pub struct PaginatedKVStoreSyncWrapper<K: Deref>(pub K)
+where
+ K::Target: PaginatedKVStoreSync;
+
+/// This is not exported to bindings users as async is only supported in Rust.
+impl<K: Deref> KVStore for PaginatedKVStoreSyncWrapper<K>
+where
+ K::Target: PaginatedKVStoreSync,
+{
+ fn read(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+ ) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + MaybeSend {
+ let res = self.0.read(primary_namespace, secondary_namespace, key);
+
+ async move { res }
+ }
+
+ fn write(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+ ) -> impl Future<Output = Result<(), io::Error>> + 'static + MaybeSend {
+ let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
+
+ async move { res }
+ }
+
+ fn remove(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+ ) -> impl Future<Output = Result<(), io::Error>> + 'static + MaybeSend {
+ let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
+
+ async move { res }
+ }
+
+ fn list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + MaybeSend {
+ let res = self.0.list(primary_namespace, secondary_namespace);
+
+ async move { res }
+ }
+}
+
+/// This is not exported to bindings users as async is only supported in Rust.
+impl<K: Deref> PaginatedKVStore for PaginatedKVStoreSyncWrapper<K>
+where
+ K::Target: PaginatedKVStoreSync,
+{
+ fn list_paginated(
+ &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+ ) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + MaybeSend {
+ let res = self.0.list_paginated(primary_namespace, secondary_namespace, page_token);
+
+ async move { res }
+ }
+}
+
+/// Provides an interface that allows storage and retrieval of persisted values that are associated
+/// with given keys, with support for pagination.
+///
+/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s
+/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different
+/// ways, as long as per-namespace key uniqueness is asserted.
+///
+/// Keys and namespaces are required to be valid ASCII strings in the range of
+/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
+/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if
+/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns
+/// should always be separated by primary namespace first, before secondary namespaces are used.
+/// While the number of primary namespaces will be relatively small and determined at compile time,
+/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness
+/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys
+/// and equally named primary or secondary namespaces must be avoided.
+///
+/// **Note:** This trait extends the functionality of [`KVStore`] by adding support for
+/// paginated listing of keys in creation order. This is useful when dealing with a large number
+/// of keys that cannot be efficiently retrieved all at once.
+///
+/// For a synchronous version of this trait, see [`PaginatedKVStoreSync`].
+///
+/// This is not exported to bindings users as async is only supported in Rust.
+pub trait PaginatedKVStore: KVStore {
+ /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in
+ /// `primary_namespace`, ordered from most recently created to least recently created.
+ ///
+ /// Implementations must return keys in reverse creation order (newest first). How creation
+ /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an
+ /// incrementing ID, or another mechanism). Creation order (not last-updated order) is used
+ /// to prevent race conditions during pagination: if keys were ordered by update time, a key
+ /// updated mid-pagination could shift position, causing it to be skipped or returned twice
+ /// across pages.
+ ///
+ /// If `page_token` is provided, listing continues from where the previous page left off.
+ /// If `None`, listing starts from the most recently created entry. The `next_page_token`
+ /// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch
+ /// the next page.
+ ///
+ /// Implementations must generate a [`PageToken`] that encodes enough information to resume
+ /// listing from the correct position. The token should encode the creation timestamp (or
+ /// sequence number) and key name of the last returned entry. Tokens must remain valid across
+ /// multiple calls within a reasonable timeframe. If the entry referenced by a token has been
+ /// deleted, implementations should resume from the next valid position rather than failing.
+ /// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should
+ /// not be used across different namespace pairs.
+ ///
+ /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if
+ /// there are no more keys to return.
+ fn list_paginated(
+ &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+ ) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + MaybeSend;
+}
+
 /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
 /// data migration.
 pub trait MigratableKVStore: KVStoreSync {
@@ -1546,7 +1735,7 @@ mod tests {
 use crate::ln::msgs::BaseMessageHandler;
 use crate::sync::Arc;
 use crate::util::test_channel_signer::TestChannelSigner;
- use crate::util::test_utils::{self, TestStore};
+ use crate::util::test_utils::{self, TestPaginatedStore, TestStore};

 use bitcoin::hashes::hex::FromHex;
 use core::cmp;
@@ -1956,4 +2145,78 @@ mod tests {
 let store: Arc = Arc::new(TestStore::new(false));
 assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store)));
 }
+
+ #[test]
+ fn paginated_store_basic_operations() {
+ let store = TestPaginatedStore::new(10);
+
+ // Write some data
+ store.write("ns1", "ns2", "key1", vec![1, 2, 3]).unwrap();
+ store.write("ns1", "ns2", "key2", vec![4, 5, 6]).unwrap();
+
+ // Read it back
+ assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key1").unwrap(), vec![1, 2, 3]);
+ assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key2").unwrap(), vec![4, 5, 6]);
+
+ // List should return keys in descending order
+ let response = store.list_paginated("ns1", "ns2", None).unwrap();
+ assert_eq!(response.keys, vec!["key2", "key1"]);
+ assert!(response.next_page_token.is_none());
+
+ // Remove a key
+ KVStoreSync::remove(&store, "ns1", "ns2", "key1", false).unwrap();
+ assert!(KVStoreSync::read(&store, "ns1", "ns2", "key1").is_err());
+ }
+
+ #[test]
+ fn paginated_store_pagination() {
+ let store = TestPaginatedStore::new(2);
+
+ // Write 5 items in a known creation order
+ for i in 0..5i64 {
+ store.write("ns", "", &format!("key{i}"), vec![i as u8]).unwrap();
+ }
+
+ // First page should have 2 items (most recently created first: key4, key3)
+ let page1 = store.list_paginated("ns", "", None).unwrap();
+ assert_eq!(page1.keys.len(), 2);
+ assert_eq!(page1.keys, vec!["key4", "key3"]);
+ assert!(page1.next_page_token.is_some());
+
+ // Second page
+ let page2 = store.list_paginated("ns", "", page1.next_page_token).unwrap();
+ assert_eq!(page2.keys.len(), 2);
+ assert_eq!(page2.keys, vec!["key2", "key1"]);
+ assert!(page2.next_page_token.is_some());
+
+ // Third page (last item)
+ let page3 = store.list_paginated("ns", "", page2.next_page_token).unwrap();
+ assert_eq!(page3.keys.len(), 1);
+ assert_eq!(page3.keys, vec!["key0"]);
+ assert!(page3.next_page_token.is_none());
+ }
+
+ #[test]
+ fn paginated_store_update_preserves_order() {
+ let store = TestPaginatedStore::new(10);
+
+ // Write items in a known creation order
+ store.write("ns", "", "key1", vec![1]).unwrap();
+ store.write("ns", "", "key2", vec![2]).unwrap();
+ store.write("ns", "", "key3", vec![3]).unwrap();
+
+ // Verify initial order (newest first)
+ let response = store.list_paginated("ns", "", None).unwrap();
+ assert_eq!(response.keys, vec!["key3", "key2", "key1"]);
+
+ // Update key1; if update order were used, this would move it to the front
+ store.write("ns", "", "key1", vec![1, 1]).unwrap();
+
+ // Verify data was updated
+ assert_eq!(KVStoreSync::read(&store, "ns", "", "key1").unwrap(), vec![1, 1]);
+
+ // Verify order is unchanged - creation order should have been preserved
+ let response = store.list_paginated("ns", "", None).unwrap();
+ assert_eq!(response.keys, vec!["key3", "key2", "key1"]);
+ }
 }
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 34f5d5fe36e..de04054861f 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -51,6 +51,7 @@ use crate::sign::{ChannelSigner, PeerStorageKey};
 use crate::sync::RwLock;
 use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures};
 use crate::util::async_poll::MaybeSend;
+use crate::util::atomic_counter::AtomicCounter;
 use crate::util::config::UserConfig;
 use crate::util::dyn_signer::{
 DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner,
@@ -58,7 +59,7 @@ use crate::util::dyn_signer::{
 use crate::util::logger::{Logger, Record};
 #[cfg(feature = "std")]
 use crate::util::mut_global::MutGlobal;
-use crate::util::persist::{KVStore, KVStoreSync, MonitorName};
+use crate::util::persist::{KVStore, KVStoreSync, MonitorName, PageToken, PaginatedListResponse};
 use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner};
 use crate::util::wakers::Notifier;
@@ -1124,6 +1125,122 @@ impl KVStoreSync for TestStore {
 unsafe impl Sync for TestStore {}
 unsafe impl Send for TestStore {}

+/// A simple in-memory implementation of [`PaginatedKVStoreSync`] for testing.
+///
+/// [`PaginatedKVStoreSync`]: crate::util::persist::PaginatedKVStoreSync
+pub struct TestPaginatedStore {
+ data: Mutex<HashMap<(String, String, String), (i64, Vec<u8>)>>,
+ page_size: usize,
+ time_counter: AtomicCounter,
+}
+
+impl TestPaginatedStore {
+ /// Creates a new `TestPaginatedStore` with the given page size.
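+ ///
+ /// A usage sketch (namespaces and keys are placeholders):
+ ///
+ /// ```ignore
+ /// let store = TestPaginatedStore::new(2);
+ /// store.write("ns", "", "key", vec![0]).unwrap();
+ /// let page = store.list_paginated("ns", "", None).unwrap();
+ /// assert_eq!(page.keys, vec!["key"]);
+ /// ```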
+ pub fn new(page_size: usize) -> Self {
+ Self { data: Mutex::new(new_hash_map()), page_size, time_counter: AtomicCounter::new() }
+ }
+}
+
+impl KVStoreSync for TestPaginatedStore {
+ fn read(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+ ) -> Result<Vec<u8>, io::Error> {
+ let data = self.data.lock().unwrap();
+ data.get(&(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()))
+ .map(|(_, v)| v.clone())
+ .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
+ }
+
+ fn write(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+ ) -> Result<(), io::Error> {
+ let mut data = self.data.lock().unwrap();
+ let order = self.time_counter.next() as i64;
+ let key_tuple =
+ (primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string());
+ // Only use order for new entries; preserve existing order on updates
+ let order_to_use =
+ data.get(&key_tuple).map(|(existing_order, _)| *existing_order).unwrap_or(order);
+ data.insert(key_tuple, (order_to_use, buf));
+ Ok(())
+ }
+
+ fn remove(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
+ ) -> Result<(), io::Error> {
+ let mut data = self.data.lock().unwrap();
+ data.remove(&(
+ primary_namespace.to_string(),
+ secondary_namespace.to_string(),
+ key.to_string(),
+ ));
+ Ok(())
+ }
+
+ fn list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> Result<Vec<String>, io::Error> {
+ let mut all_keys = Vec::new();
+ let mut page_token = None;
+ loop {
+ let response = crate::util::persist::PaginatedKVStoreSync::list_paginated(
+ self,
+ primary_namespace,
+ secondary_namespace,
+ page_token,
+ )?;
+ all_keys.extend(response.keys);
+ match response.next_page_token {
+ Some(token) => page_token = Some(token),
+ None => break,
+ }
+ }
+ Ok(all_keys)
+ }
+}
+
+impl crate::util::persist::PaginatedKVStoreSync for TestPaginatedStore {
+ fn list_paginated(
+ &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+ ) -> Result<PaginatedListResponse, io::Error> {
+ let data = self.data.lock().unwrap();
+ let mut entries: Vec<_> = data
+ .iter()
+ .filter(|((pn, sn, _), _)| pn == primary_namespace && sn == secondary_namespace)
+ .map(|((_, _, k), (t, _))| (k.clone(), *t))
+ .collect();
+
+ // Sort by time descending, then by key
+ entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
+
+ // Apply pagination: find the first entry AFTER the given key in sort order.
+ // This implementation uses the last key as the page token.
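+ // Using only the key works here (unlike the `(timestamp, key)` tokens suggested in the
+ // trait docs) because each key keeps a unique, immutable creation counter, so its
+ // position in the sorted list is stable across calls unless the key itself is deleted.
+ // If the token's key is gone, we fall back to the start of the list via `unwrap_or(0)`.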
+ let start_idx = if let Some(PageToken(ref last_key)) = page_token {
+ // Find the position of this key and start after it
+ entries.iter().position(|(k, _)| k == last_key).map(|pos| pos + 1).unwrap_or(0)
+ } else {
+ 0
+ };
+
+ let page_entries: Vec<_> =
+ entries.into_iter().skip(start_idx).take(self.page_size).collect();
+
+ let next_page_token = if page_entries.len() == self.page_size {
+ page_entries.last().map(|(k, _)| PageToken(k.clone()))
+ } else {
+ None
+ };
+
+ Ok(PaginatedListResponse {
+ keys: page_entries.into_iter().map(|(k, _)| k).collect(),
+ next_page_token,
+ })
+ }
+}
+
+unsafe impl Sync for TestPaginatedStore {}
+unsafe impl Send for TestPaginatedStore {}
+
 pub struct TestBroadcaster {
 pub txn_broadcasted: Mutex<Vec<Transaction>>,
 pub blocks: Arc<Mutex<Vec<(Block, u32)>>>,

From 86c6db1d5280aa358f63d148491eba42c4761481 Mon Sep 17 00:00:00 2001
From: benthecarman
Date: Tue, 3 Feb 2026 12:11:20 -0600
Subject: [PATCH 2/3] prefactor: move FilesystemStore utilities into
 fs_store_common.rs

Ahead of adding the FilesystemStoreV2, we move some common utilities into
a shared file.
---
 lightning-persister/src/fs_store.rs        | 537 +++++++++------------
 lightning-persister/src/fs_store_common.rs | 321 ++++++++++++
 lightning-persister/src/lib.rs             |   1 +
 3 files changed, 540 insertions(+), 319 deletions(-)
 create mode 100644 lightning-persister/src/fs_store_common.rs

diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index 73c24dc6fc0..72e917418d0 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -1,58 +1,39 @@
 //! Objects related to [`FilesystemStore`] live here.

+use crate::fs_store_common::{prepare_atomic_write, FilesystemStoreState, WriteOptions};
 use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};

 use lightning::types::string::PrintableString;
 use lightning::util::persist::{KVStoreSync, MigratableKVStore};

-use std::collections::HashMap;
 use std::fs;
-use std::io::{Read, Write};
+use std::io::Read;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, RwLock};

 #[cfg(feature = "tokio")]
 use core::future::Future;
 #[cfg(feature = "tokio")]
 use lightning::util::persist::KVStore;

+#[cfg(not(target_os = "windows"))]
+use crate::fs_store_common::finalize_atomic_write_unix;
 #[cfg(target_os = "windows")]
-use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
-
+use crate::fs_store_common::finalize_atomic_write_windows;
+#[cfg(not(target_os = "windows"))]
+use crate::fs_store_common::remove_file_unix;
 #[cfg(target_os = "windows")]
-macro_rules! call {
- ($e: expr) => {
- if $e != 0 {
- Ok(())
- } else {
- Err(std::io::Error::last_os_error())
- }
- };
-}
+use crate::fs_store_common::remove_file_windows;

-#[cfg(target_os = "windows")]
-fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
- path.as_ref().encode_wide().chain(Some(0)).collect()
-}
-
-// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching
+// The number of times we retry listing keys in `list` before we give up reaching
 // a consistent view and error out.
 const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;

-struct FilesystemStoreInner {
- data_dir: PathBuf,
- tmp_file_counter: AtomicUsize,
-
- // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
- // latest written version per key.
- locks: Mutex>>>, -} - /// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. /// /// [`KVStore`]: lightning::util::persist::KVStore pub struct FilesystemStore { - inner: Arc, + inner: Arc, // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list // operations aren't sensitive to the order of execution. @@ -62,10 +43,8 @@ pub struct FilesystemStore { impl FilesystemStore { /// Constructs a new [`FilesystemStore`]. pub fn new(data_dir: PathBuf) -> Self { - let locks = Mutex::new(HashMap::new()); - let tmp_file_counter = AtomicUsize::new(0); Self { - inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }), + inner: Arc::new(FilesystemStoreState::new(data_dir)), next_version: AtomicU64::new(1), } } @@ -94,81 +73,11 @@ impl FilesystemStore { let outer_lock = self.inner.locks.lock().unwrap(); outer_lock.len() } -} - -impl KVStoreSync for FilesystemStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Result, lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "read", - )?; - self.inner.read(path) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Result<(), lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "write", - )?; - let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - self.inner.write_version(inner_lock_ref, path, buf, version) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Result<(), lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "remove", - )?; - let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - self.inner.remove_version(inner_lock_ref, path, lazy, version) - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Result, lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - None, - "list", - )?; - self.inner.list(path) - } -} - -impl FilesystemStoreInner { - fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(path).or_default()) - } fn get_dest_dir_path( &self, primary_namespace: &str, secondary_namespace: &str, ) -> std::io::Result { - let mut dest_dir_path = { - #[cfg(target_os = "windows")] - { - let data_dir = self.data_dir.clone(); - fs::create_dir_all(data_dir.clone())?; - fs::canonicalize(data_dir)? 
- } - #[cfg(not(target_os = "windows"))] - { - self.data_dir.clone() - } - }; + let mut dest_dir_path = self.inner.get_base_dir_path()?; dest_dir_path.push(primary_namespace); if !secondary_namespace.is_empty() { @@ -192,11 +101,11 @@ impl FilesystemStoreInner { Ok(dest_file_path) } - fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result> { + fn read_impl(&self, dest_file_path: PathBuf) -> lightning::io::Result> { let mut buf = Vec::new(); - self.execute_locked_read(dest_file_path.clone(), || { - let mut f = fs::File::open(dest_file_path)?; + self.inner.execute_locked_read(dest_file_path.clone(), || { + let mut f = fs::File::open(&dest_file_path)?; f.read_to_end(&mut buf)?; Ok(()) })?; @@ -204,216 +113,62 @@ impl FilesystemStoreInner { Ok(buf) } - fn execute_locked_write Result<(), lightning::io::Error>>( - &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, version: u64, callback: F, - ) -> Result<(), lightning::io::Error> { - let res = { - let mut last_written_version = inner_lock_ref.write().unwrap(); - - // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual - // consistency. - let is_stale_version = version <= *last_written_version; - - // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. - if is_stale_version { - Ok(()) - } else { - callback().map(|_| { - *last_written_version = version; - }) - } - }; - - self.clean_locks(&inner_lock_ref, dest_file_path); - - res - } - - fn execute_locked_read Result<(), lightning::io::Error>>( - &self, dest_file_path: PathBuf, callback: F, - ) -> Result<(), lightning::io::Error> { - let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone()); - let res = { - let _guard = inner_lock_ref.read().unwrap(); - callback() - }; - self.clean_locks(&inner_lock_ref, dest_file_path); - res - } - - fn clean_locks(&self, inner_lock_ref: &Arc>, dest_file_path: PathBuf) { - // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry - // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in - // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already - // counted. - let mut outer_lock = self.locks.lock().unwrap(); - - let strong_count = Arc::strong_count(&inner_lock_ref); - debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count"); - - if strong_count == 2 { - outer_lock.remove(&dest_file_path); - } - } - /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function /// returns early without writing. fn write_version( &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, buf: Vec, version: u64, ) -> lightning::io::Result<()> { - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = - format!("Could not retrieve parent directory of {}.", dest_file_path.display()); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - fs::create_dir_all(&parent_directory)?; - - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. 
- // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let mut tmp_file_path = dest_file_path.clone(); - let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - tmp_file_path.set_extension(tmp_file_ext); + let options = WriteOptions::default(); + let tmp_file_path = prepare_atomic_write(&self.inner, &dest_file_path, &buf, &options)?; - { - let mut tmp_file = fs::File::create(&tmp_file_path)?; - tmp_file.write_all(&buf)?; - tmp_file.sync_all()?; - } - - self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { - #[cfg(not(target_os = "windows"))] - { - fs::rename(&tmp_file_path, &dest_file_path)?; - let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; - dir_file.sync_all()?; - Ok(()) - } - - #[cfg(target_os = "windows")] - { - let res = if dest_file_path.exists() { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::ReplaceFileW( - path_to_windows_str(&dest_file_path).as_ptr(), - path_to_windows_str(&tmp_file_path).as_ptr(), - std::ptr::null(), - windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, - std::ptr::null_mut() as *const core::ffi::c_void, - std::ptr::null_mut() as *const core::ffi::c_void, - ) - }) - } else { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( - path_to_windows_str(&tmp_file_path).as_ptr(), - path_to_windows_str(&dest_file_path).as_ptr(), - windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH - | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, - ) - }) - }; + self.inner + .execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } - match res { - Ok(()) => { - // We fsync the dest file in hopes this will also flush the metadata to disk. - let dest_file = - fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?; - dest_file.sync_all()?; - Ok(()) - }, - Err(e) => Err(e.into()), + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) } - } - }) + }) + .map(|_| ()) } fn remove_version( &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, lazy: bool, version: u64, ) -> lightning::io::Result<()> { - self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { - if !dest_file_path.is_file() { - return Ok(()); - } - - if lazy { - // If we're lazy we just call remove and be done with it. - fs::remove_file(&dest_file_path)?; - } else { - // If we're not lazy we try our best to persist the updated metadata to ensure - // atomicity of this call. - #[cfg(not(target_os = "windows"))] - { - fs::remove_file(&dest_file_path)?; - - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = format!( - "Could not retrieve parent directory of {}.", - dest_file_path.display() - ); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; - // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes - // to the inode might get cached (and hence possibly lost on crash), depending on - // the target platform and file system. - // - // In order to assert we permanently removed the file in question we therefore - // call `fsync` on the parent directory on platforms that support it. 
- dir_file.sync_all()?; + self.inner + .execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + if !dest_file_path.is_file() { + return Ok(()); } - #[cfg(target_os = "windows")] - { - // Since Windows `DeleteFile` API is not persisted until the last open file handle - // is dropped, and there seemingly is no reliable way to flush the directory - // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the - // file to be deleted to a temporary trash file and remove the latter file - // afterwards. - // - // This should be marginally better, as, according to the documentation, - // `MoveFileExW` APIs should offer stronger persistence guarantees, - // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set. - // However, all this is partially based on assumptions and local experiments, as - // Windows API is horribly underdocumented. - let mut trash_file_path = dest_file_path.clone(); - let trash_file_ext = - format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - trash_file_path.set_extension(trash_file_ext); - - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( - path_to_windows_str(&dest_file_path).as_ptr(), - path_to_windows_str(&trash_file_path).as_ptr(), - windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH - | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, - ) - })?; - + if lazy { + // If we're lazy we just call remove and be done with it. + fs::remove_file(&dest_file_path)?; + } else { + // If we're not lazy we try our best to persist the updated metadata to ensure + // atomicity of this call. + #[cfg(not(target_os = "windows"))] { - // We fsync the trash file in hopes this will also flush the original's file - // metadata to disk. - let trash_file = fs::OpenOptions::new() - .read(true) - .write(true) - .open(&trash_file_path.clone())?; - trash_file.sync_all()?; + remove_file_unix(&dest_file_path)?; } - // We're fine if this remove would fail as the trash file will be cleaned up in - // list eventually. - fs::remove_file(trash_file_path).ok(); + #[cfg(target_os = "windows")] + { + remove_file_windows(&self.inner, &dest_file_path)?; + } } - } - Ok(()) - }) + Ok(()) + }) + .map(|_| ()) } - fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { + fn list_impl(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { if !Path::new(&prefixed_dest).exists() { return Ok(Vec::new()); } @@ -446,7 +201,7 @@ impl FilesystemStoreInner { continue 'retry_list; } else { // For all errors or if we exhausted retries, bubble up. 
- return Err(e.into()); + return Err(e); } }, } @@ -458,13 +213,61 @@ impl FilesystemStoreInner { } } +impl KVStoreSync for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + let path = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "read", + )?; + self.read_impl(path) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + let path = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + )?; + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + self.write_version(inner_lock_ref, path, buf, version) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + let path = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + )?; + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + self.remove_version(inner_lock_ref, path, lazy, version) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + let path = + self.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list")?; + self.list_impl(path) + } +} + #[cfg(feature = "tokio")] impl KVStore for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> impl Future, lightning::io::Error>> + 'static + Send { let this = Arc::clone(&self.inner); - let path = this.get_checked_dest_file_path( + let path = self.get_checked_dest_file_path( primary_namespace, secondary_namespace, Some(key), @@ -476,9 +279,18 @@ impl KVStore for FilesystemStore { Ok(path) => path, Err(e) => return Err(e), }; - tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| { - Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + + tokio::task::spawn_blocking(move || { + let mut buf = Vec::new(); + this.execute_locked_read(path.clone(), || { + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut buf)?; + Ok(()) + })?; + Ok(buf) }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) } } @@ -486,17 +298,39 @@ impl KVStore for FilesystemStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); - let path = this - .get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write") - .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path)); + let path_result = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + ); + let version_and_lock = path_result + .as_ref() + .ok() + .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path.clone())); async move { - let ((inner_lock_ref, version), path) = match path { - Ok(res) => res, - Err(e) => return Err(e), + let ((inner_lock_ref, version), dest_file_path) = match version_and_lock { + Some(v) => v, + None => return Err(path_result.unwrap_err()), }; + tokio::task::spawn_blocking(move || { - this.write_version(inner_lock_ref, path, buf, version) + let options = WriteOptions::default(); + let tmp_file_path = prepare_atomic_write(&this, &dest_file_path, &buf, &options)?; + + 
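+ // The payload is fully written and fsynced to the tmp file at this point; the
+ // rename below happens under the per-path version lock, so a stale (older-version)
+ // write is skipped instead of clobbering newer data.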
this.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } + + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) + } + }) + .map(|_| ()) }) .await .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) @@ -507,17 +341,46 @@ impl KVStore for FilesystemStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); - let path = this - .get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove") - .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path)); + let path_result = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + ); + let version_and_lock = path_result + .as_ref() + .ok() + .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path.clone())); async move { - let ((inner_lock_ref, version), path) = match path { - Ok(res) => res, - Err(e) => return Err(e), + let ((inner_lock_ref, version), dest_file_path) = match version_and_lock { + Some(v) => v, + None => return Err(path_result.unwrap_err()), }; + tokio::task::spawn_blocking(move || { - this.remove_version(inner_lock_ref, path, lazy, version) + this.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + if !dest_file_path.is_file() { + return Ok(()); + } + + if lazy { + fs::remove_file(&dest_file_path)?; + } else { + #[cfg(not(target_os = "windows"))] + { + remove_file_unix(&dest_file_path)?; + } + + #[cfg(target_os = "windows")] + { + remove_file_windows(&this, &dest_file_path)?; + } + } + + Ok(()) + }) + .map(|_| ()) }) .await .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) @@ -527,19 +390,55 @@ impl KVStore for FilesystemStore { fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> impl Future, lightning::io::Error>> + 'static + Send { - let this = Arc::clone(&self.inner); - let path = - this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list"); + self.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list"); async move { let path = match path { Ok(path) => path, Err(e) => return Err(e), }; - tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| { - Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + + tokio::task::spawn_blocking(move || { + if !Path::new(&path).exists() { + return Ok(Vec::new()); + } + + let mut keys; + let mut retries = LIST_DIR_CONSISTENCY_RETRIES; + + 'retry_list: loop { + keys = Vec::new(); + 'skip_entry: for entry in fs::read_dir(&path)? 
{ + let entry = entry?; + let p = entry.path(); + + let res = dir_entry_is_key(&entry); + match res { + Ok(true) => { + let key = get_key_from_dir_entry_path(&p, &path)?; + keys.push(key); + }, + Ok(false) => { + continue 'skip_entry; + }, + Err(e) => { + if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 { + retries -= 1; + continue 'retry_list; + } else { + return Err(e); + } + }, + } + } + break 'retry_list; + } + + Ok(keys) }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) } } } @@ -585,7 +484,7 @@ fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result Result { - match p.strip_prefix(&base_path) { + match p.strip_prefix(base_path) { Ok(stripped_path) => { if let Some(relative_path) = stripped_path.to_str() { if is_valid_kvstore_str(relative_path) { diff --git a/lightning-persister/src/fs_store_common.rs b/lightning-persister/src/fs_store_common.rs new file mode 100644 index 00000000000..a992d6b1666 --- /dev/null +++ b/lightning-persister/src/fs_store_common.rs @@ -0,0 +1,321 @@ +//! Common utilities shared between [`FilesystemStore`] and [`FilesystemStoreV2`]. +//! +//! [`FilesystemStore`]: crate::fs_store::FilesystemStore + +use std::collections::HashMap; +use std::fs; +use std::io::Write; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; + +#[cfg(target_os = "windows")] +use std::ffi::OsStr; +#[cfg(target_os = "windows")] +use std::os::windows::ffi::OsStrExt; + +/// Calls a Windows API function and returns Ok(()) on success or the last OS error on failure. +#[cfg(target_os = "windows")] +macro_rules! call { + ($e: expr) => { + if $e != 0 { + Ok(()) + } else { + Err(std::io::Error::last_os_error()) + } + }; +} + +#[cfg(target_os = "windows")] +pub(crate) use call; + +/// Converts a path to a null-terminated wide string for Windows API calls. +#[cfg(target_os = "windows")] +pub(crate) fn path_to_windows_str>(path: &T) -> Vec { + path.as_ref().encode_wide().chain(Some(0)).collect() +} + +/// Inner state shared between sync and async operations for filesystem stores. +/// +/// This struct manages the data directory, temporary file counter, and per-path locks +/// that ensure we don't have concurrent writes to the same file. +pub(crate) struct FilesystemStoreState { + pub(crate) data_dir: PathBuf, + pub(crate) tmp_file_counter: AtomicUsize, + /// Per path lock that ensures that we don't have concurrent writes to the same file. + /// The lock also encapsulates the latest written version per key. + pub(crate) locks: Mutex>>>, +} + +impl FilesystemStoreState { + /// Creates a new `FilesystemStoreState` with the given data directory. + pub(crate) fn new(data_dir: PathBuf) -> Self { + Self { data_dir, tmp_file_counter: AtomicUsize::new(0), locks: Mutex::new(HashMap::new()) } + } + + /// Gets or creates a lock reference for the given path. + pub(crate) fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(path).or_default()) + } + + /// Cleans up unused locks to prevent memory leaks. + /// + /// If there are no arcs in use elsewhere (besides the map entry and the provided reference), + /// we can remove the map entry to prevent leaking memory. 
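+ ///
+ /// The two expected references are the map entry itself and the caller-held
+ /// `inner_lock_ref`. The outer `locks` mutex is taken before counting, so no new
+ /// clone can be created after we have counted.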
+ pub(crate) fn clean_locks(&self, inner_lock_ref: &Arc<RwLock<u64>>, dest_file_path: PathBuf) {
+ let mut outer_lock = self.locks.lock().unwrap();
+
+ let strong_count = Arc::strong_count(inner_lock_ref);
+ debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count");
+
+ if strong_count == 2 {
+ outer_lock.remove(&dest_file_path);
+ }
+ }
+
+ /// Executes a read operation while holding the read lock for the given path.
+ pub(crate) fn execute_locked_read<F: FnOnce() -> Result<(), lightning::io::Error>>(
+ &self, dest_file_path: PathBuf, callback: F,
+ ) -> Result<(), lightning::io::Error> {
+ let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone());
+ let res = {
+ let _guard = inner_lock_ref.read().unwrap();
+ callback()
+ };
+ self.clean_locks(&inner_lock_ref, dest_file_path);
+ res
+ }
+
+ /// Executes a write operation with version tracking.
+ ///
+ /// Returns `Ok(true)` if the callback was executed, `Ok(false)` if skipped due to staleness.
+ pub(crate) fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
+ &self, inner_lock_ref: Arc<RwLock<u64>>, lock_key: PathBuf, version: u64, callback: F,
+ ) -> Result<bool, lightning::io::Error> {
+ let res = {
+ let mut last_written_version = inner_lock_ref.write().unwrap();
+
+ // Check if we already have a newer version written/removed. This is used in async
+ // contexts to realize eventual consistency.
+ let is_stale_version = version <= *last_written_version;
+
+ // If the version is not stale, we execute the callback. Otherwise we can and must skip.
+ if is_stale_version {
+ Ok(false)
+ } else {
+ callback().map(|_| {
+ *last_written_version = version;
+ true
+ })
+ }
+ };
+
+ self.clean_locks(&inner_lock_ref, lock_key);
+
+ res
+ }
+
+ /// Returns the base directory path for a namespace combination.
+ ///
+ /// On Windows, this canonicalizes the path after creating the data directory.
+ pub(crate) fn get_base_dir_path(&self) -> std::io::Result<PathBuf> {
+ #[cfg(target_os = "windows")]
+ {
+ let data_dir = self.data_dir.clone();
+ fs::create_dir_all(data_dir.clone())?;
+ fs::canonicalize(data_dir)
+ }
+ #[cfg(not(target_os = "windows"))]
+ {
+ Ok(self.data_dir.clone())
+ }
+ }
+
+ /// Generates a unique temporary file path based on the destination path.
+ pub(crate) fn get_tmp_file_path(&self, dest_file_path: &PathBuf) -> PathBuf {
+ let mut tmp_file_path = dest_file_path.clone();
+ let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
+ tmp_file_path.set_extension(tmp_file_ext);
+ tmp_file_path
+ }
+
+ /// Generates a unique trash file path for Windows deletion operations.
+ #[cfg(target_os = "windows")]
+ pub(crate) fn get_trash_file_path(&self, dest_file_path: &PathBuf) -> PathBuf {
+ let mut trash_file_path = dest_file_path.clone();
+ let trash_file_ext =
+ format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
+ trash_file_path.set_extension(trash_file_ext);
+ trash_file_path
+ }
+}
+
+/// Options for writing a file atomically.
+#[derive(Default)]
+pub(crate) struct WriteOptions {
+ /// If set, the file's modification time will be set to this value.
+ pub(crate) preserve_mtime: Option<std::time::SystemTime>,
+}
+
+/// Writes data to a temporary file and prepares it for atomic rename.
+///
+/// This handles:
+/// - Creating the parent directory
+/// - Writing to a temporary file
+/// - Setting mtime if requested (for FilesystemStoreV2)
+/// - Syncing the temp file
+///
+/// Returns the temporary file path that should be renamed to the destination.
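+///
+/// An illustrative sketch of the intended call sequence (Unix shown; Windows uses
+/// `finalize_atomic_write_windows` instead):
+///
+/// ```ignore
+/// let tmp = prepare_atomic_write(&state, &dest_file_path, &buf, &WriteOptions::default())?;
+/// #[cfg(not(target_os = "windows"))]
+/// finalize_atomic_write_unix(&tmp, &dest_file_path)?;
+/// ```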
+pub(crate) fn prepare_atomic_write( + state: &FilesystemStoreState, dest_file_path: &PathBuf, buf: &[u8], options: &WriteOptions, +) -> lightning::io::Result { + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + fs::create_dir_all(parent_directory)?; + + let tmp_file_path = state.get_tmp_file_path(dest_file_path); + + { + let tmp_file = fs::File::create(&tmp_file_path)?; + let mut writer = std::io::BufWriter::new(&tmp_file); + writer.write_all(buf)?; + writer.flush()?; + + // If we need to preserve the original mtime (for updates), set it before fsync. + if let Some(mtime) = options.preserve_mtime { + let times = std::fs::FileTimes::new().set_modified(mtime); + tmp_file.set_times(times)?; + } + + tmp_file.sync_all()?; + } + + Ok(tmp_file_path) +} + +/// Performs the atomic rename from temp file to destination on Unix. +#[cfg(not(target_os = "windows"))] +pub(crate) fn finalize_atomic_write_unix( + tmp_file_path: &PathBuf, dest_file_path: &PathBuf, +) -> lightning::io::Result<()> { + fs::rename(tmp_file_path, dest_file_path)?; + + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; + dir_file.sync_all()?; + Ok(()) +} + +/// Performs the atomic rename from temp file to destination on Windows. +#[cfg(target_os = "windows")] +pub(crate) fn finalize_atomic_write_windows( + tmp_file_path: &PathBuf, dest_file_path: &PathBuf, options: &WriteOptions, +) -> lightning::io::Result<()> { + let res = if dest_file_path.exists() { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::ReplaceFileW( + path_to_windows_str(&dest_file_path).as_ptr(), + path_to_windows_str(&tmp_file_path).as_ptr(), + std::ptr::null(), + windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *const core::ffi::c_void, + std::ptr::null_mut() as *const core::ffi::c_void, + ) + }) + } else { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(&tmp_file_path).as_ptr(), + path_to_windows_str(&dest_file_path).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, + ) + }) + }; + + match res { + Ok(()) => { + // Open the destination file to fsync it and set mtime if needed. + let dest_file = fs::OpenOptions::new().read(true).write(true).open(dest_file_path)?; + + // On Windows, ReplaceFileW/MoveFileExW may not preserve the mtime we set + // on the tmp file, so we explicitly set it again here. + if let Some(mtime) = options.preserve_mtime { + let times = std::fs::FileTimes::new().set_modified(mtime); + dest_file.set_times(times)?; + } + + dest_file.sync_all()?; + Ok(()) + }, + Err(e) => Err(e.into()), + } +} + +/// Removes a file atomically on Unix with fsync on the parent directory. 
+#[cfg(not(target_os = "windows"))]
+pub(crate) fn remove_file_unix(dest_file_path: &PathBuf) -> lightning::io::Result<()> {
+ fs::remove_file(dest_file_path)?;
+
+ let parent_directory = dest_file_path.parent().ok_or_else(|| {
+ let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display());
+ std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
+ })?;
+ let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
+ // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
+ // to the inode might get cached (and hence possibly lost on crash), depending on
+ // the target platform and file system.
+ //
+ // In order to assert we permanently removed the file in question we therefore
+ // call `fsync` on the parent directory on platforms that support it.
+ dir_file.sync_all()?;
+ Ok(())
+}
+
+/// Removes a file on Windows using the trash file approach for durability.
+#[cfg(target_os = "windows")]
+pub(crate) fn remove_file_windows(
+ state: &FilesystemStoreState, dest_file_path: &PathBuf,
+) -> lightning::io::Result<()> {
+ // Since Windows `DeleteFile` API is not persisted until the last open file handle
+ // is dropped, and there seemingly is no reliable way to flush the directory
+ // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
+ // file to be deleted to a temporary trash file and remove the latter file
+ // afterwards.
+ //
+ // This should be marginally better, as, according to the documentation,
+ // `MoveFileExW` APIs should offer stronger persistence guarantees,
+ // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
+ // However, all this is partially based on assumptions and local experiments, as
+ // Windows API is horribly underdocumented.
+ let trash_file_path = state.get_trash_file_path(dest_file_path);
+
+ call!(unsafe {
+ windows_sys::Win32::Storage::FileSystem::MoveFileExW(
+ path_to_windows_str(&dest_file_path).as_ptr(),
+ path_to_windows_str(&trash_file_path).as_ptr(),
+ windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
+ | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
+ )
+ })?;
+
+ {
+ // We fsync the trash file in hopes this will also flush the original's file
+ // metadata to disk.
+ let trash_file = fs::OpenOptions::new().read(true).write(true).open(&trash_file_path)?;
+ trash_file.sync_all()?;
+ }
+
+ // We're fine if this remove would fail as the trash file will be cleaned up in
+ // list eventually.
+ fs::remove_file(trash_file_path).ok();
+
+ Ok(())
+}
diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index 0e3541e1b27..e4973f532e8 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -10,6 +10,7 @@ extern crate criterion;

 pub mod fs_store;
+mod fs_store_common;
 mod utils;

 #[cfg(test)]

From b55f58d7111a48145246a3b8f789c1f65addf083 Mon Sep 17 00:00:00 2001
From: benthecarman
Date: Mon, 2 Feb 2026 11:22:22 -0600
Subject: [PATCH 3/3] Add FilesystemStoreV2 with paginated listing support

Implements the PaginatedKVStore traits, using file modification times for
newest-first pagination and [empty] directory markers for a consistent
namespace hierarchy.

Includes v1 to v2 migration utility.
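As a sketch, migrating might look roughly like the following (the exact entry point is
not shown in this excerpt; `migrate_kv_store_data` is LDK's existing helper for
`MigratableKVStore` implementations, and the paths are placeholders):

```rust
use lightning::util::persist::migrate_kv_store_data;
use lightning_persister::fs_store::FilesystemStore;
use lightning_persister::fs_store_v2::FilesystemStoreV2;

fn migrate() -> std::io::Result<()> {
    // Assumes `FilesystemStoreV2` implements `MigratableKVStore` (its import of the
    // trait suggests so; the impl itself is outside this excerpt).
    let mut v1 = FilesystemStore::new("/data/ldk".into());
    let mut v2 = FilesystemStoreV2::new("/data/ldk-v2".into())?;
    migrate_kv_store_data(&mut v1, &mut v2)?;
    Ok(())
}
```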
Co-Authored-By: Claude Opus 4.5 --- lightning-persister/src/fs_store_common.rs | 1 + lightning-persister/src/fs_store_v2.rs | 1345 ++++++++++++++++++++ lightning-persister/src/lib.rs | 1 + 3 files changed, 1347 insertions(+) create mode 100644 lightning-persister/src/fs_store_v2.rs diff --git a/lightning-persister/src/fs_store_common.rs b/lightning-persister/src/fs_store_common.rs index a992d6b1666..c15f3aed0ab 100644 --- a/lightning-persister/src/fs_store_common.rs +++ b/lightning-persister/src/fs_store_common.rs @@ -1,6 +1,7 @@ //! Common utilities shared between [`FilesystemStore`] and [`FilesystemStoreV2`]. //! //! [`FilesystemStore`]: crate::fs_store::FilesystemStore +//! [`FilesystemStoreV2`]: crate::fs_store_v2::FilesystemStoreV2 use std::collections::HashMap; use std::fs; diff --git a/lightning-persister/src/fs_store_v2.rs b/lightning-persister/src/fs_store_v2.rs new file mode 100644 index 00000000000..01f411214ed --- /dev/null +++ b/lightning-persister/src/fs_store_v2.rs @@ -0,0 +1,1345 @@ +//! Objects related to [`FilesystemStoreV2`] live here. +use crate::fs_store_common::{prepare_atomic_write, FilesystemStoreState, WriteOptions}; +use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; + +use lightning::util::persist::{ + KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse, +}; + +use std::fs; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[cfg(feature = "tokio")] +use core::future::Future; +#[cfg(feature = "tokio")] +use lightning::util::persist::{KVStore, PaginatedKVStore}; + +#[cfg(not(target_os = "windows"))] +use crate::fs_store_common::finalize_atomic_write_unix; +#[cfg(target_os = "windows")] +use crate::fs_store_common::finalize_atomic_write_windows; +#[cfg(not(target_os = "windows"))] +use crate::fs_store_common::remove_file_unix; +#[cfg(target_os = "windows")] +use crate::fs_store_common::remove_file_windows; + +/// The fixed page size for paginated listing operations. +const PAGE_SIZE: usize = 50; + +/// The directory name used for empty namespaces. +/// Uses brackets which are not in KVSTORE_NAMESPACE_KEY_ALPHABET, preventing collisions +/// with valid namespace names. +const EMPTY_NAMESPACE_DIR: &str = "[empty]"; + +/// The length of the timestamp in a page token (milliseconds since epoch as 16-digit decimal). +const PAGE_TOKEN_TIMESTAMP_LEN: usize = 16; + +/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. +/// +/// This is version 2 of the filesystem store which provides: +/// - Consistent directory structure using `[empty]` for empty namespaces +/// - File modification times for creation-order pagination +/// - Support for [`PaginatedKVStoreSync`] with newest-first ordering +/// +/// ## Directory Structure +/// +/// Files are stored with a consistent two-level namespace hierarchy: +/// ```text +/// data_dir/ +/// [empty]/ # empty primary namespace +/// [empty]/ # empty secondary namespace +/// {key} +/// primary_ns/ +/// [empty]/ # empty secondary namespace +/// {key} +/// secondary_ns/ +/// {key} +/// ``` +/// +/// ## File Ordering +/// +/// Files are ordered by their modification time (mtime). When a file is created, it gets +/// the current time. When updated, the original creation time is preserved by setting +/// the mtime of the new file to match the original before the atomic rename. 
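+///
+/// ## Page Tokens
+///
+/// A page token is assumed (per the constants above; the exact encoding lives in the
+/// token helpers) to be the entry's mtime as a 16-digit zero-padded decimal millisecond
+/// timestamp, followed by the key of the last entry returned on the page.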
+/// +/// [`KVStore`]: lightning::util::persist::KVStore +pub struct FilesystemStoreV2 { + inner: Arc, + + // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list + // operations aren't sensitive to the order of execution. + next_version: AtomicU64, +} + +impl FilesystemStoreV2 { + /// Constructs a new [`FilesystemStoreV2`]. + pub fn new(data_dir: PathBuf) -> std::io::Result { + Ok(Self { + inner: Arc::new(FilesystemStoreState::new(data_dir)), + next_version: AtomicU64::new(1), + }) + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.inner.data_dir.clone() + } + + fn get_new_version_and_lock_ref(&self, lock_key: PathBuf) -> (Arc>, u64) { + let version = self.next_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FilesystemStoreV2 version counter overflowed"); + } + + // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for + // cleaning up unused locks. + let inner_lock_ref = self.inner.get_inner_lock_ref(lock_key); + + (inner_lock_ref, version) + } + + #[cfg(any(all(feature = "tokio", test), fuzzing))] + /// Returns the size of the async state. + pub fn state_size(&self) -> usize { + let outer_lock = self.inner.locks.lock().unwrap(); + outer_lock.len() + } + + fn get_dest_dir_path( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> std::io::Result { + let mut dest_dir_path = self.inner.get_base_dir_path()?; + + // Use [empty] for empty namespaces to ensure consistent directory depth + let primary_dir = + if primary_namespace.is_empty() { EMPTY_NAMESPACE_DIR } else { primary_namespace }; + let secondary_dir = + if secondary_namespace.is_empty() { EMPTY_NAMESPACE_DIR } else { secondary_namespace }; + + dest_dir_path.push(primary_dir); + dest_dir_path.push(secondary_dir); + + Ok(dest_dir_path) + } + + /// Returns the file path for a given namespace/key combination. + fn get_file_path( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> std::io::Result { + let dir = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + Ok(dir.join(key)) + } + + fn read_impl(&self, dest_file_path: PathBuf) -> lightning::io::Result> { + let mut buf = Vec::new(); + + self.inner.execute_locked_read(dest_file_path.clone(), || { + let mut f = fs::File::open(&dest_file_path)?; + f.read_to_end(&mut buf)?; + Ok(()) + })?; + + Ok(buf) + } + + /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function + /// returns early without writing. + /// If `preserve_mtime` is Some, the file's modification time will be set to that value to preserve creation order. + /// Returns `Ok(true)` if the write was performed, `Ok(false)` if skipped due to staleness. + fn write_version( + &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, buf: Vec, + preserve_mtime: Option, version: u64, + ) -> lightning::io::Result { + let options = WriteOptions { preserve_mtime }; + let tmp_file_path = prepare_atomic_write(&self.inner, &dest_file_path, &buf, &options)?; + + self.inner.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } + + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) + } + }) + } + + /// Removes a specific version of a key from the filesystem. 
If a newer version has been written already, this function + /// returns early without removing. + /// Returns `Ok(true)` if the remove was performed, `Ok(false)` if skipped due to staleness. + fn remove_version( + &self, inner_lock_ref: Arc>, lock_key: PathBuf, dest_file_path: PathBuf, + lazy: bool, version: u64, + ) -> lightning::io::Result { + self.inner.execute_locked_write(inner_lock_ref, lock_key, version, || { + if !dest_file_path.is_file() { + return Ok(()); + } + + if lazy { + // If we're lazy we just call remove and be done with it. + fs::remove_file(&dest_file_path)?; + } else { + // If we're not lazy we try our best to persist the updated metadata to ensure + // atomicity of this call. + #[cfg(not(target_os = "windows"))] + { + remove_file_unix(&dest_file_path)?; + } + + #[cfg(target_os = "windows")] + { + remove_file_windows(&self.inner, &dest_file_path)?; + } + } + + Ok(()) + }) + } + + fn list_impl(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { + if !Path::new(&prefixed_dest).exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let path = entry.path(); + + if let Some(key) = entry_to_key(&path) { + keys.push(key); + } + } + + Ok(keys) + } + + fn list_paginated_impl( + &self, prefixed_dest: PathBuf, page_token: Option, + ) -> lightning::io::Result { + if !Path::new(&prefixed_dest).exists() { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + } + + // Collect all entries with their modification times + let mut entries: Vec<(u64, String)> = Vec::new(); + for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let path = entry.path(); + + if let Some(key) = entry_to_key(&path) { + // Get modification time as millis since epoch + let mtime_millis = entry + .metadata() + .ok() + .and_then(|m| m.modified().ok()) + .and_then(|t| t.duration_since(UNIX_EPOCH).ok()) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + + entries.push((mtime_millis, key)); + } + } + + // Sort by mtime descending (newest first), then by key descending for same mtime + entries.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| b.1.cmp(&a.1))); + + // Find starting position based on page token + let start_idx = if let Some(token) = page_token { + let (token_mtime, token_key) = parse_page_token(&token.0)?; + + // Find entries that come after the token (older entries = lower mtime) + // or same mtime but lexicographically smaller key (since we sort descending) + entries + .iter() + .position(|(mtime, key)| { + *mtime < token_mtime + || (*mtime == token_mtime && key.as_str() < token_key.as_str()) + }) + .unwrap_or(entries.len()) + } else { + 0 + }; + + // Take PAGE_SIZE entries starting from start_idx + let page_entries: Vec<_> = + entries.iter().skip(start_idx).take(PAGE_SIZE).cloned().collect(); + + let keys: Vec = page_entries.iter().map(|(_, key)| key.clone()).collect(); + + // Determine next page token + let next_page_token = if start_idx + PAGE_SIZE < entries.len() { + page_entries.last().map(|(mtime, key)| PageToken(format_page_token(*mtime, key))) + } else { + None + }; + + Ok(PaginatedListResponse { keys, next_page_token }) + } +} + +impl KVStoreSync for FilesystemStoreV2 { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + + let file_path = self.get_file_path(primary_namespace, secondary_namespace, key)?; 
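+		// `read_impl` opens the file under the per-path lock, so this read serializes
+		// with any in-flight write or remove for the same key.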
+ self.read_impl(file_path) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + + let dest_file_path = self.get_file_path(primary_namespace, secondary_namespace, key)?; + + // Get the existing file's mtime if it exists (to preserve creation order on update) + let existing_mtime = fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok()); + + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(dest_file_path.clone()); + self.write_version(inner_lock_ref, dest_file_path, buf, existing_mtime, version).map(|_| ()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + + let file_path = self.get_file_path(primary_namespace, secondary_namespace, key)?; + + if !file_path.exists() { + // File doesn't exist, nothing to remove + return Ok(()); + } + + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(file_path.clone()); + self.remove_version(inner_lock_ref, file_path.clone(), file_path, lazy, version).map(|_| ()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + + let dest_dir_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + self.list_impl(dest_dir_path) + } +} + +impl PaginatedKVStoreSync for FilesystemStoreV2 { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result { + check_namespace_key_validity( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + )?; + + let dest_dir_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + self.list_paginated_impl(dest_dir_path, page_token) + } +} + +/// Extracts key from a path if it's a valid key file. +fn entry_to_key(path: &Path) -> Option { + if let Some(ext) = path.extension() { + #[cfg(target_os = "windows")] + { + // Clean up any trash files lying around. + if ext == "trash" { + fs::remove_file(path).ok(); + return None; + } + } + if ext == "tmp" { + return None; + } + } + + if !path.is_file() { + return None; + } + + path.file_name().and_then(|n| n.to_str()).and_then(|key| { + if is_valid_kvstore_str(key) { + Some(key.to_string()) + } else { + None + } + }) +} + +/// Formats a page token from mtime (millis since epoch) and key. +fn format_page_token(mtime_millis: u64, key: &str) -> String { + format!("{:016}:{}", mtime_millis, key) +} + +/// Parses a page token into mtime (millis since epoch) and key. 
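+///
+/// Round-trip sketch of the token format (these helpers are private, so the example is
+/// illustrative rather than a compiled doctest):
+///
+/// ```ignore
+/// let token = format_page_token(1706500000000, "my_key");
+/// assert_eq!(token, "0001706500000000:my_key");
+/// assert_eq!(parse_page_token(&token).unwrap(), (1706500000000, "my_key".to_string()));
+/// ```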
+fn parse_page_token(token: &str) -> lightning::io::Result<(u64, String)> { + let colon_pos = token.find(':').ok_or_else(|| { + lightning::io::Error::new( + lightning::io::ErrorKind::InvalidInput, + "Invalid page token format", + ) + })?; + + if colon_pos != PAGE_TOKEN_TIMESTAMP_LEN { + return Err(lightning::io::Error::new( + lightning::io::ErrorKind::InvalidInput, + "Invalid page token format", + )); + } + + let mtime = token[..colon_pos].parse::().map_err(|_| { + lightning::io::Error::new( + lightning::io::ErrorKind::InvalidInput, + "Invalid page token timestamp", + ) + })?; + + let key = token[colon_pos + 1..].to_string(); + + Ok((mtime, key)) +} + +#[cfg(feature = "tokio")] +impl KVStore for FilesystemStoreV2 { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "read", + ); + let file_path = self.get_file_path(&primary_namespace, &secondary_namespace, &key); + + async move { + validation?; + let file_path = file_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + let mut buf = Vec::new(); + this.execute_locked_read(file_path.clone(), || { + let mut f = fs::File::open(&file_path)?; + f.read_to_end(&mut buf)?; + Ok(()) + })?; + Ok(buf) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key_str = key.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key_str), + "write", + ); + + let dest_file_path = self.get_file_path(&primary_namespace, &secondary_namespace, &key_str); + let (inner_lock_ref, version) = match &dest_file_path { + Ok(path) => self.get_new_version_and_lock_ref(path.clone()), + Err(_) => { + // We'll error out below, but we need placeholder values + (Arc::new(RwLock::new(0)), 0) + }, + }; + + async move { + validation?; + let dest_file_path = dest_file_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + // Get the existing file's mtime if it exists (to preserve creation order on update) + let existing_mtime = + fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok()); + + let options = WriteOptions { preserve_mtime: existing_mtime }; + let tmp_file_path = prepare_atomic_write(&this, &dest_file_path, &buf, &options)?; + + this.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } + + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) + } + }) + .map(|_| ()) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: 
&str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key_str = key.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key_str), + "remove", + ); + + let file_path = self.get_file_path(&primary_namespace, &secondary_namespace, &key_str); + let (inner_lock_ref, version) = match &file_path { + Ok(path) => self.get_new_version_and_lock_ref(path.clone()), + Err(_) => (Arc::new(RwLock::new(0)), 0), + }; + + async move { + validation?; + let file_path = file_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + if !file_path.exists() { + // File doesn't exist, but we still need to clean up the lock + this.clean_locks(&inner_lock_ref, file_path); + return Ok(()); + } + + this.execute_locked_write(inner_lock_ref, file_path.clone(), version, || { + if !file_path.is_file() { + return Ok(()); + } + + if lazy { + fs::remove_file(&file_path)?; + } else { + #[cfg(not(target_os = "windows"))] + { + remove_file_unix(&file_path)?; + } + + #[cfg(target_os = "windows")] + { + remove_file_windows(&this, &file_path)?; + } + } + + Ok(()) + }) + .map(|_| ()) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let validation = + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list"); + let dest_dir_path = self.get_dest_dir_path(&primary_namespace, &secondary_namespace); + + async move { + validation?; + let path = dest_dir_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + if !Path::new(&path).exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + for entry in fs::read_dir(&path)? { + let entry = entry?; + let entry_path = entry.path(); + + if let Some(key) = entry_to_key(&entry_path) { + keys.push(key); + } + } + + Ok(keys) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } +} + +#[cfg(feature = "tokio")] +impl PaginatedKVStore for FilesystemStoreV2 { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + Send + { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + None, + "list_paginated", + ); + let dest_dir_path = self.get_dest_dir_path(&primary_namespace, &secondary_namespace); + + async move { + validation?; + let path = dest_dir_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + if !Path::new(&path).exists() { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + } + + // Collect all entries with their modification times + let mut entries: Vec<(u64, String)> = Vec::new(); + for entry in fs::read_dir(&path)? 
{ + let entry = entry?; + let entry_path = entry.path(); + + if let Some(key) = entry_to_key(&entry_path) { + let mtime_millis = entry + .metadata() + .ok() + .and_then(|m| m.modified().ok()) + .and_then(|t| t.duration_since(UNIX_EPOCH).ok()) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + + entries.push((mtime_millis, key)); + } + } + + // Sort by mtime descending (newest first), then by key descending for same mtime + entries.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| b.1.cmp(&a.1))); + + // Find starting position based on page token + let start_idx = if let Some(token) = page_token { + let (token_mtime, token_key) = parse_page_token(&token.0)?; + + entries + .iter() + .position(|(mtime, key)| { + *mtime < token_mtime + || (*mtime == token_mtime && key.as_str() < token_key.as_str()) + }) + .unwrap_or(entries.len()) + } else { + 0 + }; + + // Take PAGE_SIZE entries starting from start_idx + let page_entries: Vec<_> = + entries.iter().skip(start_idx).take(PAGE_SIZE).cloned().collect(); + + let keys: Vec = page_entries.iter().map(|(_, key)| key.clone()).collect(); + + // Determine next page token + let next_page_token = if start_idx + PAGE_SIZE < entries.len() { + page_entries + .last() + .map(|(mtime, key)| PageToken(format_page_token(*mtime, key))) + } else { + None + }; + + Ok(PaginatedListResponse { keys, next_page_token }) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } +} + +impl MigratableKVStore for FilesystemStoreV2 { + fn list_all_keys(&self) -> Result, lightning::io::Error> { + let prefixed_dest = &self.inner.data_dir; + if !prefixed_dest.exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + + for primary_entry in fs::read_dir(prefixed_dest)? { + let primary_entry = primary_entry?; + let primary_path = primary_entry.path(); + + if !primary_path.is_dir() { + // Skip non-directory entries at the root level + continue; + } + + let primary_namespace = match primary_path.file_name().and_then(|n| n.to_str()) { + Some(EMPTY_NAMESPACE_DIR) => String::new(), + Some(name) if is_valid_kvstore_str(name) => name.to_string(), + _ => continue, + }; + + for secondary_entry in fs::read_dir(&primary_path)? { + let secondary_entry = secondary_entry?; + let secondary_path = secondary_entry.path(); + + if !secondary_path.is_dir() { + // Skip non-directory entries at the secondary level + continue; + } + + let secondary_namespace = match secondary_path.file_name().and_then(|n| n.to_str()) + { + Some(EMPTY_NAMESPACE_DIR) => String::new(), + Some(name) if is_valid_kvstore_str(name) => name.to_string(), + _ => continue, + }; + + // Read all key files in this namespace + for key_entry in fs::read_dir(&secondary_path)? { + let key_entry = key_entry?; + let key_path = key_entry.path(); + + if let Some(key) = entry_to_key(&key_path) { + keys.push((primary_namespace.clone(), secondary_namespace.clone(), key)); + } + } + } + } + + Ok(keys) + } +} + +/// Migrates all data from a [`FilesystemStore`] (v1) to a [`FilesystemStoreV2`]. +/// +/// This function reads all keys from the source v1 store and writes them to the target v2 store. +/// The v2 store will use the new directory structure with `[empty]` markers for empty namespaces. +/// +/// # Arguments +/// +/// * `source` - The source v1 filesystem store to migrate from +/// * `target` - The target v2 filesystem store to migrate to +/// +/// # Errors +/// +/// Returns an error if any read or write operation fails. 
Note that in case of an error, +/// the target store may be left in a partially migrated state. +/// +/// # Example +/// +/// ```no_run +/// use lightning_persister::fs_store::FilesystemStore; +/// use lightning_persister::fs_store_v2::{FilesystemStoreV2, migrate_v1_to_v2}; +/// use std::path::PathBuf; +/// +/// let v1_store = FilesystemStore::new(PathBuf::from("/path/to/v1/data")); +/// let v2_store = FilesystemStoreV2::new(PathBuf::from("/path/to/v2/data")) +/// .expect("Failed to open v2 store"); +/// +/// migrate_v1_to_v2(&v1_store, &v2_store).expect("Migration failed"); +/// ``` +/// +/// [`FilesystemStore`]: crate::fs_store::FilesystemStore +pub fn migrate_v1_to_v2( + source: &S, target: &FilesystemStoreV2, +) -> Result<(), lightning::io::Error> { + let keys_to_migrate = source.list_all_keys()?; + + for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { + let data = source.read(primary_namespace, secondary_namespace, key)?; + KVStoreSync::write(target, primary_namespace, secondary_namespace, key, data)?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{ + do_read_write_remove_list_persist, do_test_data_migration, do_test_store, + }; + use std::fs::FileTimes; + + impl Drop for FilesystemStoreV2 { + fn drop(&mut self) { + // We test for invalid directory names, so it's OK if directory removal + // fails. + match fs::remove_dir_all(&self.inner.data_dir) { + Err(e) => println!("Failed to remove test persister directory: {}", e), + _ => {}, + } + } + } + + #[test] + fn read_write_remove_list_persist() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + do_read_write_remove_list_persist(&fs_store); + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn read_write_remove_list_persist_async() { + use lightning::util::persist::KVStore; + use std::sync::Arc; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_async_v2"); + let fs_store = Arc::new(FilesystemStoreV2::new(temp_path).unwrap()); + assert_eq!(fs_store.state_size(), 0); + + let async_fs_store = Arc::clone(&fs_store); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + let primary = "testspace"; + let secondary = "testsubspace"; + let key = "testkey"; + + // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure + // that eventual consistency works. + let fut1 = KVStore::write(&*async_fs_store, primary, secondary, key, data1); + assert_eq!(fs_store.state_size(), 1); + + let fut2 = KVStore::remove(&*async_fs_store, primary, secondary, key, false); + assert_eq!(fs_store.state_size(), 1); + + let fut3 = KVStore::write(&*async_fs_store, primary, secondary, key, data2.clone()); + assert_eq!(fs_store.state_size(), 1); + + fut3.await.unwrap(); + assert_eq!(fs_store.state_size(), 1); + + fut2.await.unwrap(); + assert_eq!(fs_store.state_size(), 1); + + fut1.await.unwrap(); + assert_eq!(fs_store.state_size(), 0); + + // Test list. + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); + assert_eq!(listed_keys.len(), 1); + assert_eq!(listed_keys[0], key); + + // Test read. We expect to read data2, as the write call was initiated later. + let read_data = KVStore::read(&*async_fs_store, primary, secondary, key).await.unwrap(); + assert_eq!(data2, &*read_data); + + // Test remove. 
+ KVStore::remove(&*async_fs_store, primary, secondary, key, false).await.unwrap(); + + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); + assert_eq!(listed_keys.len(), 0); + } + + #[test] + fn test_data_migration() { + let mut source_temp_path = std::env::temp_dir(); + source_temp_path.push("test_data_migration_source_v2"); + let mut source_store = FilesystemStoreV2::new(source_temp_path).unwrap(); + + let mut target_temp_path = std::env::temp_dir(); + target_temp_path.push("test_data_migration_target_v2"); + let mut target_store = FilesystemStoreV2::new(target_temp_path).unwrap(); + + do_test_data_migration(&mut source_store, &mut target_store); + } + + #[test] + fn test_v1_to_v2_migration() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStoreSync; + + // Create v1 store and populate with test data + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_source"); + let v1_store = FilesystemStore::new(v1_path.clone()); + + let data = vec![42u8; 32]; + + // Write data with various namespace combinations + KVStoreSync::write(&v1_store, "", "", "root_key", data.clone()).unwrap(); + KVStoreSync::write(&v1_store, "primary", "", "primary_key", data.clone()).unwrap(); + KVStoreSync::write(&v1_store, "primary", "secondary", "nested_key", data.clone()).unwrap(); + + // Create v2 store + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_target"); + let v2_store = FilesystemStoreV2::new(v2_path.clone()).unwrap(); + + // Migrate + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify all data was migrated correctly + assert_eq!(KVStoreSync::read(&v2_store, "", "", "root_key").unwrap(), data); + assert_eq!(KVStoreSync::read(&v2_store, "primary", "", "primary_key").unwrap(), data); + assert_eq!( + KVStoreSync::read(&v2_store, "primary", "secondary", "nested_key").unwrap(), + data + ); + + // Verify v2 directory structure uses [empty] for empty namespaces + assert!(v2_path.join(EMPTY_NAMESPACE_DIR).join(EMPTY_NAMESPACE_DIR).exists()); + assert!(v2_path.join("primary").join(EMPTY_NAMESPACE_DIR).exists()); + assert!(v2_path.join("primary").join("secondary").exists()); + + // Verify list_all_keys works on the migrated data + let mut all_keys = v2_store.list_all_keys().unwrap(); + all_keys.sort(); + assert_eq!(all_keys.len(), 3); + assert!(all_keys.contains(&("".to_string(), "".to_string(), "root_key".to_string()))); + assert!(all_keys.contains(&( + "primary".to_string(), + "".to_string(), + "primary_key".to_string() + ))); + assert!(all_keys.contains(&( + "primary".to_string(), + "secondary".to_string(), + "nested_key".to_string() + ))); + } + + #[test] + fn test_v1_to_v2_migration_empty_store() { + use crate::fs_store::FilesystemStore; + + // Create empty v1 store + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_empty_source"); + let v1_store = FilesystemStore::new(v1_path); + + // Create v2 store + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_empty_target"); + let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + // Migrate empty store should succeed + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify no keys exist + let all_keys = v2_store.list_all_keys().unwrap(); + assert_eq!(all_keys.len(), 0); + } + + #[test] + fn test_v1_to_v2_migration_data_integrity() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStoreSync; + + // Create v1 store with different data 
for each key
+		let mut v1_path = std::env::temp_dir();
+		v1_path.push("test_v1_to_v2_migration_integrity_source");
+		let v1_store = FilesystemStore::new(v1_path);
+
+		// Write unique data for each key
+		let data1 = vec![1u8; 100];
+		let data2 = vec![2u8; 200];
+		let data3 = vec![3u8; 50];
+		let data4 = (0..=255u8).collect::<Vec<u8>>(); // All 256 byte values
+
+		KVStoreSync::write(&v1_store, "", "", "key1", data1.clone()).unwrap();
+		KVStoreSync::write(&v1_store, "ns1", "", "key2", data2.clone()).unwrap();
+		KVStoreSync::write(&v1_store, "ns1", "ns2", "key3", data3.clone()).unwrap();
+		KVStoreSync::write(&v1_store, "ns1", "ns2", "key4", data4.clone()).unwrap();
+
+		// Create v2 store and migrate
+		let mut v2_path = std::env::temp_dir();
+		v2_path.push("test_v1_to_v2_migration_integrity_target");
+		let v2_store = FilesystemStoreV2::new(v2_path).unwrap();
+
+		migrate_v1_to_v2(&v1_store, &v2_store).unwrap();
+
+		// Verify each key has exactly the right data
+		assert_eq!(KVStoreSync::read(&v2_store, "", "", "key1").unwrap(), data1);
+		assert_eq!(KVStoreSync::read(&v2_store, "ns1", "", "key2").unwrap(), data2);
+		assert_eq!(KVStoreSync::read(&v2_store, "ns1", "ns2", "key3").unwrap(), data3);
+		assert_eq!(KVStoreSync::read(&v2_store, "ns1", "ns2", "key4").unwrap(), data4);
+	}
+
+	#[test]
+	fn test_v1_to_v2_migration_many_keys() {
+		use crate::fs_store::FilesystemStore;
+		use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
+
+		// Create v1 store with many keys
+		let mut v1_path = std::env::temp_dir();
+		v1_path.push("test_v1_to_v2_migration_many_source");
+		let v1_store = FilesystemStore::new(v1_path);
+
+		let num_keys = 75; // More than one page (PAGE_SIZE = 50)
+		for i in 0..num_keys {
+			let key = format!("key_{:04}", i);
+			let data = vec![i as u8; 32];
+			KVStoreSync::write(&v1_store, "bulk", "test", &key, data).unwrap();
+		}
+
+		// Create v2 store and migrate
+		let mut v2_path = std::env::temp_dir();
+		v2_path.push("test_v1_to_v2_migration_many_target");
+		let v2_store = FilesystemStoreV2::new(v2_path).unwrap();
+
+		migrate_v1_to_v2(&v1_store, &v2_store).unwrap();
+
+		// Verify all keys migrated
+		let keys = KVStoreSync::list(&v2_store, "bulk", "test").unwrap();
+		assert_eq!(keys.len(), num_keys);
+
+		// Verify pagination works on migrated data
+		let page1 = PaginatedKVStoreSync::list_paginated(&v2_store, "bulk", "test", None).unwrap();
+		assert_eq!(page1.keys.len(), PAGE_SIZE);
+		assert!(page1.next_page_token.is_some());
+
+		let page2 =
+			PaginatedKVStoreSync::list_paginated(&v2_store, "bulk", "test", page1.next_page_token)
+				.unwrap();
+		assert_eq!(page2.keys.len(), num_keys - PAGE_SIZE);
+		assert!(page2.next_page_token.is_none());
+
+		// Verify data integrity for a sample of keys
+		for i in [0, 25, 50, 74] {
+			let key = format!("key_{:04}", i);
+			let expected_data = vec![i as u8; 32];
+			assert_eq!(KVStoreSync::read(&v2_store, "bulk", "test", &key).unwrap(), expected_data);
+		}
+	}
+
+	#[test]
+	fn test_v1_to_v2_migration_post_migration_operations() {
+		use crate::fs_store::FilesystemStore;
+		use lightning::util::persist::KVStoreSync;
+
+		// Create v1 store with some data
+		let mut v1_path = std::env::temp_dir();
+		v1_path.push("test_v1_to_v2_migration_post_ops_source");
+		let v1_store = FilesystemStore::new(v1_path);
+
+		let original_data = vec![42u8; 32];
+		KVStoreSync::write(&v1_store, "ns", "sub", "existing_key", original_data.clone()).unwrap();
+
+		// Create v2 store and migrate
+		let mut v2_path = std::env::temp_dir();
+		v2_path.push("test_v1_to_v2_migration_post_ops_target");
+
let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Test that we can write new keys after migration + let new_data = vec![43u8; 32]; + KVStoreSync::write(&v2_store, "ns", "sub", "new_key", new_data.clone()).unwrap(); + + // Test that we can update migrated keys + let updated_data = vec![44u8; 32]; + KVStoreSync::write(&v2_store, "ns", "sub", "existing_key", updated_data.clone()).unwrap(); + + // Verify reads work correctly + assert_eq!( + KVStoreSync::read(&v2_store, "ns", "sub", "existing_key").unwrap(), + updated_data + ); + assert_eq!(KVStoreSync::read(&v2_store, "ns", "sub", "new_key").unwrap(), new_data); + + // Verify list includes both old and new keys + let mut keys = KVStoreSync::list(&v2_store, "ns", "sub").unwrap(); + keys.sort(); + assert_eq!(keys, vec!["existing_key", "new_key"]); + + // Test removal works + KVStoreSync::remove(&v2_store, "ns", "sub", "existing_key", false).unwrap(); + let keys = KVStoreSync::list(&v2_store, "ns", "sub").unwrap(); + assert_eq!(keys, vec!["new_key"]); + } + + #[test] + fn test_v1_to_v2_migration_max_length_names() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::{KVStoreSync, KVSTORE_NAMESPACE_KEY_MAX_LEN}; + + // Create v1 store with maximum length names + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_max_len_source"); + let v1_store = FilesystemStore::new(v1_path); + + let max_name = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN); + let data = vec![42u8; 32]; + + KVStoreSync::write(&v1_store, &max_name, &max_name, &max_name, data.clone()).unwrap(); + + // Create v2 store and migrate + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_max_len_target"); + let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify the key was migrated correctly + assert_eq!(KVStoreSync::read(&v2_store, &max_name, &max_name, &max_name).unwrap(), data); + + // Verify list works + let keys = KVStoreSync::list(&v2_store, &max_name, &max_name).unwrap(); + assert_eq!(keys, vec![max_name.clone()]); + } + + #[test] + fn test_filesystem_store_v2() { + // Create the nodes, giving them FilesystemStoreV2s for data stores. 
+ let store_0 = FilesystemStoreV2::new("test_filesystem_store_v2_0".into()).unwrap(); + let store_1 = FilesystemStoreV2::new("test_filesystem_store_v2_1".into()).unwrap(); + do_test_store(&store_0, &store_1) + } + + #[test] + fn test_page_token_format() { + let mtime: u64 = 1706500000000; + let key = "test_key"; + let token = format_page_token(mtime, key); + assert_eq!(token, "0001706500000000:test_key"); + + let parsed = parse_page_token(&token).unwrap(); + assert_eq!(parsed, (mtime, key.to_string())); + + // Test invalid tokens + assert!(parse_page_token("invalid").is_err()); + assert!(parse_page_token("0001706500000000_key").is_err()); // wrong separator + } + + #[test] + fn test_directory_structure() { + use lightning::util::persist::KVStoreSync; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_directory_structure_v2"); + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + let data = vec![42u8; 32]; + + // Write with empty namespaces + KVStoreSync::write(&fs_store, "", "", "key1", data.clone()).unwrap(); + assert!(temp_path.join(EMPTY_NAMESPACE_DIR).join(EMPTY_NAMESPACE_DIR).exists()); + + // Write with non-empty primary, empty secondary + KVStoreSync::write(&fs_store, "primary", "", "key2", data.clone()).unwrap(); + assert!(temp_path.join("primary").join(EMPTY_NAMESPACE_DIR).exists()); + + // Write with both non-empty + KVStoreSync::write(&fs_store, "primary", "secondary", "key3", data.clone()).unwrap(); + assert!(temp_path.join("primary").join("secondary").exists()); + + // Verify we can read them back + assert_eq!(KVStoreSync::read(&fs_store, "", "", "key1").unwrap(), data); + assert_eq!(KVStoreSync::read(&fs_store, "primary", "", "key2").unwrap(), data); + assert_eq!(KVStoreSync::read(&fs_store, "primary", "secondary", "key3").unwrap(), data); + + // Verify files are named just by key (no timestamp prefix) + assert!(temp_path + .join(EMPTY_NAMESPACE_DIR) + .join(EMPTY_NAMESPACE_DIR) + .join("key1") + .exists()); + assert!(temp_path.join("primary").join(EMPTY_NAMESPACE_DIR).join("key2").exists()); + assert!(temp_path.join("primary").join("secondary").join("key3").exists()); + } + + #[test] + fn test_update_preserves_mtime() { + use lightning::util::persist::KVStoreSync; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_update_preserves_mtime_v2"); + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + // Write initial data + KVStoreSync::write(&fs_store, "ns", "sub", "key", data1).unwrap(); + + // Get the original mtime + let file_path = temp_path.join("ns").join("sub").join("key"); + let original_mtime = fs::metadata(&file_path).unwrap().modified().unwrap(); + + // Sleep briefly to ensure different timestamp if not preserved + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Update with new data + KVStoreSync::write(&fs_store, "ns", "sub", "key", data2.clone()).unwrap(); + + // Verify mtime is preserved + let updated_mtime = fs::metadata(&file_path).unwrap().modified().unwrap(); + assert_eq!(original_mtime, updated_mtime); + + // Verify data was updated + assert_eq!(KVStoreSync::read(&fs_store, "ns", "sub", "key").unwrap(), data2); + } + + #[test] + fn test_paginated_listing() { + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_paginated_listing_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + + let data = vec![42u8; 32]; + 
+ // Write several keys with small delays to ensure different mtimes + let keys: Vec = (0..5).map(|i| format!("key{}", i)).collect(); + for key in &keys { + KVStoreSync::write(&fs_store, "ns", "sub", key, data.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // List paginated - should return newest first + let response = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response.keys.len(), 5); + // Newest key (key4) should be first + assert_eq!(response.keys[0], "key4"); + assert_eq!(response.keys[4], "key0"); + assert!(response.next_page_token.is_none()); // Less than PAGE_SIZE items + } + + #[test] + fn test_paginated_listing_with_pagination() { + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_paginated_listing_with_pagination_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + + let data = vec![42u8; 32]; + + // Write more than PAGE_SIZE keys + let num_keys = PAGE_SIZE + 50; + for i in 0..num_keys { + let key = format!("key{:04}", i); + KVStoreSync::write(&fs_store, "ns", "sub", &key, data.clone()).unwrap(); + // Small delay to ensure ordering + if i % 10 == 0 { + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + + // First page + let response1 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response1.keys.len(), PAGE_SIZE); + assert!(response1.next_page_token.is_some()); + + // Second page + let response2 = + PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", response1.next_page_token) + .unwrap(); + assert_eq!(response2.keys.len(), 50); + assert!(response2.next_page_token.is_none()); + + // Verify no duplicates between pages + let all_keys: std::collections::HashSet<_> = + response1.keys.iter().chain(response2.keys.iter()).collect(); + assert_eq!(all_keys.len(), num_keys); + } + + #[test] + fn test_page_token_after_deletion() { + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_page_token_after_deletion_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + + let data = vec![42u8; 32]; + + // Write keys + for i in 0..10 { + let key = format!("key{}", i); + KVStoreSync::write(&fs_store, "ns", "sub", &key, data.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Verify initial listing + let response1 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response1.keys.len(), 10); + + // Delete some keys + KVStoreSync::remove(&fs_store, "ns", "sub", "key5", false).unwrap(); + KVStoreSync::remove(&fs_store, "ns", "sub", "key3", false).unwrap(); + + // List again - should work fine with deleted keys + let response2 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response2.keys.len(), 8); // 10 - 2 deleted + } + + #[test] + fn test_same_mtime_sorted_by_key() { + use lightning::util::persist::PaginatedKVStoreSync; + use std::time::Duration; + + // Create files directly on disk first with the same mtime + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_same_mtime_sorted_by_key_v2"); + let _ = fs::remove_dir_all(&temp_path); + + let data = vec![42u8; 32]; + let dir = temp_path.join("ns").join("sub"); + fs::create_dir_all(&dir).unwrap(); + + // Write files with the same mtime but different keys + let keys = 
vec!["zebra", "apple", "mango", "banana"]; + let fixed_time = UNIX_EPOCH + Duration::from_secs(1706500000); + + for key in &keys { + let file_path = dir.join(key); + let file = fs::File::create(&file_path).unwrap(); + std::io::Write::write_all(&mut &file, &data).unwrap(); + file.set_times(FileTimes::new().set_modified(fixed_time)).unwrap(); + } + + // Open the store + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + // List paginated - should return keys sorted by key in reverse order + // (for same mtime, keys are sorted reverse alphabetically) + let response = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response.keys.len(), 4); + + // Same mtime means sorted by key in reverse order (z > m > b > a) + assert_eq!(response.keys[0], "zebra"); + assert_eq!(response.keys[1], "mango"); + assert_eq!(response.keys[2], "banana"); + assert_eq!(response.keys[3], "apple"); + } +} diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index e4973f532e8..ba64e916335 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -9,6 +9,7 @@ extern crate criterion; pub mod fs_store; +pub mod fs_store_v2; mod fs_store_common; mod utils;