diff --git a/crates/core/common/src/catalog/physical.rs b/crates/core/common/src/catalog/physical.rs index 747c5238f..3e1b9d497 100644 --- a/crates/core/common/src/catalog/physical.rs +++ b/crates/core/common/src/catalog/physical.rs @@ -318,13 +318,21 @@ impl PhysicalTable { } pub async fn canonical_chain(&self) -> Result, CanonicalChainError> { - let segments = self.segments().await.map_err(CanonicalChainError)?; + let segments = self + .segments() + .await + .map_err(CanonicalChainError::GetSegments)?; let canonical = canonical_chain(segments); if let Some(start_block) = self.dataset_start_block && let Some(canonical) = &canonical - && canonical.start() > start_block { - return Ok(None); + let first_ranges = canonical.first_ranges(); + if first_ranges.len() > 1 { + return Err(CanonicalChainError::MultiNetworkWithStartBlock); + } + if !first_ranges.is_empty() && first_ranges[0].start() > start_block { + return Ok(None); + } } Ok(canonical) } @@ -343,25 +351,13 @@ impl PhysicalTable { // Convert FileMetadata -> Segment let FileMetadata { - file_id, - file_name, - object_meta, - parquet_meta: ParquetMeta { mut ranges, .. }, + file_id: id, + object_meta: object, + parquet_meta: ParquetMeta { ranges, .. }, .. } = file; - if ranges.len() != 1 { - return Err(GetSegmentsError::UnexpectedRangeCount { - file_name: file_name.to_string(), - count: ranges.len(), - }); - } - - Ok(Segment { - id: file_id, - range: ranges.remove(0), - object: object_meta, - }) + Ok(Segment::new(id, object, ranges)) }) .try_collect() .await @@ -453,10 +449,6 @@ pub enum GetSegmentsError { /// Failed to parse parquet metadata JSON #[error("failed to parse parquet metadata")] ParseMetadata(#[source] serde_json::Error), - - /// File has unexpected number of block ranges - #[error("expected exactly 1 range for file '{file_name}', found {count}")] - UnexpectedRangeCount { file_name: String, count: usize }, } /// Errors that can occur when computing missing ranges for a physical table. @@ -465,9 +457,17 @@ pub enum GetSegmentsError { pub struct MissingRangesError(#[source] pub GetSegmentsError); /// Errors that can occur when computing the canonical chain for a physical table. +/// Errors that can occur when computing the canonical chain. #[derive(Debug, thiserror::Error)] -#[error("failed to compute canonical chain")] -pub struct CanonicalChainError(#[source] pub GetSegmentsError); +pub enum CanonicalChainError { + /// Failed to get segments + #[error("failed to get segments")] + GetSegments(#[source] GetSegmentsError), + + /// Multi-network canonical chain not supported with start block + #[error("multi-network datasets with start_block are not supported")] + MultiNetworkWithStartBlock, +} /// Errors that can occur when creating a table snapshot. #[derive(Debug, thiserror::Error)] @@ -496,14 +496,22 @@ impl TableSnapshot { /// Return the block range to use for query execution over this table. None is returned if no /// block range has been synced. pub fn synced_range(&self) -> Option { - let segments = &self.canonical_segments; - let start = segments.iter().min_by_key(|s| s.range.start())?; - let end = segments.iter().max_by_key(|s| s.range.end())?; + let start = &self + .canonical_segments + .iter() + .min_by_key(|s| s.single_range().start())? + .ranges()[0]; + let end = &self + .canonical_segments + .iter() + .max_by_key(|s| s.single_range().end())? + .ranges()[0]; + Some(BlockRange { - network: start.range.network.clone(), - numbers: start.range.start()..=end.range.end(), - hash: end.range.hash, - prev_hash: start.range.prev_hash, + network: start.network.clone(), + numbers: start.start()..=end.end(), + hash: end.hash, + prev_hash: start.prev_hash, }) } @@ -520,9 +528,12 @@ impl TableSnapshot { &self, segment: &Segment, ) -> Result { - let metadata = self.reader_factory.get_cached_metadata(segment.id).await?; - let file = PartitionedFile::from(segment.object.clone()) - .with_extensions(Arc::new(segment.id)) + let metadata = self + .reader_factory + .get_cached_metadata(segment.id()) + .await?; + let file = PartitionedFile::from(segment.object().clone()) + .with_extensions(Arc::new(segment.id())) .with_statistics(metadata.statistics); Ok(file) } diff --git a/crates/core/common/src/metadata/segments.rs b/crates/core/common/src/metadata/segments.rs index bc3184ca0..3c12e92e4 100644 --- a/crates/core/common/src/metadata/segments.rs +++ b/crates/core/common/src/metadata/segments.rs @@ -84,9 +84,78 @@ impl From> for ResumeWatermark { /// A block range associated with the metadata from a file in object storage. #[derive(Clone, Debug, PartialEq, Eq)] pub struct Segment { - pub id: FileId, - pub range: BlockRange, - pub object: ObjectMeta, + id: FileId, + object: ObjectMeta, + ranges: Vec, +} + +impl Segment { + /// Returns a Segment, where the ranges are ordered by network. + pub fn new(id: FileId, object: ObjectMeta, mut ranges: Vec) -> Self { + ranges.sort_unstable_by(|a, b| a.network.cmp(&b.network)); + Self { id, object, ranges } + } + + /// Returns the file ID of this segment. + pub fn id(&self) -> FileId { + self.id + } + + /// Returns a reference to the object metadata. + pub fn object(&self) -> &ObjectMeta { + &self.object + } + + /// Returns a slice of all block ranges in this segment. + pub fn ranges(&self) -> &[BlockRange] { + &self.ranges + } + + /// Returns the single block range for single-network segments. + /// + /// # Panics + /// Panics if the segment does not have exactly one range. This is a temporary + /// method to support the transition to multi-network streaming. Code outside this module + /// should use this method when it can assume single-network segments. For now, it is safe to + /// assume this because we cannot stream out multi-network segments, and therefore they cannot + /// be materialized. + pub fn single_range(&self) -> &BlockRange { + assert_eq!(self.ranges.len(), 1); + &self.ranges[0] + } + + pub fn adjacent(&self, other: &Self) -> bool { + self.ranges.len() == other.ranges.len() + && std::iter::zip(&self.ranges, &other.ranges).all(|(a, b)| a.adjacent(b)) + } + + /// Lexicographic comparison of segments by start block numbers. + /// + /// Compares ranges in order: if the first network's starts differ, return that ordering. + /// Otherwise, compare the second network's starts, and so on. + fn cmp_by_start(&self, other: &Self) -> std::cmp::Ordering { + for (ra, rb) in std::iter::zip(&self.ranges, &other.ranges) { + match ra.start().cmp(&rb.start()) { + std::cmp::Ordering::Equal => continue, + ordering => return ordering, + } + } + std::cmp::Ordering::Equal + } + + /// Lexicographic comparison of segments by end block numbers. + /// + /// Compares ranges in order: if the first network's ends differ, return that ordering. + /// Otherwise, compare the second network's ends, and so on. + fn cmp_by_end(&self, other: &Self) -> std::cmp::Ordering { + for (ra, rb) in std::iter::zip(&self.ranges, &other.ranges) { + match ra.end().cmp(&rb.end()) { + std::cmp::Ordering::Equal => continue, + ordering => return ordering, + } + } + std::cmp::Ordering::Equal + } } /// A sequence of adjacent segments. @@ -97,34 +166,34 @@ impl Chain { #[cfg(test)] fn check_invariants(&self) { assert!(!self.0.is_empty()); - for w in self.0.windows(2) { - assert!(BlockRange::adjacent(&w[0].range, &w[1].range)); + for segments in self.0.windows(2) { + assert!(segments[0].adjacent(&segments[1])); } } - pub fn start(&self) -> BlockNum { - self.first().start() + /// Return all ranges from the first segment. + pub fn first_ranges(&self) -> &[BlockRange] { + &self.0.first().unwrap().ranges } - pub fn end(&self) -> BlockNum { - self.last().end() + /// Return all ranges from the last segment. + pub fn last_ranges(&self) -> &[BlockRange] { + &self.0.last().unwrap().ranges } - pub fn first(&self) -> &BlockRange { - &self.0.first().unwrap().range + /// Return the first segment in the chain. + pub fn first_segment(&self) -> &Segment { + self.0.first().unwrap() } - pub fn last(&self) -> &BlockRange { - &self.0.last().unwrap().range + /// Return the last segment in the chain. + pub fn last_segment(&self) -> &Segment { + self.0.last().unwrap() } - pub fn range(&self) -> BlockRange { - BlockRange { - numbers: self.start()..=self.end(), - network: self.first().network.clone(), - hash: self.last().hash, - prev_hash: self.first().prev_hash, - } + /// Number of networks (consistent across all segments in the chain). + pub fn network_count(&self) -> usize { + self.first_ranges().len() } } @@ -152,6 +221,9 @@ pub fn canonical_chain(segments: Vec) -> Option { /// Return the block ranges missing from this table out of the given `desired` range. The /// returned ranges are non-overlapping and sorted by their start block number. /// +/// This function is designed for single-network segments (raw datasets). All segments must +/// have exactly one network range. +/// /// To resolve reorgs, the missing ranges may include block ranges already indexed. A reorg is /// detected when there is some fork, which is a chain of segments that has a greater block height /// than the canonical chain. Divergence from the canonical chain is detected using the `hash` and @@ -181,11 +253,17 @@ pub fn missing_ranges( segments: Vec, desired: RangeInclusive, ) -> Vec> { + // Invariant: this function only works for single-network segments (raw datasets) + if let Some(first_segment) = segments.first() { + assert_eq!(first_segment.ranges.len(), 1); + } + let mut missing = vec![desired.clone()]; // remove overlapping ranges from each segment for segment in &segments { - let segment_range = segment.range.numbers.clone(); + assert_eq!(segment.ranges.len(), 1); + let segment_range = segment.ranges[0].numbers.clone(); let mut index = 0; while index < missing.len() { if block_range_intersection(missing[index].clone(), segment_range.clone()).is_none() { @@ -203,11 +281,11 @@ pub fn missing_ranges( if let Some(chains) = chains(segments) && let Some(fork) = chains.fork { - let reorg_block = fork.start() - 1; + let reorg_block = fork.first_ranges()[0].start().saturating_sub(1); let canonical_segments = &chains.canonical.0; let canonical_range = canonical_segments .iter() - .map(|s| s.range.numbers.clone()) + .map(|s| s.ranges[0].numbers.clone()) .rfind(|r| r.contains(&reorg_block)); if let Some(canonical_range) = canonical_range { let reorg_range = *canonical_range.start()..=reorg_block; @@ -249,10 +327,10 @@ fn chains(mut segments: Vec) -> Option { // Sort by start block ascending, then by last_modified descending. // - start ascending: enables forward single-pass chain construction // - last_modified descending: when multiple segments could extend the chain, the first one - // encountered (most recently modified) is preferred to favor compated segments. + // encountered (most recently modified) is preferred to favor compacted segments. segments.sort_unstable_by(|a, b| { use std::cmp::Ord; - Ord::cmp(&a.range.start(), &b.range.start()) + a.cmp_by_start(b) .then_with(|| Ord::cmp(&b.object.last_modified, &a.object.last_modified)) }); @@ -267,7 +345,7 @@ fn chains(mut segments: Vec) -> Option { continue; } - if BlockRange::adjacent(&canonical.last().unwrap().range, &segment.range) { + if canonical.last().unwrap().adjacent(&segment) { canonical.push(segment); } else { non_canonical.push(segment); @@ -282,30 +360,33 @@ fn chains(mut segments: Vec) -> Option { // segments. non_canonical.sort_unstable_by(|a, b| { use std::cmp::Ord; - Ord::cmp(&a.range.end(), &b.range.end()) + a.cmp_by_end(b) .then_with(|| Ord::cmp(&a.object.last_modified, &b.object.last_modified)) }); // Search for a valid fork chain. A fork must: // 1. Extend beyond the canonical chain's end block - // 2. Connect back to at most canonical.end() + 1 (to form a valid divergence point) + // 2. Connect back to at most canonical.last() + 1 (to form a valid divergence point) let mut fork: Option = None; while let Some(fork_end) = non_canonical.pop() { // Early exit: remaining segments all have end <= canonical.end() (sorted ascending), // so none can form a fork that extends beyond canonical. - if fork_end.range.end() <= canonical.end() { + // For multi-network, compare lexicographically. + if fork_end.cmp_by_end(canonical.last_segment()) != std::cmp::Ordering::Greater { break; } // Build a candidate fork chain backwards from fork_end. let mut fork_segments = vec![fork_end]; for segment in non_canonical.iter().rev() { - if BlockRange::adjacent(&segment.range, &fork_segments.first().unwrap().range) { + if segment.adjacent(fork_segments.first().unwrap()) { fork_segments.insert(0, segment.clone()); } } // Check if this fork connects back to a valid divergence point. - if fork_segments.first().unwrap().range.start() <= (canonical.end() + 1) { + if std::iter::zip(&fork_segments[0].ranges, &canonical.last_segment().ranges) + .all(|(fork_range, canonical_range)| fork_range.start() <= canonical_range.end() + 1) + { fork = Some(Chain(fork_segments)); break; } @@ -375,16 +456,27 @@ mod test { use super::{BlockRange, Chain, Chains, Segment}; - fn test_range(numbers: RangeInclusive, fork: (u8, u8)) -> BlockRange { - fn test_hash(number: u8, fork: u8) -> BlockHash { - let mut hash: BlockHash = Default::default(); - hash.0[0] = number; - hash.0[31] = fork; - hash + fn test_hash(number: u8, fork: u8) -> BlockHash { + let mut hash: BlockHash = Default::default(); + hash.0[0] = number; + hash.0[31] = fork; + hash + } + + fn test_object(timestamp: i64) -> ObjectMeta { + ObjectMeta { + location: Default::default(), + last_modified: DateTime::from_timestamp_millis(timestamp).unwrap(), + size: 0, + e_tag: None, + version: None, } + } + + fn test_range(network: &str, numbers: RangeInclusive, fork: (u8, u8)) -> BlockRange { BlockRange { numbers: numbers.clone(), - network: "test".parse().expect("valid network id"), + network: network.parse().expect("valid network id"), hash: test_hash(*numbers.end() as u8, fork.1), prev_hash: if *numbers.start() == 0 { Default::default() @@ -395,19 +487,27 @@ mod test { } fn test_segment(numbers: RangeInclusive, fork: (u8, u8), timestamp: i64) -> Segment { - let range = test_range(numbers.clone(), fork); - let object = ObjectMeta { - location: Default::default(), - last_modified: DateTime::from_timestamp_millis(timestamp).unwrap(), - size: 0, - e_tag: None, - version: None, - }; - Segment { - range, - object, - id: FileId::try_from(1i64).expect("FileId::MIN is 1"), - } + let range = test_range("test", numbers.clone(), fork); + Segment::new( + FileId::try_from(1i64).expect("FileId::MIN is 1"), + test_object(timestamp), + vec![range], + ) + } + + fn test_segment_multi( + network_a: (RangeInclusive, (u8, u8)), + network_b: (RangeInclusive, (u8, u8)), + timestamp: i64, + ) -> Segment { + Segment::new( + FileId::try_from(1i64).expect("FileId::MIN is 1"), + test_object(timestamp), + vec![ + test_range("a", network_a.0, network_a.1), + test_range("b", network_b.0, network_b.1), + ], + ) } #[test] @@ -609,6 +709,77 @@ mod test { ); } + #[test] + fn chains_multi_network() { + // Case 1: Canonical chain with 2 networks + // Both networks have adjacent ranges forming a canonical chain + assert_eq!( + super::chains(vec![ + test_segment_multi((0..=2, (0, 0)), (0..=2, (0, 0)), 0), + test_segment_multi((3..=5, (0, 0)), (3..=5, (0, 0)), 0), + ]), + Some(Chains { + canonical: Chain(vec![ + test_segment_multi((0..=2, (0, 0)), (0..=2, (0, 0)), 0), + test_segment_multi((3..=5, (0, 0)), (3..=5, (0, 0)), 0), + ]), + fork: None, + }) + ); + + // Case 2: First network diverges to create a fork + // Canonical chain has both networks up to block 5 + // Fork extends both networks to block 6 with different hashes + assert_eq!( + super::chains(vec![ + test_segment_multi((0..=2, (0, 0)), (0..=2, (0, 0)), 0), + test_segment_multi((3..=5, (0, 0)), (3..=5, (0, 0)), 0), + test_segment_multi((3..=6, (0, 1)), (6..=7, (0, 0)), 0), + ]), + Some(Chains { + canonical: Chain(vec![ + test_segment_multi((0..=2, (0, 0)), (0..=2, (0, 0)), 0), + test_segment_multi((3..=5, (0, 0)), (3..=5, (0, 0)), 0), + ]), + fork: Some(Chain(vec![test_segment_multi( + (3..=6, (0, 1)), + (6..=7, (0, 0)), + 0 + )])), + }) + ); + + // Case 3: First network diverges by 1 segment, second network diverges by 2 segments. + // Canonical (2 segments): + // Network a: [0..=2] -> [3..=5] + // Network b: [0..=2] -> [3..=5] + // Fork (2 segments extending past canonical): + // Segment 1: Network A [3..=6] (reorg at 3, extends to 6) + // Network B [3..=7] (reorg at 3, extends to 7) + // Segment 2: Network A [7..=8] (continues fork) + // Network B [8..=9] (continues fork, diverges more) + // Fork ends at (8, 10) which is greater than canonical's (5, 5). + // Canonical has newer timestamp (3) so it's preferred over fork (timestamp 1, 2). + assert_eq!( + super::chains(vec![ + test_segment_multi((0..=2, (0, 0)), (0..=2, (0, 0)), 0), + test_segment_multi((3..=6, (0, 1)), (3..=7, (0, 1)), 1), + test_segment_multi((7..=8, (1, 1)), (8..=9, (1, 1)), 2), + test_segment_multi((3..=5, (0, 0)), (3..=5, (0, 0)), 3), + ]), + Some(Chains { + canonical: Chain(vec![ + test_segment_multi((0..=2, (0, 0)), (0..=2, (0, 0)), 0), + test_segment_multi((3..=5, (0, 0)), (3..=5, (0, 0)), 3), + ]), + fork: Some(Chain(vec![ + test_segment_multi((3..=6, (0, 1)), (3..=7, (0, 1)), 1), + test_segment_multi((7..=8, (1, 1)), (8..=9, (1, 1)), 2), + ])), + }) + ); + } + #[test] fn missing_ranges() { fn missing_ranges( @@ -670,7 +841,7 @@ mod test { loop { let start = segments .last() - .map(|s| s.range.end() + 1) + .map(|s| s.ranges[0].end() + 1) .unwrap_or(*numbers.start()); let end = rng.random_range(start..=*numbers.end()); segments.push(test_segment(start..=end, (fork, fork), 0)); @@ -708,11 +879,15 @@ mod test { let chains = super::chains(segments).unwrap(); assert_eq!(chains.canonical, canonical); if let Some(fork) = chains.fork { - assert!(fork.end() > canonical.end()); - assert!(fork.start() > canonical.start()); - assert!(fork.start() <= canonical.end() + 1); + assert!(fork.last_ranges()[0].end() > canonical.last_ranges()[0].end()); + assert!(fork.first_ranges()[0].start() > canonical.first_ranges()[0].start()); + assert!(fork.first_ranges()[0].start() <= canonical.last_ranges()[0].end() + 1); } else { - assert!(other_chains.iter().all(|c| c.end() <= canonical.end())); + assert!( + other_chains + .iter() + .all(|c| c.last_ranges()[0].end() <= canonical.last_ranges()[0].end()) + ); } } } diff --git a/crates/core/dump/src/compaction/plan.rs b/crates/core/dump/src/compaction/plan.rs index 76c49a65e..bfb5bcbac 100644 --- a/crates/core/dump/src/compaction/plan.rs +++ b/crates/core/dump/src/compaction/plan.rs @@ -50,10 +50,10 @@ impl CompactionFile { segment: &Segment, is_tail: bool, ) -> CompactionResult { - let file_id = segment.id; - let range = segment.range.clone(); + let file_id = segment.id(); + let range = segment.single_range().clone(); - let mut file_meta = PartitionedFile::from(segment.object.clone()); + let mut file_meta = PartitionedFile::from(segment.object().clone()); file_meta.extensions = Some(Arc::new(file_id)); diff --git a/crates/core/dump/src/streaming_query.rs b/crates/core/dump/src/streaming_query.rs index ee94ff0c9..4716df4a9 100644 --- a/crates/core/dump/src/streaming_query.rs +++ b/crates/core/dump/src/streaming_query.rs @@ -651,7 +651,7 @@ impl StreamingQuery { let mut latest_src_watermarks: Vec = Default::default(); 'chain_loop: for chain in chains { for segment in chain.iter().rev() { - let watermark = (&segment.range).into(); + let watermark = segment.single_range().into(); if self .blocks_table_contains(ctx, &watermark) .await @@ -728,7 +728,7 @@ impl StreamingQuery { .0 .iter() .rev() - .map(|s| &s.range.numbers) + .map(|s| &s.single_range().numbers) .find(|r| r.contains(&min_fork_block_num)) .unwrap_or(&(0..=0)) .start(); @@ -751,11 +751,11 @@ impl StreamingQuery { // Optimization: Check segment metadata first to avoid expensive query, // Walk segments in reverse to find one that covers this block number. for segment in blocks_segments.canonical_segments().iter().rev() { - if *segment.range.numbers.start() <= watermark.number { + if *segment.single_range().numbers.start() <= watermark.number { // Found segment that could contain this block - if *segment.range.numbers.end() == watermark.number { + if *segment.single_range().numbers.end() == watermark.number { // Exact match on segment end - use segment hash directly - return Ok(segment.range.hash == watermark.hash); + return Ok(segment.single_range().hash == watermark.hash); } // Block is inside segment but not at end. // So we will need to query the data file to find the hash. diff --git a/crates/core/worker-datasets-derived/src/dataset.rs b/crates/core/worker-datasets-derived/src/dataset.rs index bc633639a..53cd6ec1f 100644 --- a/crates/core/worker-datasets-derived/src/dataset.rs +++ b/crates/core/worker-datasets-derived/src/dataset.rs @@ -564,12 +564,11 @@ async fn dump_table( // Track start time for duration calculation let table_dump_start = Instant::now(); - let latest_range = table + let resume_watermark = table .canonical_chain() .await .map_err(DumpTableSpawnError::CanonicalChain)? - .map(|c| c.last().clone()); - let resume_watermark = latest_range.map(|r| ResumeWatermark::from_ranges(&[r])); + .map(|c| ResumeWatermark::from_ranges(c.last_ranges())); // Execute the dump, capturing any errors for sync.failed event let dump_result = dump_sql_query( diff --git a/crates/services/admin-api/src/handlers/jobs/progress.rs b/crates/services/admin-api/src/handlers/jobs/progress.rs index 94ed15105..80b04e41e 100644 --- a/crates/services/admin-api/src/handlers/jobs/progress.rs +++ b/crates/services/admin-api/src/handlers/jobs/progress.rs @@ -208,7 +208,7 @@ pub async fn handler( let files_count = canonical_segments.len() as i64; let total_size_bytes = canonical_segments .iter() - .map(|s| s.object.size as i64) + .map(|s| s.object().size as i64) .sum(); let (start, end) = match synced_range {