feat: add TimeSeriesRowSelector hint (#4327)

* feat: Add TimeSeriesRowSelector

* feat: scan allow specify series row selector

* chore: Update comment
This commit is contained in:
Yingwen
2024-07-09 20:29:47 +08:00
committed by GitHub
parent 1ddf19d886
commit 458e5d7e66
5 changed files with 30 additions and 3 deletions

View File

@@ -348,6 +348,7 @@ impl MetadataRegion {
filters: vec![],
output_ordering: None,
limit: None,
series_row_selector: None,
};
let record_batch_stream = self
.mito
@@ -405,6 +406,7 @@ impl MetadataRegion {
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
}
}
@@ -565,6 +567,7 @@ mod test {
filters: vec![expected_filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
};
let actual_scan_request = MetadataRegion::build_read_request(key);
assert_eq!(actual_scan_request, expected_scan_request);

View File

@@ -78,6 +78,7 @@ async fn test_scan_projection() {
filters: Vec::new(),
output_ordering: None,
limit: None,
series_row_selector: None,
};
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -26,7 +26,7 @@ use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use smallvec::SmallVec;
use store_api::region_engine::RegionScannerRef;
use store_api::storage::ScanRequest;
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
@@ -297,7 +297,8 @@ impl ScanRegion {
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
.with_merge_mode(self.version.options.merge_mode());
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector.clone());
Ok(input)
}
@@ -410,6 +411,8 @@ pub(crate) struct ScanInput {
pub(crate) filter_deleted: bool,
/// Mode to merge duplicate rows.
pub(crate) merge_mode: MergeMode,
/// Hint to select rows from time series.
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
}
impl ScanInput {
@@ -431,6 +434,7 @@ impl ScanInput {
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::default(),
series_row_selector: None,
}
}
@@ -517,6 +521,16 @@ impl ScanInput {
self
}
/// Sets the time series row selector.
#[must_use]
pub(crate) fn with_series_row_selector(
mut self,
series_row_selector: Option<TimeSeriesRowSelector>,
) -> Self {
self.series_row_selector = series_row_selector;
self
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.

View File

@@ -25,5 +25,5 @@ pub use datatypes::schema::{
};
pub use self::descriptors::*;
pub use self::requests::ScanRequest;
pub use self::requests::{ScanRequest, TimeSeriesRowSelector};
pub use self::types::SequenceNumber;

View File

@@ -15,6 +15,13 @@
use common_recordbatch::OrderOption;
use datafusion_expr::expr::Expr;
/// A hint on how to select rows from a time-series.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeSeriesRowSelector {
/// Only keep the last row of each time-series.
LastRow,
}
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ScanRequest {
/// Indices of columns to read, `None` to read all columns. This indices is
@@ -29,4 +36,6 @@ pub struct ScanRequest {
/// If set, it contains the amount of rows needed by the caller,
/// The data source should return *at least* this number of rows if available.
pub limit: Option<usize>,
/// Optional hint to select rows from time-series.
pub series_row_selector: Option<TimeSeriesRowSelector>,
}