From 6dc9e8ddb4b291aae67ca50efcb1f77eb44c3c7a Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Wed, 9 Jul 2025 10:11:23 +0800 Subject: [PATCH] feat: display extension ranges in "explain" (#6475) * feat: display extension ranges in "explain" Signed-off-by: luofucong * fix ci Signed-off-by: luofucong --------- Signed-off-by: luofucong --- src/common/time/src/range.rs | 107 ++++++++++++++++++++++++++++++ src/datanode/src/error.rs | 5 +- src/datanode/src/tests.rs | 6 +- src/mito2/src/extension.rs | 3 +- src/mito2/src/read/scan_region.rs | 34 +++++++++- src/mito2/src/read/scan_util.rs | 9 +-- 6 files changed, 153 insertions(+), 11 deletions(-) diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index 0461378449..91fc75fea4 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::fmt::{Debug, Display, Formatter}; +use std::ops::{Bound, RangeBounds}; use serde::{Deserialize, Serialize}; @@ -303,6 +304,41 @@ impl TimestampRange { } } +/// Create [TimestampRange] from a timestamp tuple. +/// The tuple's two elements form the "start" and "end" (both inclusive) of the timestamp range. +impl From<(Timestamp, Timestamp)> for TimestampRange { + fn from((start, end): (Timestamp, Timestamp)) -> Self { + if start > end { + Self::empty() + } else { + Self::new_inclusive(Some(start), Some(end)) + } + } +} + +/// Create [TimestampRange] from Rust's "range". +impl> From for TimestampRange { + fn from(r: R) -> Self { + let start = match r.start_bound() { + Bound::Included(x) => Some(*x), + Bound::Excluded(x) => x + .value() + .checked_sub(1) + .map(|v| Timestamp::new(v, x.unit())), + Bound::Unbounded => None, + }; + let end = match r.end_bound() { + Bound::Included(x) => x + .value() + .checked_add(1) + .map(|v| Timestamp::new(v, x.unit())), + Bound::Excluded(x) => Some(*x), + Bound::Unbounded => None, + }; + Self::from_optional(start, end) + } +} + /// Time range in milliseconds. pub type RangeMillis = GenericRange; @@ -545,4 +581,75 @@ mod tests { TimeUnit::Nanosecond ); } + + #[test] + fn test_from_timestamp_tuple() { + let timestamp_range: TimestampRange = + (Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional( + Some(Timestamp::new_millisecond(1)), + Some(Timestamp::new_millisecond(4)) + ) + ); + + let timestamp_range: TimestampRange = + (Timestamp::new_millisecond(1), Timestamp::new_millisecond(1)).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional( + Some(Timestamp::new_millisecond(1)), + Some(Timestamp::new_millisecond(2)) + ) + ); + + let timestamp_range: TimestampRange = + (Timestamp::new_second(1), Timestamp::new_millisecond(3)).into(); + assert_eq!(timestamp_range, TimestampRange::empty()); + } + + #[test] + fn test_from_timestamp_range() { + let timestamp_range: TimestampRange = + (Timestamp::new_millisecond(1)..Timestamp::new_millisecond(3)).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional( + Some(Timestamp::new_millisecond(1)), + Some(Timestamp::new_millisecond(3)) + ) + ); + + let timestamp_range: TimestampRange = + (Timestamp::new_millisecond(1)..=Timestamp::new_millisecond(3)).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional( + Some(Timestamp::new_millisecond(1)), + Some(Timestamp::new_millisecond(4)) + ) + ); + + let timestamp_range: TimestampRange = (Timestamp::new_millisecond(1)..).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional(Some(Timestamp::new_millisecond(1)), None) + ); + + let timestamp_range: TimestampRange = (..Timestamp::new_millisecond(3)).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(3))) + ); + + let timestamp_range: TimestampRange = (..=Timestamp::new_millisecond(3)).into(); + assert_eq!( + timestamp_range, + TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(4))) + ); + + let timestamp_range: TimestampRange = (..).into(); + assert_eq!(timestamp_range, TimestampRange::min_to_max(),); + } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index b77eea293f..aa15cf8428 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -402,6 +402,9 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Not yet implemented: {what}"))] + NotYetImplemented { what: String }, } pub type Result = std::result::Result; @@ -456,7 +459,7 @@ impl ErrorExt for Error { OpenLogStore { source, .. } => source.status_code(), MetaClientInit { source, .. } => source.status_code(), - UnsupportedOutput { .. } => StatusCode::Unsupported, + UnsupportedOutput { .. } | NotYetImplemented { .. } => StatusCode::Unsupported, HandleRegionRequest { source, .. } | GetRegionMetadata { source, .. } | HandleBatchOpenRequest { source, .. } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index b5c59a35a5..870973247c 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -40,7 +40,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; -use crate::error::Error; +use crate::error::{Error, NotYetImplementedSnafu}; use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; @@ -232,7 +232,9 @@ impl RegionEngine for MockRegionEngine { _region_id: RegionId, _request: ScanRequest, ) -> Result { - unimplemented!() + Err(BoxedError::new( + NotYetImplementedSnafu { what: "blah" }.build(), + )) } async fn get_metadata(&self, region_id: RegionId) -> Result { diff --git a/src/mito2/src/extension.rs b/src/mito2/src/extension.rs index 2f93731873..728b423c05 100644 --- a/src/mito2/src/extension.rs +++ b/src/mito2/src/extension.rs @@ -1,3 +1,4 @@ +use std::fmt::Display; use std::sync::Arc; use async_trait::async_trait; @@ -19,7 +20,7 @@ pub type InclusiveTimeRange = (Timestamp, Timestamp); /// memtable range and sst file range, but resides on the outside. /// It can be scanned side by side as other ranges to produce the final result, so it's very useful /// to extend the source of data in GreptimeDB. -pub trait ExtensionRange: Send + Sync { +pub trait ExtensionRange: Display + Send + Sync { /// The number of rows in this range. fn num_rows(&self) -> u64; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a98777f9ce..e980478d03 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1040,13 +1040,15 @@ impl StreamContext { /// Format the context for explain. pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result { - let (mut num_mem_ranges, mut num_file_ranges) = (0, 0); + let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0); for range_meta in &self.ranges { for idx in &range_meta.row_group_indices { if self.is_mem_range_index(*idx) { num_mem_ranges += 1; - } else { + } else if self.is_file_range_index(*idx) { num_file_ranges += 1; + } else { + num_other_ranges += 1; } } } @@ -1055,12 +1057,17 @@ impl StreamContext { } write!( f, - "\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}", + r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#, self.ranges.len(), num_mem_ranges, self.input.num_files(), num_file_ranges, )?; + if num_other_ranges > 0 { + write!(f, r#"", other_ranges":{}"#, num_other_ranges)?; + } + write!(f, "}}")?; + if let Some(selector) = &self.input.series_row_selector { write!(f, ", \"selector\":\"{}\"", selector)?; } @@ -1102,6 +1109,24 @@ impl StreamContext { input: &'a ScanInput, } + #[cfg(feature = "enterprise")] + impl InputWrapper<'_> { + fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.input.extension_ranges.is_empty() { + return Ok(()); + } + + let mut delimiter = ""; + write!(f, ", extension_ranges: [")?; + for range in self.input.extension_ranges() { + write!(f, "{}{}", delimiter, range)?; + delimiter = ", "; + } + write!(f, "]")?; + Ok(()) + } + } + impl fmt::Debug for InputWrapper<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let output_schema = self.input.mapper.output_schema(); @@ -1127,6 +1152,9 @@ impl StreamContext { .finish()?; } + #[cfg(feature = "enterprise")] + self.format_extension_ranges(f)?; + Ok(()) } } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index ce4505ac7a..10c135201e 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -598,16 +598,17 @@ pub fn build_file_range_scan_stream( pub(crate) async fn scan_extension_range( context: Arc, index: RowGroupIndex, - metrics: PartitionMetrics, + partition_metrics: PartitionMetrics, ) -> Result { use snafu::ResultExt; let range = context.input.extension_range(index.index); let reader = range.reader(context.as_ref()); - reader - .read(context, metrics, index) + let stream = reader + .read(context, partition_metrics, index) .await - .context(crate::error::ScanExternalRangeSnafu) + .context(crate::error::ScanExternalRangeSnafu)?; + Ok(stream) } pub(crate) async fn maybe_scan_other_ranges(