feat: time series distribution in scanner (#5675)
* define distribution
* feat: SeqScan support per series distribution
* probe distribution
* reverse sort order
* more strict check
* fix clippy
* change null's ordering

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
@@ -338,6 +338,7 @@ impl MetadataRegion {
             limit: None,
             series_row_selector: None,
             sequence: None,
+            distribution: None,
         }
     }

@@ -527,6 +528,7 @@ impl MetadataRegion {
             limit: None,
             series_row_selector: None,
             sequence: None,
+            distribution: None,
         };
         let record_batch_stream = self
             .mito

@@ -80,6 +80,7 @@ async fn test_scan_projection() {
         limit: None,
         series_row_selector: None,
         sequence: None,
+        distribution: None,
     };
     let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();

@@ -21,6 +21,7 @@ use common_time::Timestamp;
 use parquet::arrow::arrow_reader::RowSelection;
 use smallvec::{smallvec, SmallVec};
 use store_api::region_engine::PartitionRange;
+use store_api::storage::TimeSeriesDistribution;

 use crate::cache::CacheStrategy;
 use crate::error::Result;

@@ -98,8 +99,8 @@ impl RangeMeta {
         Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);

         let ranges = group_ranges_for_seq_scan(ranges);
-        if compaction {
-            // We don't split ranges in compaction.
+        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
            return ranges;
         }
         maybe_split_ranges_for_seq_scan(ranges)

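The widened condition above exists because a per-series merge needs all data of a series to stay in one range group; splitting is only an optimization for parallelism. A minimal std-only sketch of that decision (names are illustrative, not from the patch; `per_series` stands in for `input.distribution == Some(TimeSeriesDistribution::PerSeries)`):

    fn may_split_ranges(compaction: bool, per_series: bool) -> bool {
        !(compaction || per_series)
    }

    fn main() {
        assert!(may_split_ranges(false, false)); // ordinary query scan may split ranges
        assert!(!may_split_ranges(true, false)); // compaction keeps ranges whole
        assert!(!may_split_ranges(false, true)); // PerSeries keeps ranges whole
    }
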
@@ -31,7 +31,7 @@ use datafusion_expr::Expr;
 use smallvec::SmallVec;
 use store_api::metadata::RegionMetadata;
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
-use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
+use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 use table::predicate::{build_time_range_predicate, Predicate};
 use tokio::sync::{mpsc, Semaphore};
 use tokio_stream::wrappers::ReceiverStream;

@@ -287,9 +287,16 @@ impl ScanRegion {

     /// Returns true if the region can use unordered scan for current request.
     fn use_unordered_scan(&self) -> bool {
-        // If table is append only and there is no series row selector, we use unordered scan in query.
+        // We use unordered scan when:
+        // 1. The region is in append mode.
+        // 2. There is no series row selector.
+        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
         //
         // We still use seq scan in compaction.
-        self.version.options.append_mode && self.request.series_row_selector.is_none()
+        self.version.options.append_mode
+            && self.request.series_row_selector.is_none()
+            && (self.request.distribution.is_none()
+                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
     }

     /// Creates a scan input.

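The three conditions gate the cheap unordered scan. A std-only sketch of the same predicate with the fields flattened into parameters (the reduced enum and names are illustrative, not the crate's API):

    #[derive(PartialEq)]
    enum Distribution {
        TimeWindowed,
        PerSeries,
    }

    fn use_unordered_scan(
        append_mode: bool,
        has_series_row_selector: bool,
        distribution: Option<Distribution>,
    ) -> bool {
        append_mode
            && !has_series_row_selector
            && (distribution.is_none() || distribution == Some(Distribution::TimeWindowed))
    }

    fn main() {
        // A PerSeries hint forces the ordered SeqScan even on append-only regions.
        assert!(!use_unordered_scan(true, false, Some(Distribution::PerSeries)));
        assert!(use_unordered_scan(true, false, Some(Distribution::TimeWindowed)));
        assert!(use_unordered_scan(true, false, None));
    }
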
@@ -377,7 +384,8 @@ impl ScanRegion {
             .with_append_mode(self.version.options.append_mode)
             .with_filter_deleted(filter_deleted)
             .with_merge_mode(self.version.options.merge_mode())
-            .with_series_row_selector(self.request.series_row_selector);
+            .with_series_row_selector(self.request.series_row_selector)
+            .with_distribution(self.request.distribution);
         Ok(input)
     }

@@ -557,6 +565,8 @@ pub(crate) struct ScanInput {
     pub(crate) merge_mode: MergeMode,
     /// Hint to select rows from time series.
     pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
+    /// Hint for the required distribution of the scanner.
+    pub(crate) distribution: Option<TimeSeriesDistribution>,
 }

 impl ScanInput {

@@ -581,6 +591,7 @@ impl ScanInput {
             filter_deleted: true,
             merge_mode: MergeMode::default(),
             series_row_selector: None,
+            distribution: None,
         }
     }

@@ -693,6 +704,16 @@ impl ScanInput {
         self
     }

+    /// Sets the distribution hint.
+    #[must_use]
+    pub(crate) fn with_distribution(
+        mut self,
+        distribution: Option<TimeSeriesDistribution>,
+    ) -> Self {
+        self.distribution = distribution;
+        self
+    }
+
     /// Sets the time series row selector.
     #[must_use]
     pub(crate) fn with_series_row_selector(

@@ -29,7 +29,7 @@ use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
-use store_api::storage::TimeSeriesRowSelector;
+use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
 use tokio::sync::Semaphore;

 use crate::error::{PartitionOutOfRangeSnafu, Result};

@@ -206,32 +206,16 @@ impl SeqScan {
             ));
         }

+        if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+            return self.scan_partition_by_series(partition);
+        }
+
         let stream_ctx = self.stream_ctx.clone();
-        let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() {
-            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
-            // This semaphore is partition level.
-            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
-            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
-            // files in a part range.
-            Some(Arc::new(Semaphore::new(
-                self.properties.target_partitions() - self.properties.num_partitions() + 1,
-            )))
-        } else {
-            None
-        };
+        let semaphore = self.new_semaphore();
         let partition_ranges = self.properties.partitions[partition].clone();
         let compaction = self.compaction;
         let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = PartitionMetrics::new(
-            self.stream_ctx.input.mapper.metadata().region_id,
-            partition,
-            get_scanner_type(self.compaction),
-            stream_ctx.query_start,
-            ScannerMetrics {
-                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
-                ..Default::default()
-            },
-        );
+        let part_metrics = self.new_partition_metrics(partition);

         let stream = try_stream! {
             part_metrics.on_first_poll();

@@ -321,6 +305,124 @@ impl SeqScan {

         Ok(stream)
     }
+
+    /// Scans all ranges in the given partition and merges them by time series.
+    /// Otherwise the returned stream might not contain any data.
+    fn scan_partition_by_series(
+        &self,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        let stream_ctx = self.stream_ctx.clone();
+        let semaphore = self.new_semaphore();
+        let partition_ranges = self.properties.partitions[partition].clone();
+        let distinguish_range = self.properties.distinguish_partition_range;
+        let part_metrics = self.new_partition_metrics(partition);
+        debug_assert!(!self.compaction);
+
+        let stream = try_stream! {
+            part_metrics.on_first_poll();
+
+            let range_builder_list = Arc::new(RangeBuilderList::new(
+                stream_ctx.input.num_memtables(),
+                stream_ctx.input.num_files(),
+            ));
+            // Scans all parts.
+            let mut sources = Vec::with_capacity(partition_ranges.len());
+            for part_range in partition_ranges {
+                build_sources(
+                    &stream_ctx,
+                    &part_range,
+                    false,
+                    &part_metrics,
+                    range_builder_list.clone(),
+                    &mut sources,
+                );
+            }
+
+            // Builds a reader that merges sources from all parts.
+            let mut reader =
+                Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu)?;
+            let cache = &stream_ctx.input.cache_strategy;
+            let mut metrics = ScannerMetrics::default();
+            let mut fetch_start = Instant::now();
+
+            while let Some(batch) = reader
+                .next_batch()
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?
+            {
+                metrics.scan_cost += fetch_start.elapsed();
+                metrics.num_batches += 1;
+                metrics.num_rows += batch.num_rows();
+
+                debug_assert!(!batch.is_empty());
+                if batch.is_empty() {
+                    continue;
+                }
+
+                let convert_start = Instant::now();
+                let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
+                metrics.convert_cost += convert_start.elapsed();
+                let yield_start = Instant::now();
+                yield record_batch;
+                metrics.yield_cost += yield_start.elapsed();
+
+                fetch_start = Instant::now();
+            }
+
+            // Yields an empty part to indicate this range is terminated.
+            // The query engine can use this to optimize some queries.
+            if distinguish_range {
+                let yield_start = Instant::now();
+                yield stream_ctx.input.mapper.empty_record_batch();
+                metrics.yield_cost += yield_start.elapsed();
+            }
+
+            metrics.scan_cost += fetch_start.elapsed();
+            part_metrics.merge_metrics(&metrics);
+
+            part_metrics.on_finish();
+        };
+
+        let stream = Box::pin(RecordBatchStreamWrapper::new(
+            self.stream_ctx.input.mapper.output_schema(),
+            Box::pin(stream),
+        ));
+
+        Ok(stream)
+    }
+
+    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
+        if self.properties.target_partitions() > self.properties.num_partitions() {
+            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
+            // This semaphore is partition level.
+            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
+            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
+            // files in a part range.
+            Some(Arc::new(Semaphore::new(
+                self.properties.target_partitions() - self.properties.num_partitions() + 1,
+            )))
+        } else {
+            None
+        }
+    }
+
+    fn new_partition_metrics(&self, partition: usize) -> PartitionMetrics {
+        PartitionMetrics::new(
+            self.stream_ctx.input.mapper.metadata().region_id,
+            partition,
+            get_scanner_type(self.compaction),
+            self.stream_ctx.query_start,
+            ScannerMetrics {
+                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
+                ..Default::default()
+            },
+        )
+    }
 }

 impl RegionScanner for SeqScan {

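The permit arithmetic in `new_semaphore` grants each partition the surplus of target partitions over actual partitions, plus one for itself. A std-only sketch of just that computation (the helper name is illustrative):

    fn semaphore_permits(target_partitions: usize, num_partitions: usize) -> Option<usize> {
        (target_partitions > num_partitions).then(|| target_partitions - num_partitions + 1)
    }

    fn main() {
        // 8 target partitions over 3 actual ones: up to 6 concurrent read tasks
        // per partition; with no surplus, no semaphore is created at all.
        assert_eq!(semaphore_permits(8, 3), Some(6));
        assert_eq!(semaphore_permits(4, 4), None);
    }
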
@@ -370,7 +472,7 @@ impl fmt::Debug for SeqScan {
     }
 }

-/// Builds sources for the partition range.
+/// Builds sources for the partition range and pushes them to the `sources` vector.
 fn build_sources(
     stream_ctx: &Arc<StreamContext>,
     part_range: &PartitionRange,

@@ -382,8 +484,8 @@ fn build_sources(
     // Gets range meta.
     let range_meta = &stream_ctx.ranges[part_range.identifier];
     #[cfg(debug_assertions)]
-    if compaction {
-        // Compaction expects that input sources have not been split.
+    if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+        // Compaction or per series distribution expects that input sources have not been split.
         debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
         for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
             // It should scan all row groups.

@@ -31,7 +31,7 @@ use datatypes::arrow::datatypes::SchemaRef;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::RegionEngineRef;
-use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector};
+use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 use table::table::scan::RegionScanExec;

 use crate::error::{GetRegionMetadataSnafu, Result};

@@ -175,10 +175,10 @@ impl TableProvider for DummyTableProvider {

         let scanner = self
             .engine
-            .handle_query(self.region_id, request)
+            .handle_query(self.region_id, request.clone())
             .await
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
-        Ok(Arc::new(RegionScanExec::new(scanner)))
+        Ok(Arc::new(RegionScanExec::new(scanner, request)?))
     }

     fn supports_filters_pushdown(

@@ -233,6 +233,11 @@ impl DummyTableProvider {
         self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
     }

+    /// Sets the distribution hint of the query to the provider.
+    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
+        self.scan_request.lock().unwrap().distribution = Some(distribution);
+    }
+
     /// Sets the time series selector hint of the query to the provider.
     pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
         self.scan_request.lock().unwrap().series_row_selector = Some(selector);

@@ -23,6 +23,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{DataFusionError, Result};
 use store_api::region_engine::PartitionRange;
+use store_api::storage::TimeSeriesDistribution;
 use table::table::scan::RegionScanExec;

 #[derive(Debug)]

@@ -65,6 +66,14 @@ impl ParallelizeScan {
                 return Ok(Transformed::no(plan));
             }

+            // don't parallelize if we want per series distribution
+            if matches!(
+                region_scan_exec.distribution(),
+                Some(TimeSeriesDistribution::PerSeries)
+            ) {
+                return Ok(Transformed::no(plan));
+            }
+
             let ranges = region_scan_exec.get_partition_ranges();
             let total_range_num = ranges.len();
             let expected_partition_num = config.execution.target_partitions;

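The guard keeps the optimizer from redistributing ranges when the scan promises per-series output, since splitting ranges across more partitions would interleave series again. Reduced to a std-only sketch (enum and names are illustrative, not the crate's API):

    enum Distribution {
        TimeWindowed,
        PerSeries,
    }

    fn should_parallelize(distribution: Option<Distribution>) -> bool {
        !matches!(distribution, Some(Distribution::PerSeries))
    }

    fn main() {
        assert!(!should_parallelize(Some(Distribution::PerSeries)));
        assert!(should_parallelize(Some(Distribution::TimeWindowed)));
        assert!(should_parallelize(None));
    }
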
@@ -23,7 +23,7 @@ use datafusion_common::{Column, Result};
 use datafusion_expr::expr::Sort;
 use datafusion_expr::{utils, Expr, LogicalPlan};
 use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
-use store_api::storage::TimeSeriesRowSelector;
+use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

 use crate::dummy_catalog::DummyTableProvider;

@@ -121,6 +121,36 @@ impl ScanHintRule {
             });
         }
         adapter.with_ordering_hint(&opts);
+
+        let mut sort_expr_cursor = order_expr.iter().filter_map(|s| s.expr.try_as_col());
+        let region_metadata = adapter.region_metadata();
+        // Ignore tables without a primary key.
+        if region_metadata.primary_key.is_empty() {
+            return;
+        }
+        let mut pk_column_iter = region_metadata.primary_key_columns();
+        let mut curr_sort_expr = sort_expr_cursor.next();
+        let mut curr_pk_col = pk_column_iter.next();
+
+        while let (Some(sort_expr), Some(pk_col)) = (curr_sort_expr, curr_pk_col) {
+            if sort_expr.name == pk_col.column_schema.name {
+                curr_sort_expr = sort_expr_cursor.next();
+                curr_pk_col = pk_column_iter.next();
+            } else {
+                return;
+            }
+        }
+
+        let next_remaining = sort_expr_cursor.next();
+        match (curr_sort_expr, next_remaining) {
+            (Some(expr), None)
+                if expr.name == region_metadata.time_index_column().column_schema.name =>
+            {
+                adapter.with_distribution(TimeSeriesDistribution::PerSeries);
+            }
+            (None, _) => adapter.with_distribution(TimeSeriesDistribution::PerSeries),
+            (Some(_), _) => {}
+        }
     }

     fn set_time_series_row_selector_hint(

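The loop above accepts an ORDER BY list only when it is a prefix of the primary key, optionally followed by exactly the time index column. A std-only model of that acceptance test, assuming a hypothetical table with primary key ["host", "idc"] and time index "ts":

    fn qualifies_per_series(sort_cols: &[&str], pk: &[&str], time_index: &str) -> bool {
        let mut sort = sort_cols.iter();
        let mut pk_iter = pk.iter();
        let (mut cur_sort, mut cur_pk) = (sort.next(), pk_iter.next());
        while let (Some(s), Some(p)) = (cur_sort, cur_pk) {
            if s != p {
                return false; // a sort column broke the primary-key prefix
            }
            cur_sort = sort.next();
            cur_pk = pk_iter.next();
        }
        match (cur_sort, sort.next()) {
            // The single remaining sort column must be the time index.
            (Some(s), None) => *s == time_index,
            // Sort columns are a strict prefix of the primary key.
            (None, _) => true,
            // More than one sort column remains beyond the primary key.
            (Some(_), Some(_)) => false,
        }
    }

    fn main() {
        assert!(qualifies_per_series(&["host", "idc", "ts"], &["host", "idc"], "ts"));
        assert!(qualifies_per_series(&["host"], &["host", "idc"], "ts"));
        assert!(!qualifies_per_series(&["host", "ts"], &["host", "idc"], "ts"));
    }
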
@@ -25,5 +25,5 @@ pub use datatypes::schema::{
 };

 pub use self::descriptors::*;
-pub use self::requests::{ScanRequest, TimeSeriesRowSelector};
+pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 pub use self::types::SequenceNumber;

@@ -25,6 +25,17 @@ pub enum TimeSeriesRowSelector {
     LastRow,
 }

+/// A hint on how to distribute time-series data on the scan output.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
+pub enum TimeSeriesDistribution {
+    /// Data are distributed by time window first. The scanner will
+    /// return all data within one time window before moving to the next one.
+    TimeWindowed,
+    /// Data are organized by time-series first. The scanner will return
+    /// all data for one time-series before moving to the next one.
+    PerSeries,
+}
+
 #[derive(Default, Clone, Debug, PartialEq, Eq)]
 pub struct ScanRequest {
     /// Indices of columns to read, `None` to read all columns. This indices is

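The two variants describe opposite groupings of the same rows. A std-only illustration with (series key, timestamp) pairs and two time windows, [0, 10) and [10, 20):

    fn main() {
        // PerSeries: all of series "a" first, time-ordered within each series.
        let per_series = [("a", 1), ("a", 12), ("b", 3), ("b", 15)];
        // TimeWindowed: everything in the first window, then the second.
        let time_windowed = [("a", 1), ("b", 3), ("a", 12), ("b", 15)];

        assert!(per_series.windows(2).all(|w| w[0] <= w[1]));
        assert!(time_windowed.windows(2).all(|w| w[0].1 / 10 <= w[1].1 / 10));
    }
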
@@ -45,4 +56,6 @@ pub struct ScanRequest {
     /// If set, only rows with a sequence number less than or equal to this value
     /// will be returned.
     pub sequence: Option<SequenceNumber>,
+    /// Optional hint for the distribution of time-series data.
+    pub distribution: Option<TimeSeriesDistribution>,
 }

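Since `ScanRequest` derives `Default`, a caller that only wants the new hint can populate that single field. A minimal sketch, assuming the `store_api` crate is on the dependency list:

    use store_api::storage::{ScanRequest, TimeSeriesDistribution};

    // Every other field keeps its default value.
    fn per_series_request() -> ScanRequest {
        ScanRequest {
            distribution: Some(TimeSeriesDistribution::PerSeries),
            ..Default::default()
        }
    }
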
@@ -13,6 +13,7 @@
 // limitations under the License.

 #![feature(assert_matches)]
+#![feature(try_blocks)]

 pub mod dist_table;
 pub mod error;

@@ -32,10 +32,15 @@ use datafusion::physical_plan::{
 };
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+    EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
+};
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datatypes::compute::SortOptions;
 use futures::{Stream, StreamExt};
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef};
+use store_api::storage::{ScanRequest, TimeSeriesDistribution};

 use crate::table::metrics::StreamMetrics;

@@ -51,10 +56,12 @@ pub struct RegionScanExec {
     append_mode: bool,
     total_rows: usize,
     is_partition_set: bool,
+    // TODO(ruihang): handle TimeWindowed dist via this parameter
+    distribution: Option<TimeSeriesDistribution>,
 }

 impl RegionScanExec {
-    pub fn new(scanner: RegionScannerRef) -> Self {
+    pub fn new(scanner: RegionScannerRef, request: ScanRequest) -> DfResult<Self> {
         let arrow_schema = scanner.schema().arrow_schema().clone();
         let scanner_props = scanner.properties();
         let mut num_output_partition = scanner_props.num_partitions();

@@ -64,14 +71,67 @@ impl RegionScanExec {
         if num_output_partition == 0 {
             num_output_partition = 1;
         }
+
+        let metadata = scanner.metadata();
+        let mut pk_columns: Vec<PhysicalSortExpr> = metadata
+            .primary_key_columns()
+            .filter_map(|col| {
+                Some(PhysicalSortExpr::new(
+                    Arc::new(Column::new_with_schema(&col.column_schema.name, &arrow_schema).ok()?)
+                        as _,
+                    SortOptions {
+                        descending: false,
+                        nulls_first: true,
+                    },
+                ))
+            })
+            .collect::<Vec<_>>();
+        let ts_col: Option<PhysicalSortExpr> = try {
+            PhysicalSortExpr::new(
+                Arc::new(
+                    Column::new_with_schema(
+                        &metadata.time_index_column().column_schema.name,
+                        &arrow_schema,
+                    )
+                    .ok()?,
+                ) as _,
+                SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                },
+            )
+        };
+
+        let eq_props = match request.distribution {
+            Some(TimeSeriesDistribution::PerSeries) => {
+                if let Some(ts) = ts_col {
+                    pk_columns.push(ts);
+                }
+                EquivalenceProperties::new_with_orderings(
+                    arrow_schema.clone(),
+                    &[LexOrdering::new(pk_columns)],
+                )
+            }
+            Some(TimeSeriesDistribution::TimeWindowed) => {
+                if let Some(ts_col) = ts_col {
+                    pk_columns.insert(0, ts_col);
+                }
+                EquivalenceProperties::new_with_orderings(
+                    arrow_schema.clone(),
+                    &[LexOrdering::new(pk_columns)],
+                )
+            }
+            None => EquivalenceProperties::new(arrow_schema.clone()),
+        };
+
         let properties = PlanProperties::new(
-            EquivalenceProperties::new(arrow_schema.clone()),
+            eq_props,
             Partitioning::UnknownPartitioning(num_output_partition),
             ExecutionMode::Bounded,
         );
         let append_mode = scanner_props.append_mode();
         let total_rows = scanner_props.total_rows();
-        Self {
+        Ok(Self {
             scanner: Arc::new(Mutex::new(scanner)),
             arrow_schema,
             output_ordering: None,

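The `match` above declares the plan's output ordering: PerSeries appends the time index after the primary key columns, TimeWindowed prepends it, and no hint declares no ordering. A std-only sketch of the resulting column orders, assuming a hypothetical schema with primary key columns "k", "l" and time index "j":

    fn declared_ordering(distribution: Option<&str>) -> Vec<&'static str> {
        let mut columns = vec!["k", "l"];
        match distribution {
            // PerSeries: primary key first, time index last.
            Some("PerSeries") => columns.push("j"),
            // TimeWindowed: time index first, then the primary key.
            Some("TimeWindowed") => columns.insert(0, "j"),
            // No hint: no ordering is declared.
            _ => columns.clear(),
        }
        columns
    }

    fn main() {
        assert_eq!(declared_ordering(Some("PerSeries")), ["k", "l", "j"]);
        assert_eq!(declared_ordering(Some("TimeWindowed")), ["j", "k", "l"]);
        assert!(declared_ordering(None).is_empty());
    }
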
@@ -80,7 +140,8 @@ impl RegionScanExec {
             append_mode,
             total_rows,
             is_partition_set: false,
-        }
+            distribution: request.distribution,
+        })
     }

     /// Get the partition ranges of the scanner. This method will collapse the ranges into

@@ -140,9 +201,14 @@ impl RegionScanExec {
             append_mode: self.append_mode,
             total_rows: self.total_rows,
             is_partition_set: true,
+            distribution: self.distribution,
         })
     }
+
+    pub fn distribution(&self) -> Option<TimeSeriesDistribution> {
+        self.distribution
+    }

     pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
         let mut scanner = self.scanner.lock().unwrap();
         // set distinguish_partition_range won't fail

@@ -388,7 +454,7 @@ mod test {
         let region_metadata = Arc::new(builder.build().unwrap());

         let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
-        let plan = RegionScanExec::new(scanner);
+        let plan = RegionScanExec::new(scanner, ScanRequest::default()).unwrap();
         let actual: SchemaRef = Arc::new(
             plan.properties
                 .eq_properties

@@ -316,3 +316,63 @@ drop table t;

 Affected Rows: 0

+-- ORDER BY with projections
+CREATE TABLE test (
+    c1 INTEGER,
+    c2 INTEGER,
+    c3 STRING,
+    c4 DOUBLE,
+    ts TIMESTAMP TIME INDEX,
+    PRIMARY KEY (c1, c3, c2)
+);
+
+Affected Rows: 0
+
+INSERT INTO test VALUES (1, NULL, 'a', 3.0, 1), (2, 3, 'b', 4.0, 2), (3, 4, 'c', 5.0, 3);
+
+Affected Rows: 3
+
+SELECT c1, c3 FROM test ORDER BY c2;
+
++----+----+
+| c1 | c3 |
++----+----+
+| 2  | b  |
+| 3  | c  |
+| 1  | a  |
++----+----+
+
+SELECT c1, c3 FROM test ORDER BY c2 NULLS FIRST;
+
++----+----+
+| c1 | c3 |
++----+----+
+| 1  | a  |
+| 2  | b  |
+| 3  | c  |
++----+----+
+
+SELECT c1, c3 FROM test ORDER BY c3, c1;
+
++----+----+
+| c1 | c3 |
++----+----+
+| 1  | a  |
+| 2  | b  |
+| 3  | c  |
++----+----+
+
+SELECT c2 FROM test ORDER BY ts;
+
++----+
+| c2 |
++----+
+|    |
+| 3  |
+| 4  |
++----+
+
+drop table test;
+
+Affected Rows: 0

@@ -97,3 +97,26 @@ select tag from t where num > 6 order by ts;
 explain analyze select tag from t where num > 6 order by ts desc limit 2;

 drop table t;
+
+-- ORDER BY with projections
+CREATE TABLE test (
+    c1 INTEGER,
+    c2 INTEGER,
+    c3 STRING,
+    c4 DOUBLE,
+    ts TIMESTAMP TIME INDEX,
+    PRIMARY KEY (c1, c3, c2)
+);
+
+INSERT INTO test VALUES (1, NULL, 'a', 3.0, 1), (2, 3, 'b', 4.0, 2), (3, 4, 'c', 5.0, 3);
+
+SELECT c1, c3 FROM test ORDER BY c2;
+
+SELECT c1, c3 FROM test ORDER BY c2 NULLS FIRST;
+
+SELECT c1, c3 FROM test ORDER BY c3, c1;
+
+SELECT c2 FROM test ORDER BY ts;
+
+drop table test;

@@ -114,3 +114,39 @@ DROP TABLE test;

 Affected Rows: 0

+-- partition table
+CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING, l STRING, PRIMARY KEY(k, l)) PARTITION ON COLUMNS (k) (k < 'a', k >= 'a');
+
+Affected Rows: 0
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE (0, 10, '5s') test;
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST, l@3 ASC NULLS LAST] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC NULLS LAST, l@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, l@3 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
+|_|_|_|
+| 1_| 1_|_SortExec: expr=[k@2 DESC NULLS LAST, l@3 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
+|_|_|_|
+|_|_| Total rows: 0_|
++-+-+-+
+
+drop table test;
+
+Affected Rows: 0

@@ -43,3 +43,16 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 TQL ANALYZE VERBOSE (0, 10, '5s') test;

 DROP TABLE test;
+
+-- partition table
+CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING, l STRING, PRIMARY KEY(k, l)) PARTITION ON COLUMNS (k) (k < 'a', k >= 'a');
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE (0, 10, '5s') test;
+
+drop table test;