From 73ca39f37e072ec784972679d0240058580de4a1 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Sun, 9 Mar 2025 22:43:17 -0700
Subject: [PATCH] feat: time series distribution in scanner (#5675)

* define distribution

Signed-off-by: Ruihang Xia

* feat: SeqScan support per series distribution

* probe distribution

Signed-off-by: Ruihang Xia

* reverse sort order

Signed-off-by: Ruihang Xia

* more strict check

Signed-off-by: Ruihang Xia

* fix clippy

Signed-off-by: Ruihang Xia

* change null's ordering

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
Co-authored-by: evenyag
---
 src/metric-engine/src/metadata_region.rs      |   2 +
 src/mito2/src/engine/projection_test.rs       |   1 +
 src/mito2/src/read/range.rs                   |   5 +-
 src/mito2/src/read/scan_region.rs             |  29 +++-
 src/mito2/src/read/seq_scan.rs                | 154 +++++++++++++++---
 src/query/src/dummy_catalog.rs                |  11 +-
 src/query/src/optimizer/parallelize_scan.rs   |   9 +
 src/query/src/optimizer/scan_hint.rs          |  32 +++-
 src/store-api/src/storage.rs                  |   2 +-
 src/store-api/src/storage/requests.rs         |  13 ++
 src/table/src/lib.rs                          |   1 +
 src/table/src/table/scan.rs                   |  78 ++++++++-
 .../standalone/common/order/order_by.result   |  60 +++++++
 .../standalone/common/order/order_by.sql      |  23 +++
 .../common/tql-explain-analyze/analyze.result |  36 ++++
 .../common/tql-explain-analyze/analyze.sql    |  13 ++
 16 files changed, 426 insertions(+), 43 deletions(-)

diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs
index 753251c72b..2b066a0bde 100644
--- a/src/metric-engine/src/metadata_region.rs
+++ b/src/metric-engine/src/metadata_region.rs
@@ -338,6 +338,7 @@ impl MetadataRegion {
             limit: None,
             series_row_selector: None,
             sequence: None,
+            distribution: None,
         }
     }
@@ -527,6 +528,7 @@ impl MetadataRegion {
             limit: None,
             series_row_selector: None,
             sequence: None,
+            distribution: None,
         };
         let record_batch_stream = self
             .mito
diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs
index b3c4fc83e1..c7880d298a 100644
--- a/src/mito2/src/engine/projection_test.rs
+++ b/src/mito2/src/engine/projection_test.rs
@@ -80,6 +80,7 @@ async fn test_scan_projection() {
         limit: None,
         series_row_selector: None,
         sequence: None,
+        distribution: None,
     };
     let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs
index 4dce0625a1..4d01016d64 100644
--- a/src/mito2/src/read/range.rs
+++ b/src/mito2/src/read/range.rs
@@ -21,6 +21,7 @@ use common_time::Timestamp;
 use parquet::arrow::arrow_reader::RowSelection;
 use smallvec::{smallvec, SmallVec};
 use store_api::region_engine::PartitionRange;
+use store_api::storage::TimeSeriesDistribution;

 use crate::cache::CacheStrategy;
 use crate::error::Result;
@@ -98,8 +99,8 @@ impl RangeMeta {
         Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);

         let ranges = group_ranges_for_seq_scan(ranges);
-        if compaction {
-            // We don't split ranges in compaction.
+        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+            // We don't split ranges for compaction or for TimeSeriesDistribution::PerSeries.
             return ranges;
         }
         maybe_split_ranges_for_seq_scan(ranges)
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index fca1e89860..2349792e6f 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -31,7 +31,7 @@ use datafusion_expr::Expr;
 use smallvec::SmallVec;
 use store_api::metadata::RegionMetadata;
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
-use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
+use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 use table::predicate::{build_time_range_predicate, Predicate};
 use tokio::sync::{mpsc, Semaphore};
 use tokio_stream::wrappers::ReceiverStream;
@@ -287,9 +287,16 @@ impl ScanRegion {

     /// Returns true if the region can use unordered scan for current request.
     fn use_unordered_scan(&self) -> bool {
-        // If table is append only and there is no series row selector, we use unordered scan in query.
+        // We use unordered scan when:
+        // 1. The region is in append mode.
+        // 2. There is no series row selector.
+        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
+        //
         // We still use seq scan in compaction.
-        self.version.options.append_mode && self.request.series_row_selector.is_none()
+        self.version.options.append_mode
+            && self.request.series_row_selector.is_none()
+            && (self.request.distribution.is_none()
+                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
     }

     /// Creates a scan input.
@@ -377,7 +384,8 @@ impl ScanRegion {
             .with_append_mode(self.version.options.append_mode)
             .with_filter_deleted(filter_deleted)
             .with_merge_mode(self.version.options.merge_mode())
-            .with_series_row_selector(self.request.series_row_selector);
+            .with_series_row_selector(self.request.series_row_selector)
+            .with_distribution(self.request.distribution);
         Ok(input)
     }
@@ -557,6 +565,8 @@ pub(crate) struct ScanInput {
     pub(crate) merge_mode: MergeMode,
     /// Hint to select rows from time series.
     pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
+    /// Hint for the required distribution of the scanner.
+    pub(crate) distribution: Option<TimeSeriesDistribution>,
 }

 impl ScanInput {
@@ -581,6 +591,7 @@ impl ScanInput {
             filter_deleted: true,
             merge_mode: MergeMode::default(),
             series_row_selector: None,
+            distribution: None,
         }
     }
@@ -693,6 +704,16 @@ impl ScanInput {
         self
     }

+    /// Sets the distribution hint.
+    #[must_use]
+    pub(crate) fn with_distribution(
+        mut self,
+        distribution: Option<TimeSeriesDistribution>,
+    ) -> Self {
+        self.distribution = distribution;
+        self
+    }
+
     /// Sets the time series row selector.
     #[must_use]
     pub(crate) fn with_series_row_selector(
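Note on the `use_unordered_scan` predicate above: the three conditions map directly onto request fields. A minimal, self-contained sketch of the same decision (the enum is re-declared locally and plain booleans stand in for the engine types; this is an illustration, not engine code):

    // Sketch only: PerSeries is the one distribution that forces the
    // ordered SeqScan path even for an append-only region.
    #[derive(Clone, Copy)]
    enum TimeSeriesDistribution {
        TimeWindowed,
        PerSeries,
    }

    fn use_unordered_scan(
        append_mode: bool,
        has_series_row_selector: bool,
        distribution: Option<TimeSeriesDistribution>,
    ) -> bool {
        append_mode
            && !has_series_row_selector
            && matches!(
                distribution,
                None | Some(TimeSeriesDistribution::TimeWindowed)
            )
    }

    fn main() {
        // Append-only region, no hints: unordered scan is allowed.
        assert!(use_unordered_scan(true, false, None));
        // The PerSeries hint disables it.
        assert!(!use_unordered_scan(
            true,
            false,
            Some(TimeSeriesDistribution::PerSeries)
        ));
    }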
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 5ae5b6b8ec..99cb9348bc 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -29,7 +29,7 @@ use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
-use store_api::storage::TimeSeriesRowSelector;
+use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
 use tokio::sync::Semaphore;

 use crate::error::{PartitionOutOfRangeSnafu, Result};
@@ -206,32 +206,16 @@ impl SeqScan {
             ));
         }

+        if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+            return self.scan_partition_by_series(partition);
+        }
+
         let stream_ctx = self.stream_ctx.clone();
-        let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() {
-            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
-            // This semaphore is partition level.
-            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
-            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
-            // files in a part range.
-            Some(Arc::new(Semaphore::new(
-                self.properties.target_partitions() - self.properties.num_partitions() + 1,
-            )))
-        } else {
-            None
-        };
+        let semaphore = self.new_semaphore();
         let partition_ranges = self.properties.partitions[partition].clone();
         let compaction = self.compaction;
         let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = PartitionMetrics::new(
-            self.stream_ctx.input.mapper.metadata().region_id,
-            partition,
-            get_scanner_type(self.compaction),
-            stream_ctx.query_start,
-            ScannerMetrics {
-                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
-                ..Default::default()
-            },
-        );
+        let part_metrics = self.new_partition_metrics(partition);

         let stream = try_stream! {
             part_metrics.on_first_poll();
@@ -321,6 +305,124 @@ impl SeqScan {

         Ok(stream)
     }
+
+    /// Scans all ranges in the given partition together and merges the output by time series.
+    /// Otherwise the returned stream might not contain any data.
+    fn scan_partition_by_series(
+        &self,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        let stream_ctx = self.stream_ctx.clone();
+        let semaphore = self.new_semaphore();
+        let partition_ranges = self.properties.partitions[partition].clone();
+        let distinguish_range = self.properties.distinguish_partition_range;
+        let part_metrics = self.new_partition_metrics(partition);
+        debug_assert!(!self.compaction);
+
+        let stream = try_stream! {
+            part_metrics.on_first_poll();
+
+            let range_builder_list = Arc::new(RangeBuilderList::new(
+                stream_ctx.input.num_memtables(),
+                stream_ctx.input.num_files(),
+            ));
+            // Scans all parts.
+            let mut sources = Vec::with_capacity(partition_ranges.len());
+            for part_range in partition_ranges {
+                build_sources(
+                    &stream_ctx,
+                    &part_range,
+                    false,
+                    &part_metrics,
+                    range_builder_list.clone(),
+                    &mut sources,
+                );
+            }
+
+            // Builds a reader that merges sources from all parts.
+            let mut reader =
+                Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu)?;
+            let cache = &stream_ctx.input.cache_strategy;
+            let mut metrics = ScannerMetrics::default();
+            let mut fetch_start = Instant::now();
+
+            while let Some(batch) = reader
+                .next_batch()
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?
+            {
+                metrics.scan_cost += fetch_start.elapsed();
+                metrics.num_batches += 1;
+                metrics.num_rows += batch.num_rows();
+
+                debug_assert!(!batch.is_empty());
+                if batch.is_empty() {
+                    continue;
+                }
+
+                let convert_start = Instant::now();
+                let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
+                metrics.convert_cost += convert_start.elapsed();
+                let yield_start = Instant::now();
+                yield record_batch;
+                metrics.yield_cost += yield_start.elapsed();
+
+                fetch_start = Instant::now();
+            }
+
+            // Yields an empty part to indicate this range is terminated.
+            // The query engine can use this to optimize some queries.
+            if distinguish_range {
+                let yield_start = Instant::now();
+                yield stream_ctx.input.mapper.empty_record_batch();
+                metrics.yield_cost += yield_start.elapsed();
+            }
+
+            metrics.scan_cost += fetch_start.elapsed();
+            part_metrics.merge_metrics(&metrics);
+
+            part_metrics.on_finish();
+        };
+
+        let stream = Box::pin(RecordBatchStreamWrapper::new(
+            self.stream_ctx.input.mapper.output_schema(),
+            Box::pin(stream),
+        ));
+
+        Ok(stream)
+    }
+
+    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
+        if self.properties.target_partitions() > self.properties.num_partitions() {
+            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
+            // This semaphore is partition level.
+            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
+            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
+            // files in a part range.
+            Some(Arc::new(Semaphore::new(
+                self.properties.target_partitions() - self.properties.num_partitions() + 1,
+            )))
+        } else {
+            None
+        }
+    }
+
+    fn new_partition_metrics(&self, partition: usize) -> PartitionMetrics {
+        PartitionMetrics::new(
+            self.stream_ctx.input.mapper.metadata().region_id,
+            partition,
+            get_scanner_type(self.compaction),
+            self.stream_ctx.query_start,
+            ScannerMetrics {
+                prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
+                ..Default::default()
+            },
+        )
+    }
 }

 impl RegionScanner for SeqScan {
@@ -370,7 +472,7 @@ impl fmt::Debug for SeqScan {
     }
 }

-/// Builds sources for the partition range.
+/// Builds sources for the partition range and pushes them to the `sources` vector.
 fn build_sources(
     stream_ctx: &Arc<StreamContext>,
     part_range: &PartitionRange,
@@ -382,8 +484,8 @@ fn build_sources(
     // Gets range meta.
     let range_meta = &stream_ctx.ranges[part_range.identifier];
     #[cfg(debug_assertions)]
-    if compaction {
-        // Compaction expects input sources are not been split.
+    if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+        // Compaction or the per-series distribution expects that input sources have not been split.
         debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
         for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
             // It should scan all row groups.
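The permit arithmetic in `new_semaphore` is easy to misread: a semaphore exists only when there are more target partitions than actual partitions, and the count includes one permit for the partition's own task. A standalone sketch of the computation with example numbers (an assumed reading of the formula, not engine code):

    // If target > actual, each partition may run `target - actual + 1`
    // concurrent read tasks; otherwise no semaphore is created at all.
    fn permits(target_partitions: usize, num_partitions: usize) -> Option<usize> {
        (target_partitions > num_partitions)
            .then(|| target_partitions - num_partitions + 1)
    }

    fn main() {
        // 8 target partitions over 3 actual partitions: 6 permits each.
        assert_eq!(permits(8, 3), Some(6));
        // No surplus: the reader runs without a semaphore.
        assert_eq!(permits(4, 4), None);
    }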
diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs
index cbd9749b2f..0e3b3616a9 100644
--- a/src/query/src/dummy_catalog.rs
+++ b/src/query/src/dummy_catalog.rs
@@ -31,7 +31,7 @@ use datatypes::arrow::datatypes::SchemaRef;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::RegionEngineRef;
-use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector};
+use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 use table::table::scan::RegionScanExec;

 use crate::error::{GetRegionMetadataSnafu, Result};
@@ -175,10 +175,10 @@ impl TableProvider for DummyTableProvider {

         let scanner = self
             .engine
-            .handle_query(self.region_id, request)
+            .handle_query(self.region_id, request.clone())
             .await
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
-        Ok(Arc::new(RegionScanExec::new(scanner)))
+        Ok(Arc::new(RegionScanExec::new(scanner, request)?))
     }

     fn supports_filters_pushdown(
@@ -233,6 +233,11 @@ impl DummyTableProvider {
         self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
     }

+    /// Sets the distribution hint of the query to the provider.
+    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
+        self.scan_request.lock().unwrap().distribution = Some(distribution);
+    }
+
     /// Sets the time series selector hint of the query to the provider.
     pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
         self.scan_request.lock().unwrap().series_row_selector = Some(selector);
diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs
index eb712ec920..b6d22f7de5 100644
--- a/src/query/src/optimizer/parallelize_scan.rs
+++ b/src/query/src/optimizer/parallelize_scan.rs
@@ -23,6 +23,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{DataFusionError, Result};
 use store_api::region_engine::PartitionRange;
+use store_api::storage::TimeSeriesDistribution;
 use table::table::scan::RegionScanExec;

 #[derive(Debug)]
@@ -65,6 +66,14 @@ impl ParallelizeScan {
             return Ok(Transformed::no(plan));
         }

+        // Don't parallelize if we want the per-series distribution.
+        if matches!(
+            region_scan_exec.distribution(),
+            Some(TimeSeriesDistribution::PerSeries)
+        ) {
+            return Ok(Transformed::no(plan));
+        }
+
         let ranges = region_scan_exec.get_partition_ranges();
         let total_range_num = ranges.len();
         let expected_partition_num = config.execution.target_partitions;
diff --git a/src/query/src/optimizer/scan_hint.rs b/src/query/src/optimizer/scan_hint.rs
index 6be11d63c1..e8cc95c7f1 100644
--- a/src/query/src/optimizer/scan_hint.rs
+++ b/src/query/src/optimizer/scan_hint.rs
@@ -23,7 +23,7 @@ use datafusion_common::{Column, Result};
 use datafusion_expr::expr::Sort;
 use datafusion_expr::{utils, Expr, LogicalPlan};
 use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
-use store_api::storage::TimeSeriesRowSelector;
+use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

 use crate::dummy_catalog::DummyTableProvider;
@@ -121,6 +121,36 @@ impl ScanHintRule {
             });
         }
         adapter.with_ordering_hint(&opts);
+
+        let mut sort_expr_cursor = order_expr.iter().filter_map(|s| s.expr.try_as_col());
+        let region_metadata = adapter.region_metadata();
+        // Ignore tables without a primary key.
+        if region_metadata.primary_key.is_empty() {
+            return;
+        }
+        let mut pk_column_iter = region_metadata.primary_key_columns();
+        let mut curr_sort_expr = sort_expr_cursor.next();
+        let mut curr_pk_col = pk_column_iter.next();
+
+        while let (Some(sort_expr), Some(pk_col)) = (curr_sort_expr, curr_pk_col) {
+            if sort_expr.name == pk_col.column_schema.name {
+                curr_sort_expr = sort_expr_cursor.next();
+                curr_pk_col = pk_column_iter.next();
+            } else {
+                return;
+            }
+        }
+
+        let next_remaining = sort_expr_cursor.next();
+        match (curr_sort_expr, next_remaining) {
+            (Some(expr), None)
+                if expr.name == region_metadata.time_index_column().column_schema.name =>
+            {
+                adapter.with_distribution(TimeSeriesDistribution::PerSeries);
+            }
+            (None, _) => adapter.with_distribution(TimeSeriesDistribution::PerSeries),
+            (Some(_), _) => {}
+        }
     }

     fn set_time_series_row_selector_hint(
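The loop added to scan_hint.rs encodes a prefix check: the PerSeries hint applies only when the ORDER BY columns form a prefix of the primary key columns, optionally followed by exactly the time index column. A sketch of the same rule over plain column names (sort directions and the DataFusion `Sort` wrapper are ignored here; this is an illustration, not the optimizer code):

    fn matches_per_series(order_by: &[&str], primary_key: &[&str], time_index: &str) -> bool {
        let mut order = order_by.iter();
        let mut pk = primary_key.iter();
        let (mut cur_order, mut cur_pk) = (order.next(), pk.next());
        // Walk both lists while the names agree.
        while let (Some(o), Some(p)) = (cur_order, cur_pk) {
            if o != p {
                return false;
            }
            cur_order = order.next();
            cur_pk = pk.next();
        }
        match (cur_order, order.next()) {
            // ORDER BY exhausted within the primary key: a pure prefix.
            (None, _) => true,
            // Exactly one column beyond the primary key, and it is the time index.
            (Some(&col), None) => col == time_index,
            // More than one column beyond the primary key: no hint.
            (Some(_), Some(_)) => false,
        }
    }

    fn main() {
        // Matches the order_by.result test table: PRIMARY KEY (c1, c3, c2), time index ts.
        assert!(matches_per_series(&["c1", "c3"], &["c1", "c3", "c2"], "ts"));
        assert!(matches_per_series(&["c1", "c3", "c2", "ts"], &["c1", "c3", "c2"], "ts"));
        assert!(!matches_per_series(&["c2"], &["c1", "c3", "c2"], "ts"));
    }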
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index d3331e4e70..97e25542b1 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -25,5 +25,5 @@ pub use datatypes::schema::{
 };

 pub use self::descriptors::*;
-pub use self::requests::{ScanRequest, TimeSeriesRowSelector};
+pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 pub use self::types::SequenceNumber;
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index dfdcd1037d..c9a440eaea 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -25,6 +25,17 @@ pub enum TimeSeriesRowSelector {
     LastRow,
 }

+/// A hint on how to distribute time-series data in the scan output.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
+pub enum TimeSeriesDistribution {
+    /// Data are distributed by time window first. The scanner will
+    /// return all data within one time window before moving to the next one.
+    TimeWindowed,
+    /// Data are organized by time series first. The scanner will return
+    /// all data for one time series before moving to the next one.
+    PerSeries,
+}
+
 #[derive(Default, Clone, Debug, PartialEq, Eq)]
 pub struct ScanRequest {
     /// Indices of columns to read, `None` to read all columns. This indices is
@@ -45,4 +56,6 @@ pub struct ScanRequest {
     /// If set, only rows with a sequence number lesser or equal to this value
     /// will be returned.
     pub sequence: Option<SequenceNumber>,
+    /// Optional hint for the distribution of time-series data.
+    pub distribution: Option<TimeSeriesDistribution>,
 }
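For illustration, a caller opts into the new hint by setting the one field and defaulting the rest; this relies on the `Default` derive on `ScanRequest` shown above and is an assumed usage, not part of the patch:

    use store_api::storage::{ScanRequest, TimeSeriesDistribution};

    // Ask the scanner to emit whole series one after another instead of
    // interleaving rows from different series.
    fn per_series_request() -> ScanRequest {
        ScanRequest {
            distribution: Some(TimeSeriesDistribution::PerSeries),
            ..Default::default()
        }
    }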
diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs
index f4eb68cc85..7b1d5372a5 100644
--- a/src/table/src/lib.rs
+++ b/src/table/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 #![feature(assert_matches)]
+#![feature(try_blocks)]

 pub mod dist_table;
 pub mod error;
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 04424cf5a1..9c9435d141 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -32,10 +32,15 @@ use datafusion::physical_plan::{
 };
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+    EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
+};
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datatypes::compute::SortOptions;
 use futures::{Stream, StreamExt};
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef};
+use store_api::storage::{ScanRequest, TimeSeriesDistribution};

 use crate::table::metrics::StreamMetrics;
@@ -51,10 +56,12 @@ pub struct RegionScanExec {
     append_mode: bool,
     total_rows: usize,
     is_partition_set: bool,
+    // TODO(ruihang): handle the TimeWindowed distribution via this parameter
+    distribution: Option<TimeSeriesDistribution>,
 }

 impl RegionScanExec {
-    pub fn new(scanner: RegionScannerRef) -> Self {
+    pub fn new(scanner: RegionScannerRef, request: ScanRequest) -> DfResult<Self> {
         let arrow_schema = scanner.schema().arrow_schema().clone();
         let scanner_props = scanner.properties();
         let mut num_output_partition = scanner_props.num_partitions();
@@ -64,14 +71,67 @@ impl RegionScanExec {
         if num_output_partition == 0 {
             num_output_partition = 1;
         }
+
+        let metadata = scanner.metadata();
+        let mut pk_columns: Vec<PhysicalSortExpr> = metadata
+            .primary_key_columns()
+            .filter_map(|col| {
+                Some(PhysicalSortExpr::new(
+                    Arc::new(Column::new_with_schema(&col.column_schema.name, &arrow_schema).ok()?)
+                        as _,
+                    SortOptions {
+                        descending: false,
+                        nulls_first: true,
+                    },
+                ))
+            })
+            .collect::<Vec<_>>();
+        let ts_col: Option<PhysicalSortExpr> = try {
+            PhysicalSortExpr::new(
+                Arc::new(
+                    Column::new_with_schema(
+                        &metadata.time_index_column().column_schema.name,
+                        &arrow_schema,
+                    )
+                    .ok()?,
+                ) as _,
+                SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                },
+            )
+        };
+
+        let eq_props = match request.distribution {
+            Some(TimeSeriesDistribution::PerSeries) => {
+                if let Some(ts) = ts_col {
+                    pk_columns.push(ts);
+                }
+                EquivalenceProperties::new_with_orderings(
+                    arrow_schema.clone(),
+                    &[LexOrdering::new(pk_columns)],
+                )
+            }
+            Some(TimeSeriesDistribution::TimeWindowed) => {
+                if let Some(ts_col) = ts_col {
+                    pk_columns.insert(0, ts_col);
+                }
+                EquivalenceProperties::new_with_orderings(
+                    arrow_schema.clone(),
+                    &[LexOrdering::new(pk_columns)],
+                )
+            }
+            None => EquivalenceProperties::new(arrow_schema.clone()),
+        };
+
         let properties = PlanProperties::new(
-            EquivalenceProperties::new(arrow_schema.clone()),
+            eq_props,
             Partitioning::UnknownPartitioning(num_output_partition),
             ExecutionMode::Bounded,
         );
         let append_mode = scanner_props.append_mode();
         let total_rows = scanner_props.total_rows();
-        Self {
+        Ok(Self {
             scanner: Arc::new(Mutex::new(scanner)),
             arrow_schema,
             output_ordering: None,
@@ -80,7 +140,8 @@ impl RegionScanExec {
             append_mode,
             total_rows,
             is_partition_set: false,
-        }
+            distribution: request.distribution,
+        })
     }

     /// Get the partition ranges of the scanner. This method will collapse the ranges into
@@ -140,9 +201,14 @@ impl RegionScanExec {
             append_mode: self.append_mode,
             total_rows: self.total_rows,
             is_partition_set: true,
+            distribution: self.distribution,
         })
     }

+    pub fn distribution(&self) -> Option<TimeSeriesDistribution> {
+        self.distribution
+    }
+
     pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
         let mut scanner = self.scanner.lock().unwrap();
         // set distinguish_partition_range won't fail
@@ -388,7 +454,7 @@ mod test {
         let region_metadata = Arc::new(builder.build().unwrap());

         let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
-        let plan = RegionScanExec::new(scanner);
+        let plan = RegionScanExec::new(scanner, ScanRequest::default()).unwrap();
         let actual: SchemaRef = Arc::new(
             plan.properties
                 .eq_properties
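The two `LexOrdering`s built above differ only in where the time index column lands. A name-level sketch (strings instead of `PhysicalSortExpr`; in the patch every column sorts ascending with nulls first, which this sketch does not model):

    #[derive(Clone, Copy)]
    enum TimeSeriesDistribution {
        TimeWindowed,
        PerSeries,
    }

    // PerSeries appends the time index after the primary key columns;
    // TimeWindowed puts it first; no hint advertises no ordering.
    fn ordering(dist: Option<TimeSeriesDistribution>, pk: &[&str], ts: &str) -> Vec<String> {
        let mut cols: Vec<String> = pk.iter().map(|c| c.to_string()).collect();
        match dist {
            Some(TimeSeriesDistribution::PerSeries) => cols.push(ts.to_string()),
            Some(TimeSeriesDistribution::TimeWindowed) => cols.insert(0, ts.to_string()),
            None => cols.clear(),
        }
        cols
    }

    fn main() {
        // Column names match the analyze.result test table: tags k, l; time index j.
        assert_eq!(
            ordering(Some(TimeSeriesDistribution::PerSeries), &["k", "l"], "j"),
            vec!["k", "l", "j"]
        );
        assert_eq!(
            ordering(Some(TimeSeriesDistribution::TimeWindowed), &["k", "l"], "j"),
            vec!["j", "k", "l"]
        );
    }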
diff --git a/tests/cases/standalone/common/order/order_by.result b/tests/cases/standalone/common/order/order_by.result
index b052f8ffd9..c586b15879 100644
--- a/tests/cases/standalone/common/order/order_by.result
+++ b/tests/cases/standalone/common/order/order_by.result
@@ -316,3 +316,63 @@ drop table t;

 Affected Rows: 0

+-- ORDER BY with projections
+CREATE TABLE test (
+  c1 INTEGER,
+  c2 INTEGER,
+  c3 STRING,
+  c4 DOUBLE,
+  ts TIMESTAMP TIME INDEX,
+  PRIMARY KEY (c1, c3, c2)
+);
+
+Affected Rows: 0
+
+INSERT INTO test VALUES (1, NULL, 'a', 3.0, 1), (2, 3, 'b', 4.0, 2), (3, 4, 'c', 5.0, 3);
+
+Affected Rows: 3
+
+SELECT c1, c3 FROM test ORDER BY c2;
+
++----+----+
+| c1 | c3 |
++----+----+
+| 2  | b  |
+| 3  | c  |
+| 1  | a  |
++----+----+
+
+SELECT c1, c3 FROM test ORDER BY c2 NULLS FIRST;
+
++----+----+
+| c1 | c3 |
++----+----+
+| 1  | a  |
+| 2  | b  |
+| 3  | c  |
++----+----+
+
+SELECT c1, c3 FROM test ORDER BY c3, c1;
+
++----+----+
+| c1 | c3 |
++----+----+
+| 1  | a  |
+| 2  | b  |
+| 3  | c  |
++----+----+
+
+SELECT c2 FROM test ORDER BY ts;
+
++----+
+| c2 |
++----+
+|    |
+| 3  |
+| 4  |
++----+
+
+drop table test;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/order/order_by.sql b/tests/cases/standalone/common/order/order_by.sql
index 6e140e5c04..2461cf5a13 100644
--- a/tests/cases/standalone/common/order/order_by.sql
+++ b/tests/cases/standalone/common/order/order_by.sql
@@ -97,3 +97,26 @@ select tag from t where num > 6 order by ts;

 explain analyze select tag from t where num > 6 order by ts desc limit 2;

 drop table t;
+
+-- ORDER BY with projections
+CREATE TABLE test (
+  c1 INTEGER,
+  c2 INTEGER,
+  c3 STRING,
+  c4 DOUBLE,
+  ts TIMESTAMP TIME INDEX,
+  PRIMARY KEY (c1, c3, c2)
+);
+
+INSERT INTO test VALUES (1, NULL, 'a', 3.0, 1), (2, 3, 'b', 4.0, 2), (3, 4, 'c', 5.0, 3);
+
+SELECT c1, c3 FROM test ORDER BY c2;
+
+SELECT c1, c3 FROM test ORDER BY c2 NULLS FIRST;
+
+SELECT c1, c3 FROM test ORDER BY c3, c1;
+
+SELECT c2 FROM test ORDER BY ts;
+
+drop table test;
+
diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
index f9c1e3651e..03b3f8d3fa 100644
--- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result
+++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
@@ -114,3 +114,39 @@ DROP TABLE test;

 Affected Rows: 0

+-- partition table
+CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING, l STRING, PRIMARY KEY(k, l)) PARTITION ON COLUMNS (k) (k < 'a', k >= 'a');
+
+Affected Rows: 0
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE (0, 10, '5s') test;
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST, l@3 ASC NULLS LAST] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC NULLS LAST, l@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, l@3 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
+|_|_|_|
+| 1_| 1_|_SortExec: expr=[k@2 DESC NULLS LAST, l@3 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
+|_|_|_|
+|_|_| Total rows: 0_|
++-+-+-+
+
+drop table test;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.sql b/tests/cases/standalone/common/tql-explain-analyze/analyze.sql
index 639e1e8597..0fb3613746 100644
--- a/tests/cases/standalone/common/tql-explain-analyze/analyze.sql
+++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.sql
@@ -43,3 +43,16 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp

 TQL ANALYZE VERBOSE (0, 10, '5s') test;

 DROP TABLE test;
+
+-- partition table
+CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING, l STRING, PRIMARY KEY(k, l)) PARTITION ON COLUMNS (k) (k < 'a', k >= 'a');
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE (0, 10, '5s') test;
+
+drop table test;