Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 46 additions & 35 deletions crates/core/common/src/catalog/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,21 @@ impl PhysicalTable {
}

pub async fn canonical_chain(&self) -> Result<Option<Chain>, CanonicalChainError> {
let segments = self.segments().await.map_err(CanonicalChainError)?;
let segments = self
.segments()
.await
.map_err(CanonicalChainError::GetSegments)?;
let canonical = canonical_chain(segments);
if let Some(start_block) = self.dataset_start_block
&& let Some(canonical) = &canonical
&& canonical.start() > start_block
{
return Ok(None);
let first_ranges = canonical.first_ranges();
if first_ranges.len() > 1 {
return Err(CanonicalChainError::MultiNetworkWithStartBlock);
}
if !first_ranges.is_empty() && first_ranges[0].start() > start_block {
return Ok(None);
}
}
Ok(canonical)
}
Expand All @@ -343,25 +351,13 @@ impl PhysicalTable {

// Convert FileMetadata -> Segment
let FileMetadata {
file_id,
file_name,
object_meta,
parquet_meta: ParquetMeta { mut ranges, .. },
file_id: id,
object_meta: object,
parquet_meta: ParquetMeta { ranges, .. },
..
} = file;

if ranges.len() != 1 {
return Err(GetSegmentsError::UnexpectedRangeCount {
file_name: file_name.to_string(),
count: ranges.len(),
});
}

Ok(Segment {
id: file_id,
range: ranges.remove(0),
object: object_meta,
})
Ok(Segment::new(id, object, ranges))
})
.try_collect()
.await
Expand Down Expand Up @@ -453,10 +449,6 @@ pub enum GetSegmentsError {
/// Failed to parse parquet metadata JSON
#[error("failed to parse parquet metadata")]
ParseMetadata(#[source] serde_json::Error),

/// File has unexpected number of block ranges
#[error("expected exactly 1 range for file '{file_name}', found {count}")]
UnexpectedRangeCount { file_name: String, count: usize },
}

/// Errors that can occur when computing missing ranges for a physical table.
Expand All @@ -465,9 +457,17 @@ pub enum GetSegmentsError {
pub struct MissingRangesError(#[source] pub GetSegmentsError);

/// Errors that can occur when computing the canonical chain for a physical table.
/// Errors that can occur when computing the canonical chain.
#[derive(Debug, thiserror::Error)]
#[error("failed to compute canonical chain")]
pub struct CanonicalChainError(#[source] pub GetSegmentsError);
pub enum CanonicalChainError {
/// Failed to get segments
#[error("failed to get segments")]
GetSegments(#[source] GetSegmentsError),

/// Multi-network canonical chain not supported with start block
#[error("multi-network datasets with start_block are not supported")]
MultiNetworkWithStartBlock,
Comment on lines +467 to +469
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that multi-chain joins need to sync the whole history?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't change current behavior. I'm leaving it as an error for now, since a single start block does not make sense for multi-network tables. I would expect us to support something like this in the future, but it is unclear at this time.

}

/// Errors that can occur when creating a table snapshot.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -496,14 +496,22 @@ impl TableSnapshot {
/// Return the block range to use for query execution over this table. None is returned if no
/// block range has been synced.
pub fn synced_range(&self) -> Option<BlockRange> {
let segments = &self.canonical_segments;
let start = segments.iter().min_by_key(|s| s.range.start())?;
let end = segments.iter().max_by_key(|s| s.range.end())?;
let start = &self
.canonical_segments
.iter()
.min_by_key(|s| s.single_range().start())?
.ranges()[0];
let end = &self
.canonical_segments
.iter()
.max_by_key(|s| s.single_range().end())?
.ranges()[0];

Some(BlockRange {
network: start.range.network.clone(),
numbers: start.range.start()..=end.range.end(),
hash: end.range.hash,
prev_hash: start.range.prev_hash,
network: start.network.clone(),
numbers: start.start()..=end.end(),
hash: end.hash,
prev_hash: start.prev_hash,
})
}

Expand All @@ -520,9 +528,12 @@ impl TableSnapshot {
&self,
segment: &Segment,
) -> Result<PartitionedFile, GetCachedMetadataError> {
let metadata = self.reader_factory.get_cached_metadata(segment.id).await?;
let file = PartitionedFile::from(segment.object.clone())
.with_extensions(Arc::new(segment.id))
let metadata = self
.reader_factory
.get_cached_metadata(segment.id())
.await?;
let file = PartitionedFile::from(segment.object().clone())
.with_extensions(Arc::new(segment.id()))
.with_statistics(metadata.statistics);
Ok(file)
}
Expand Down
Loading