feat: display extension ranges in "explain" (#6475)

* feat: display extension ranges in "explain"

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-07-09 10:11:23 +08:00
committed by GitHub
parent af03e89139
commit 6dc9e8ddb4
6 changed files with 153 additions and 11 deletions

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::fmt::{Debug, Display, Formatter};
use std::ops::{Bound, RangeBounds};
use serde::{Deserialize, Serialize};
@@ -303,6 +304,41 @@ impl TimestampRange {
}
}
/// Create [TimestampRange] from a timestamp tuple.
/// The tuple's two elements form the "start" and "end" (both inclusive) of the timestamp range.
impl From<(Timestamp, Timestamp)> for TimestampRange {
fn from((start, end): (Timestamp, Timestamp)) -> Self {
if start > end {
Self::empty()
} else {
Self::new_inclusive(Some(start), Some(end))
}
}
}
/// Create [TimestampRange] from Rust's "range".
impl<R: RangeBounds<Timestamp>> From<R> for TimestampRange {
fn from(r: R) -> Self {
let start = match r.start_bound() {
Bound::Included(x) => Some(*x),
Bound::Excluded(x) => x
.value()
.checked_sub(1)
.map(|v| Timestamp::new(v, x.unit())),
Bound::Unbounded => None,
};
let end = match r.end_bound() {
Bound::Included(x) => x
.value()
.checked_add(1)
.map(|v| Timestamp::new(v, x.unit())),
Bound::Excluded(x) => Some(*x),
Bound::Unbounded => None,
};
Self::from_optional(start, end)
}
}
/// Time range in milliseconds.
pub type RangeMillis = GenericRange<TimestampMillis>;
@@ -545,4 +581,75 @@ mod tests {
TimeUnit::Nanosecond
);
}
#[test]
fn test_from_timestamp_tuple() {
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(4))
)
);
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1), Timestamp::new_millisecond(1)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(2))
)
);
let timestamp_range: TimestampRange =
(Timestamp::new_second(1), Timestamp::new_millisecond(3)).into();
assert_eq!(timestamp_range, TimestampRange::empty());
}
#[test]
fn test_from_timestamp_range() {
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1)..Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(3))
)
);
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1)..=Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(4))
)
);
let timestamp_range: TimestampRange = (Timestamp::new_millisecond(1)..).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(Some(Timestamp::new_millisecond(1)), None)
);
let timestamp_range: TimestampRange = (..Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(3)))
);
let timestamp_range: TimestampRange = (..=Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(4)))
);
let timestamp_range: TimestampRange = (..).into();
assert_eq!(timestamp_range, TimestampRange::min_to_max(),);
}
}

View File

@@ -402,6 +402,9 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Not yet implemented: {what}"))]
NotYetImplemented { what: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -456,7 +459,7 @@ impl ErrorExt for Error {
OpenLogStore { source, .. } => source.status_code(),
MetaClientInit { source, .. } => source.status_code(),
UnsupportedOutput { .. } => StatusCode::Unsupported,
UnsupportedOutput { .. } | NotYetImplemented { .. } => StatusCode::Unsupported,
HandleRegionRequest { source, .. }
| GetRegionMetadata { source, .. }
| HandleBatchOpenRequest { source, .. }

View File

@@ -40,7 +40,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use table::TableRef;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::error::Error;
use crate::error::{Error, NotYetImplementedSnafu};
use crate::event_listener::NoopRegionServerEventListener;
use crate::region_server::RegionServer;
@@ -232,7 +232,9 @@ impl RegionEngine for MockRegionEngine {
_region_id: RegionId,
_request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
unimplemented!()
Err(BoxedError::new(
NotYetImplementedSnafu { what: "blah" }.build(),
))
}
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {

View File

@@ -1,3 +1,4 @@
use std::fmt::Display;
use std::sync::Arc;
use async_trait::async_trait;
@@ -19,7 +20,7 @@ pub type InclusiveTimeRange = (Timestamp, Timestamp);
/// memtable range and sst file range, but resides on the outside.
/// It can be scanned side by side as other ranges to produce the final result, so it's very useful
/// to extend the source of data in GreptimeDB.
pub trait ExtensionRange: Send + Sync {
pub trait ExtensionRange: Display + Send + Sync {
/// The number of rows in this range.
fn num_rows(&self) -> u64;

View File

@@ -1040,13 +1040,15 @@ impl StreamContext {
/// Format the context for explain.
pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
for range_meta in &self.ranges {
for idx in &range_meta.row_group_indices {
if self.is_mem_range_index(*idx) {
num_mem_ranges += 1;
} else {
} else if self.is_file_range_index(*idx) {
num_file_ranges += 1;
} else {
num_other_ranges += 1;
}
}
}
@@ -1055,12 +1057,17 @@ impl StreamContext {
}
write!(
f,
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
self.ranges.len(),
num_mem_ranges,
self.input.num_files(),
num_file_ranges,
)?;
if num_other_ranges > 0 {
write!(f, r#"", other_ranges":{}"#, num_other_ranges)?;
}
write!(f, "}}")?;
if let Some(selector) = &self.input.series_row_selector {
write!(f, ", \"selector\":\"{}\"", selector)?;
}
@@ -1102,6 +1109,24 @@ impl StreamContext {
input: &'a ScanInput,
}
#[cfg(feature = "enterprise")]
impl InputWrapper<'_> {
fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.input.extension_ranges.is_empty() {
return Ok(());
}
let mut delimiter = "";
write!(f, ", extension_ranges: [")?;
for range in self.input.extension_ranges() {
write!(f, "{}{}", delimiter, range)?;
delimiter = ", ";
}
write!(f, "]")?;
Ok(())
}
}
impl fmt::Debug for InputWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let output_schema = self.input.mapper.output_schema();
@@ -1127,6 +1152,9 @@ impl StreamContext {
.finish()?;
}
#[cfg(feature = "enterprise")]
self.format_extension_ranges(f)?;
Ok(())
}
}

View File

@@ -598,16 +598,17 @@ pub fn build_file_range_scan_stream(
pub(crate) async fn scan_extension_range(
context: Arc<StreamContext>,
index: RowGroupIndex,
metrics: PartitionMetrics,
partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
use snafu::ResultExt;
let range = context.input.extension_range(index.index);
let reader = range.reader(context.as_ref());
reader
.read(context, metrics, index)
let stream = reader
.read(context, partition_metrics, index)
.await
.context(crate::error::ScanExternalRangeSnafu)
.context(crate::error::ScanExternalRangeSnafu)?;
Ok(stream)
}
pub(crate) async fn maybe_scan_other_ranges(