Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 14:40:01 +00:00)

Compare commits (36 commits)
docs/vecto ... jkt
| Author | SHA1 | Date |
|---|---|---|
|  | edd8cb6710 |  |
|  | 7ee61e5d28 |  |
|  | 1b30aca5a5 |  |
|  | 99b352cea1 |  |
|  | 0f521956bf |  |
|  | aee72ab363 |  |
|  | 5b78d76fc5 |  |
|  | a166430650 |  |
|  | 007a2b3dfe |  |
|  | f35e957ddd |  |
|  | 68414bf593 |  |
|  | 5e836a0d1b |  |
|  | f5e0da2fc8 |  |
|  | fb96d26ebf |  |
|  | 0046d3f65b |  |
|  | d7b97fc877 |  |
|  | bfdaa28b25 |  |
|  | 6293bb1f5b |  |
|  | 8fa1ebcc3e |  |
|  | c18c3f5839 |  |
|  | 629e72d8c0 |  |
|  | e4065505ab |  |
|  | aafd164483 |  |
|  | 1386e903d6 |  |
|  | 12692a940c |  |
|  | 4d44cbb8b2 |  |
|  | f4911aa3bb |  |
|  | 5ac61f17bc |  |
|  | e0d34c6d95 |  |
|  | 8a98b9c433 |  |
|  | 1f5d36a203 |  |
|  | 6fc7168893 |  |
|  | 2799d67212 |  |
|  | d97a76c312 |  |
|  | 15caca244e |  |
|  | 8638075cdd |  |
@@ -163,7 +163,7 @@ paste = "1.0"
 pin-project = "1.0"
 prometheus = { version = "0.13.3", features = ["process"] }
 promql-parser = { version = "0.5.1", features = ["ser"] }
-prost = "0.13"
+prost = { version = "0.13", features = ["no-recursion-limit"] }
 raft-engine = { version = "0.4.1", default-features = false }
 rand = "0.9"
 ratelimit = "0.10"
@@ -37,7 +37,7 @@ impl fmt::Display for RateFunction {
 
 impl Function for RateFunction {
     fn name(&self) -> &str {
-        "prom_rate"
+        "rate"
     }
 
     fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
@@ -82,7 +82,7 @@ mod tests {
     #[test]
     fn test_rate_function() {
         let rate = RateFunction;
-        assert_eq!("prom_rate", rate.name());
+        assert_eq!("rate", rate.name());
         assert_eq!(
             ConcreteDataType::float64_datatype(),
             rate.return_type(&[]).unwrap()
@@ -16,8 +16,8 @@ use std::any::Any;
 use std::sync::Arc;
 
 use arrow::array::Array;
-use arrow::datatypes::Int32Type;
-use arrow_array::{ArrayRef, DictionaryArray, Int32Array};
+use arrow::datatypes::Int64Type;
+use arrow_array::{ArrayRef, DictionaryArray, Int64Array};
 use serde_json::Value as JsonValue;
 use snafu::ResultExt;
 
@@ -32,7 +32,7 @@ use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
 /// Vector of dictionaries, basically backed by Arrow's `DictionaryArray`.
 #[derive(Debug, PartialEq)]
 pub struct DictionaryVector {
-    array: DictionaryArray<Int32Type>,
+    array: DictionaryArray<Int64Type>,
     /// The datatype of the items in the dictionary.
     item_type: ConcreteDataType,
     /// The vector of items in the dictionary.
@@ -41,7 +41,7 @@ pub struct DictionaryVector {
 
 impl DictionaryVector {
     /// Create a new instance of `DictionaryVector` from a dictionary array and item type
-    pub fn new(array: DictionaryArray<Int32Type>, item_type: ConcreteDataType) -> Result<Self> {
+    pub fn new(array: DictionaryArray<Int64Type>, item_type: ConcreteDataType) -> Result<Self> {
         let item_vector = Helper::try_into_vector(array.values())?;
 
         Ok(Self {
@@ -52,12 +52,12 @@ impl DictionaryVector {
     }
 
     /// Returns the underlying Arrow dictionary array
-    pub fn array(&self) -> &DictionaryArray<Int32Type> {
+    pub fn array(&self) -> &DictionaryArray<Int64Type> {
         &self.array
     }
 
     /// Returns the keys array of this dictionary
-    pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int32Type> {
+    pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int64Type> {
         self.array.keys()
     }
 
@@ -74,7 +74,7 @@ impl DictionaryVector {
 impl Vector for DictionaryVector {
     fn data_type(&self) -> ConcreteDataType {
         ConcreteDataType::Dictionary(DictionaryType::new(
-            ConcreteDataType::int32_datatype(),
+            ConcreteDataType::int64_datatype(),
             self.item_type.clone(),
         ))
     }
@@ -163,10 +163,10 @@ impl Serializable for DictionaryVector {
     }
 }
 
-impl TryFrom<DictionaryArray<Int32Type>> for DictionaryVector {
+impl TryFrom<DictionaryArray<Int64Type>> for DictionaryVector {
     type Error = crate::error::Error;
 
-    fn try_from(array: DictionaryArray<Int32Type>) -> Result<Self> {
+    fn try_from(array: DictionaryArray<Int64Type>) -> Result<Self> {
         let item_type = ConcreteDataType::from_arrow_type(array.values().data_type());
         let item_vector = Helper::try_into_vector(array.values())?;
 
@@ -243,7 +243,7 @@ impl VectorOp for DictionaryVector {
             previous_offset = offset;
         }
 
-        let new_keys = Int32Array::from(replicated_keys);
+        let new_keys = Int64Array::from(replicated_keys);
         let new_array = DictionaryArray::try_new(new_keys, self.values().clone())
             .expect("Failed to create replicated dictionary array");
 
@@ -261,7 +261,7 @@ impl VectorOp for DictionaryVector {
         let filtered_key_array = filtered_key_vector.to_arrow_array();
         let filtered_key_array = filtered_key_array
             .as_any()
-            .downcast_ref::<Int32Array>()
+            .downcast_ref::<Int64Array>()
             .unwrap();
 
         let new_array = DictionaryArray::try_new(filtered_key_array.clone(), self.values().clone())
@@ -291,7 +291,7 @@ impl VectorOp for DictionaryVector {
         let key_vector = Helper::try_into_vector(&key_array)?;
         let new_key_vector = key_vector.take(indices)?;
         let new_key_array = new_key_vector.to_arrow_array();
-        let new_key_array = new_key_array.as_any().downcast_ref::<Int32Array>().unwrap();
+        let new_key_array = new_key_array.as_any().downcast_ref::<Int64Array>().unwrap();
 
         let new_array = DictionaryArray::try_new(new_key_array.clone(), self.values().clone())
             .expect("Failed to create filtered dictionary array");
@@ -318,7 +318,7 @@ mod tests {
         // Keys: [0, 1, 2, null, 1, 3]
         // Resulting in: ["a", "b", "c", null, "b", "d"]
         let values = StringArray::from(vec!["a", "b", "c", "d"]);
-        let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
+        let keys = Int64Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
         let dict_array = DictionaryArray::new(keys, Arc::new(values));
         DictionaryVector::try_from(dict_array).unwrap()
     }
@@ -404,7 +404,7 @@ mod tests {
         assert_eq!(
             casted.data_type(),
             ConcreteDataType::Dictionary(DictionaryType::new(
-                ConcreteDataType::int32_datatype(),
+                ConcreteDataType::int64_datatype(),
                 ConcreteDataType::string_datatype(),
             ))
         );
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use arrow::array::{Array, ArrayRef, StringArray};
 use arrow::compute;
 use arrow::compute::kernels::comparison;
-use arrow::datatypes::{DataType as ArrowDataType, Int32Type, TimeUnit};
+use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit};
 use arrow_array::DictionaryArray;
 use arrow_schema::IntervalUnit;
 use datafusion_common::ScalarValue;
@@ -348,11 +348,11 @@ impl Helper {
             ArrowDataType::Decimal128(_, _) => {
                 Arc::new(Decimal128Vector::try_from_arrow_array(array)?)
             }
-            ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int32) => {
+            ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int64) => {
                 let array = array
                     .as_ref()
                     .as_any()
-                    .downcast_ref::<DictionaryArray<Int32Type>>()
+                    .downcast_ref::<DictionaryArray<Int64Type>>()
                     .unwrap(); // Safety: the type is guarded by match arm condition
                 Arc::new(DictionaryVector::new(
                     array.clone(),
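The dictionary key type switches from Int32 to Int64 across the vector code above. As a point of reference, here is a minimal standalone sketch (arrow crates only, mirroring the test fixture earlier in this diff; it is not code from this change) of the Int64-keyed dictionary array that the updated match arm in `Helper::try_into_vector` now accepts:

```rust
use std::sync::Arc;

use arrow_array::{Array, DictionaryArray, Int64Array, StringArray};
use arrow_schema::DataType;

fn main() {
    // Values ["a", "b", "c"] with keys [0, 1, 1, 2] encode the logical column
    // ["a", "b", "b", "c"].
    let values = StringArray::from(vec!["a", "b", "c"]);
    let keys = Int64Array::from(vec![0_i64, 1, 1, 2]);
    let dict = DictionaryArray::new(keys, Arc::new(values));

    // `Helper::try_into_vector` now matches on this data type (Int64 keys) instead of Int32.
    assert_eq!(
        dict.data_type(),
        &DataType::Dictionary(Box::new(DataType::Int64), Box::new(DataType::Utf8))
    );
}
```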
@@ -710,8 +710,8 @@ pub enum Error {
         error: std::io::Error,
     },
 
-    #[snafu(display("Failed to filter record batch"))]
-    FilterRecordBatch {
+    #[snafu(display("Record batch error"))]
+    RecordBatch {
         source: common_recordbatch::error::Error,
         #[snafu(implicit)]
         location: Location,
@@ -1032,6 +1032,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to scan series"))]
+    ScanSeries {
+        #[snafu(implicit)]
+        location: Location,
+        source: Arc<Error>,
+    },
+
+    #[snafu(display("Partition {} scan multiple times", partition))]
+    ScanMultiTimes {
+        partition: usize,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
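The two new variants follow the snafu conventions used throughout the crate: an `Arc`-wrapped source so one failure can be shared by several consumers, and an implicit `Location`. A standalone sketch of that pattern (the `DemoError` enum and `main` below are illustrative only, not part of this change):

```rust
use std::sync::Arc;

use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum DemoError {
    #[snafu(display("Failed to scan series"))]
    ScanSeries {
        #[snafu(implicit)]
        location: Location,
        // An `Arc` source lets a single failure be cloned and fanned out to many receivers.
        source: Arc<DemoError>,
    },

    #[snafu(display("Partition {} scan multiple times", partition))]
    ScanMultiTimes {
        partition: usize,
        #[snafu(implicit)]
        location: Location,
    },
}

fn main() {
    // Build an inner error through its generated context selector.
    let inner = ScanMultiTimesSnafu { partition: 0usize }.build();
    let shared = Arc::new(inner);

    // Wrap the shared error, which is what a distributor does when it has to report the
    // same scan failure to every partition.
    let wrapped: Result<(), DemoError> = Err(shared.clone()).context(ScanSeriesSnafu);
    println!("{}", wrapped.unwrap_err());
}
```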
@@ -1154,7 +1168,7 @@ impl ErrorExt for Error {
 
             External { source, .. } => source.status_code(),
 
-            FilterRecordBatch { source, .. } => source.status_code(),
+            RecordBatch { source, .. } => source.status_code(),
 
             Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
             ChecksumMismatch { .. } => StatusCode::Unexpected,
@@ -1183,7 +1197,12 @@ impl ErrorExt for Error {
             ManualCompactionOverride {} => StatusCode::Cancelled,
 
             IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
 
             ConvertDataType { .. } => StatusCode::Internal,
+
+            ScanSeries { source, .. } => source.status_code(),
+
+            ScanMultiTimes { .. } => StatusCode::InvalidArguments,
         }
     }
 
@@ -24,6 +24,7 @@ pub(crate) mod range;
 pub(crate) mod scan_region;
 pub(crate) mod scan_util;
 pub(crate) mod seq_scan;
+pub(crate) mod series_scan;
 pub(crate) mod unordered_scan;
 
 use std::collections::{HashMap, HashSet};
@@ -21,7 +21,7 @@ use datatypes::arrow::array::BooleanArray;
 use datatypes::arrow::buffer::BooleanBuffer;
 use snafu::ResultExt;
 
-use crate::error::{FilterRecordBatchSnafu, Result};
+use crate::error::{RecordBatchSnafu, Result};
 use crate::memtable::BoxedBatchIterator;
 use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::{Batch, BatchReader};
@@ -201,7 +201,7 @@ impl PruneTimeIterator {
         for filter in filters.iter() {
             let result = filter
                 .evaluate_vector(batch.timestamps())
-                .context(FilterRecordBatchSnafu)?;
+                .context(RecordBatchSnafu)?;
             mask = mask.bitand(&result);
         }
 
@@ -46,6 +46,7 @@ use crate::read::compat::{self, CompatBatch};
 use crate::read::projection::ProjectionMapper;
 use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
 use crate::read::seq_scan::SeqScan;
+use crate::read::series_scan::SeriesScan;
 use crate::read::unordered_scan::UnorderedScan;
 use crate::read::{Batch, Source};
 use crate::region::options::MergeMode;
@@ -66,6 +67,8 @@ pub(crate) enum Scanner {
     Seq(SeqScan),
     /// Unordered scan.
     Unordered(UnorderedScan),
+    /// Per-series scan.
+    Series(SeriesScan),
 }
 
 impl Scanner {
@@ -75,6 +78,7 @@ impl Scanner {
         match self {
             Scanner::Seq(seq_scan) => seq_scan.build_stream(),
             Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
+            Scanner::Series(series_scan) => series_scan.build_stream().await,
         }
     }
 }
@@ -86,6 +90,7 @@ impl Scanner {
         match self {
             Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
             Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
+            Scanner::Series(series_scan) => series_scan.input().num_files(),
         }
     }
 
@@ -94,6 +99,7 @@ impl Scanner {
         match self {
             Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
             Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
+            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
         }
     }
 
@@ -102,6 +108,7 @@ impl Scanner {
         match self {
             Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
             Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
+            Scanner::Series(series_scan) => series_scan.input().file_ids(),
         }
     }
 
@@ -113,6 +120,7 @@ impl Scanner {
         match self {
             Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
             Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
+            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
         }
     }
 }
@@ -248,7 +256,9 @@ impl ScanRegion {
 
     /// Returns a [Scanner] to scan the region.
     pub(crate) fn scanner(self) -> Result<Scanner> {
-        if self.use_unordered_scan() {
+        if self.use_series_scan() {
+            self.series_scan().map(Scanner::Series)
+        } else if self.use_unordered_scan() {
             // If table is append only and there is no series row selector, we use unordered scan in query.
             // We still use seq scan in compaction.
             self.unordered_scan().map(Scanner::Unordered)
@@ -260,7 +270,9 @@ impl ScanRegion {
     /// Returns a [RegionScanner] to scan the region.
     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
     pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
-        if self.use_unordered_scan() {
+        if self.use_series_scan() {
+            self.series_scan().map(|scanner| Box::new(scanner) as _)
+        } else if self.use_unordered_scan() {
             self.unordered_scan().map(|scanner| Box::new(scanner) as _)
         } else {
             self.seq_scan().map(|scanner| Box::new(scanner) as _)
@@ -279,6 +291,12 @@ impl ScanRegion {
         Ok(UnorderedScan::new(input))
     }
 
+    /// Scans by series.
+    pub(crate) fn series_scan(self) -> Result<SeriesScan> {
+        let input = self.scan_input(true)?;
+        Ok(SeriesScan::new(input))
+    }
+
     #[cfg(test)]
     pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
         let input = self.scan_input(false)?;
@@ -299,6 +317,11 @@ impl ScanRegion {
             || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
     }
 
+    /// Returns true if the region can use series scan for current request.
+    fn use_series_scan(&self) -> bool {
+        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
+    }
+
     /// Creates a scan input.
     fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
         let time_range = self.build_time_range_predicate();
@@ -92,6 +92,8 @@ struct ScanMetricsSet {
 
     /// Elapsed time before the first poll operation.
     first_poll: Duration,
+    /// Number of send timeout in SeriesScan.
+    num_series_send_timeout: usize,
 }
 
 impl fmt::Debug for ScanMetricsSet {
@@ -122,6 +124,7 @@ impl fmt::Debug for ScanMetricsSet {
             num_sst_batches,
             num_sst_rows,
             first_poll,
+            num_series_send_timeout,
         } = self;
 
         write!(
@@ -150,7 +153,8 @@ impl fmt::Debug for ScanMetricsSet {
             num_sst_record_batches={num_sst_record_batches}, \
             num_sst_batches={num_sst_batches}, \
             num_sst_rows={num_sst_rows}, \
-            first_poll={first_poll:?}}}"
+            first_poll={first_poll:?}, \
+            num_series_send_timeout={num_series_send_timeout}}}"
         )
     }
 }
@@ -439,6 +443,12 @@ impl PartitionMetrics {
     pub(crate) fn on_finish(&self) {
         self.0.on_finish();
     }
+
+    /// Sets the `num_series_send_timeout`.
+    pub(crate) fn set_num_series_send_timeout(&self, num_timeout: usize) {
+        let mut metrics = self.0.metrics.lock().unwrap();
+        metrics.num_series_send_timeout = num_timeout;
+    }
 }
 
 impl fmt::Debug for PartitionMetrics {
@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
-use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
+use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;
 
 use crate::error::{PartitionOutOfRangeSnafu, Result};
@@ -149,7 +149,7 @@ impl SeqScan {
     /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
     /// if possible.
     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
-    async fn build_reader_from_sources(
+    pub(crate) async fn build_reader_from_sources(
         stream_ctx: &StreamContext,
         mut sources: Vec<Source>,
         semaphore: Option<Arc<Semaphore>>,
@@ -206,9 +206,13 @@ impl SeqScan {
                 .build(),
             ));
         }
-        if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
-            return self.scan_partition_by_series(metrics_set, partition);
+        if self.properties.partitions[partition].is_empty() {
+            return Ok(Box::pin(RecordBatchStreamWrapper::new(
+                self.stream_ctx.input.mapper.output_schema(),
+                common_recordbatch::EmptyRecordBatchStream::new(
+                    self.stream_ctx.input.mapper.output_schema(),
+                ),
+            )));
         }
 
         let stream_ctx = self.stream_ctx.clone();
@@ -237,14 +241,14 @@ impl SeqScan {
                 &mut sources,
             );
 
+            let mut metrics = ScannerMetrics::default();
+            let mut fetch_start = Instant::now();
             let mut reader =
                 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                     .await
                     .map_err(BoxedError::new)
                     .context(ExternalSnafu)?;
             let cache = &stream_ctx.input.cache_strategy;
-            let mut metrics = ScannerMetrics::default();
-            let mut fetch_start = Instant::now();
             #[cfg(debug_assertions)]
             let mut checker = crate::read::BatchChecker::default()
                 .with_start(Some(part_range.start))
@@ -307,97 +311,6 @@ impl SeqScan {
         Ok(stream)
     }
 
-    /// Scans all ranges in the given partition and merge by time series.
-    /// Otherwise the returned stream might not contains any data.
-    fn scan_partition_by_series(
-        &self,
-        metrics_set: &ExecutionPlanMetricsSet,
-        partition: usize,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
-        let stream_ctx = self.stream_ctx.clone();
-        let semaphore = self.new_semaphore();
-        let partition_ranges = self.properties.partitions[partition].clone();
-        let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = self.new_partition_metrics(metrics_set, partition);
-        debug_assert!(!self.compaction);
-
-        let stream = try_stream! {
-            part_metrics.on_first_poll();
-
-            let range_builder_list = Arc::new(RangeBuilderList::new(
-                stream_ctx.input.num_memtables(),
-                stream_ctx.input.num_files(),
-            ));
-            // Scans all parts.
-            let mut sources = Vec::with_capacity(partition_ranges.len());
-            for part_range in partition_ranges {
-                build_sources(
-                    &stream_ctx,
-                    &part_range,
-                    false,
-                    &part_metrics,
-                    range_builder_list.clone(),
-                    &mut sources,
-                );
-            }
-
-            // Builds a reader that merge sources from all parts.
-            let mut reader =
-                Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu)?;
-            let cache = &stream_ctx.input.cache_strategy;
-            let mut metrics = ScannerMetrics::default();
-            let mut fetch_start = Instant::now();
-
-            while let Some(batch) = reader
-                .next_batch()
-                .await
-                .map_err(BoxedError::new)
-                .context(ExternalSnafu)?
-            {
-                metrics.scan_cost += fetch_start.elapsed();
-                metrics.num_batches += 1;
-                metrics.num_rows += batch.num_rows();
-
-                debug_assert!(!batch.is_empty());
-                if batch.is_empty() {
-                    continue;
-                }
-
-                let convert_start = Instant::now();
-                let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
-                metrics.convert_cost += convert_start.elapsed();
-                let yield_start = Instant::now();
-                yield record_batch;
-                metrics.yield_cost += yield_start.elapsed();
-
-                fetch_start = Instant::now();
-            }
-
-            // Yields an empty part to indicate this range is terminated.
-            // The query engine can use this to optimize some queries.
-            if distinguish_range {
-                let yield_start = Instant::now();
-                yield stream_ctx.input.mapper.empty_record_batch();
-                metrics.yield_cost += yield_start.elapsed();
-            }
-
-            metrics.scan_cost += fetch_start.elapsed();
-            part_metrics.merge_metrics(&metrics);
-
-            part_metrics.on_finish();
-        };
-
-        let stream = Box::pin(RecordBatchStreamWrapper::new(
-            self.stream_ctx.input.mapper.output_schema(),
-            Box::pin(stream),
-        ));
-
-        Ok(stream)
-    }
-
     fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
         if self.properties.target_partitions() > self.properties.num_partitions() {
             // We can use additional tasks to read the data if we have more target partitions than actual partitions.
@@ -498,7 +411,7 @@ impl fmt::Debug for SeqScan {
 }
 
 /// Builds sources for the partition range and push them to the `sources` vector.
-fn build_sources(
+pub(crate) fn build_sources(
     stream_ctx: &Arc<StreamContext>,
     part_range: &PartitionRange,
     compaction: bool,
@@ -509,8 +422,8 @@ fn build_sources(
     // Gets range meta.
     let range_meta = &stream_ctx.ranges[part_range.identifier];
     #[cfg(debug_assertions)]
-    if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
-        // Compaction or per series distribution expects input sources are not been split.
+    if compaction {
+        // Compaction expects input sources are not been split.
         debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
         for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // It should scan all row groups.
src/mito2/src/read/series_scan.rs (new file, 547 lines)
@@ -0,0 +1,547 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::compute::concat_batches;
use datatypes::schema::SchemaRef;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;

use crate::error::{
    ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
    ScanMultiTimesSnafu, ScanSeriesSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList};
use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::{Batch, ScannerMetrics};

/// Timeout to send a batch to a sender.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);

/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets in query and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    fn scan_partition_impl(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        if partition >= self.properties.num_partitions() {
            return Err(BoxedError::new(
                PartitionOutOfRangeSnafu {
                    given: partition,
                    all: self.properties.num_partitions(),
                }
                .build(),
            ));
        }

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let part_metrics =
            new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
        let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
        let stream_ctx = self.stream_ctx.clone();

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let cache = &stream_ctx.input.cache_strategy;
            let mut df_record_batches = Vec::new();
            let mut fetch_start = Instant::now();
            while let Some(result) = receiver.recv().await {
                let mut metrics = ScannerMetrics::default();
                let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                let convert_start = Instant::now();
                df_record_batches.reserve(series.batches.len());
                for batch in series.batches {
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
                    df_record_batches.push(record_batch.into_df_record_batch());
                }

                let output_schema = stream_ctx.input.mapper.output_schema();
                let df_record_batch =
                    concat_batches(output_schema.arrow_schema(), &df_record_batches)
                        .context(ComputeArrowSnafu)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                df_record_batches.clear();
                let record_batch =
                    RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
                metrics.convert_cost += convert_start.elapsed();

                let yield_start = Instant::now();
                yield record_batch;
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }
        };

        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.stream_ctx.input.mapper.output_schema(),
            Box::pin(stream),
        ));

        Ok(stream)
    }

    /// Takes the receiver for the partition.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor if the receiver list is empty.
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        common_runtime::spawn_global(async move {
            distributor.execute().await;
        });

        *rx_list = receivers;
    }

    /// Scans the region and returns a stream.
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }
}

fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(metrics_set, partition)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor report the metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distributor.
    async fn execute(&mut self) {
        if let Err(e) = self.scan_partitions().await {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts.
    async fn scan_partitions(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                );
            }
        }

        // Builds a reader that merge sources from all parts.
        let mut reader =
            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
                .await?;
        let mut metrics = ScannerMetrics::default();
        let mut fetch_start = Instant::now();

        let mut current_series = SeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            fetch_start = Instant::now();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                continue;
            }

            // We find a new series, send the current one.
            let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders.send_batch(to_send).await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders.send_batch(current_series).await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        part_metrics.merge_metrics(&metrics);
        part_metrics.set_num_series_send_timeout(self.senders.num_timeout);

        part_metrics.on_finish();

        Ok(())
    }
}

/// Batches of the same series.
#[derive(Default)]
struct SeriesBatch {
    batches: SmallVec<[Batch; 4]>,
}

impl SeriesBatch {
    /// Creates a new [SeriesBatch] from a single [Batch].
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there is no batch.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

/// List of senders.
struct SenderList {
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of timeout.
    num_timeout: usize,
}

impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
        }
    }

    /// Finds a partition and tries to send the batch to the partition.
    /// Returns None if it sends successfully.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Finds a partition and sends the batch to the partition.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send batch to partition.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Adds a timeout to avoid blocking indefinitely and sending
            // the batch in a round-robin fashion when some partitions
            // don't poll their inputs. This may happen if we have a
            // node like sort merging. But it is rare when we are using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}

fn new_partition_metrics(
    stream_ctx: &StreamContext,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}
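The core of the file above is the `SeriesDistributor`/`SenderList` pair: one merged, sorted reader feeds every partition, batches sharing a primary key are grouped into a `SeriesBatch`, and each series is handed to a partition over bounded channels, first with `try_send` and then with `send_timeout`, so a stalled partition cannot block the distributor forever. A stripped-down, standalone sketch of that channel pattern (tokio with full features assumed; every name below is illustrative, not taken from this file):

```rust
use std::time::Duration;

use tokio::sync::mpsc::{self, error::SendTimeoutError};

const SEND_TIMEOUT: Duration = Duration::from_millis(10);

#[tokio::main]
async fn main() {
    // One bounded channel per output partition, like `new_channel_list` above.
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..2).map(|_| mpsc::channel::<u64>(1)).unzip();

    // Consumers: each partition drains only its own receiver.
    let consumers: Vec<_> = receivers
        .into_iter()
        .enumerate()
        .map(|(i, mut rx)| {
            tokio::spawn(async move {
                while let Some(v) = rx.recv().await {
                    println!("partition {i} got {v}");
                }
            })
        })
        .collect();

    // Distributor: offer each value to the partitions round-robin; a timeout hands the
    // value back so it can be offered to the next partition instead of blocking forever.
    let mut idx = 0;
    for mut value in 0..8u64 {
        loop {
            let sender = &senders[idx % senders.len()];
            idx += 1;
            match sender.send_timeout(value, SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(v)) | Err(SendTimeoutError::Closed(v)) => value = v,
            }
        }
    }

    // Dropping the senders closes the channels so the consumers finish.
    drop(senders);
    for consumer in consumers {
        consumer.await.unwrap();
    }
}
```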
@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
 use store_api::storage::TimeSeriesRowSelector;
 
 use crate::error::{
-    DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
+    DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
 };
 use crate::read::compat::CompatBatch;
 use crate::read::last_row::RowGroupLastRowCachedReader;
@@ -294,7 +294,7 @@ impl RangeBase {
         };
         if filter
             .evaluate_scalar(&pk_value)
-            .context(FilterRecordBatchSnafu)?
+            .context(RecordBatchSnafu)?
         {
             continue;
         } else {
@@ -311,11 +311,11 @@ impl RangeBase {
                 let field_col = &input.fields()[field_index].data;
                 filter
                     .evaluate_vector(field_col)
-                    .context(FilterRecordBatchSnafu)?
+                    .context(RecordBatchSnafu)?
             }
             SemanticType::Timestamp => filter
                 .evaluate_vector(input.timestamps())
-                .context(FilterRecordBatchSnafu)?,
+                .context(RecordBatchSnafu)?,
         };
 
         mask = mask.bitand(&result);
@@ -91,9 +91,9 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
         _exprs: Vec<Expr>,
         inputs: Vec<LogicalPlan>,
     ) -> DataFusionResult<Self> {
-        if inputs.is_empty() {
+        if inputs.len() != 1 {
             return Err(DataFusionError::Internal(
-                "InstantManipulate should have at least one input".to_string(),
+                "InstantManipulate should have exact one input".to_string(),
             ));
         }
 
@@ -354,6 +354,9 @@ impl Stream for InstantManipulateStream {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let poll = match ready!(self.input.poll_next_unpin(cx)) {
             Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Poll::Pending;
+                }
                 let timer = std::time::Instant::now();
                 self.num_series.add(1);
                 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
@@ -42,7 +42,7 @@ use greptime_proto::substrait_extension as pb;
 use prost::Message;
 use snafu::ResultExt;
 
-use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result};
+use crate::error::{DeserializeSnafu, Result};
 use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
 use crate::metrics::PROMQL_SERIES_COUNT;
 use crate::range_array::RangeArray;
@@ -194,20 +194,26 @@ impl RangeManipulate {
 
     pub fn deserialize(bytes: &[u8]) -> Result<Self> {
         let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
+        let empty_schema = Arc::new(DFSchema::empty());
         let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
             produce_one_row: false,
-            schema: Arc::new(DFSchema::empty()),
+            schema: empty_schema.clone(),
         });
-        Self::new(
-            pb_range_manipulate.start,
-            pb_range_manipulate.end,
-            pb_range_manipulate.interval,
-            pb_range_manipulate.range,
-            pb_range_manipulate.time_index,
-            pb_range_manipulate.tag_columns,
-            placeholder_plan,
-        )
-        .context(DataFusionPlanningSnafu)
+        // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
+        // because the input schema is empty.
+        // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
+        // deserialized plan.
+        Ok(Self {
+            start: pb_range_manipulate.start,
+            end: pb_range_manipulate.end,
+            interval: pb_range_manipulate.interval,
+            range: pb_range_manipulate.range,
+            time_index: pb_range_manipulate.time_index,
+            field_columns: pb_range_manipulate.tag_columns,
+            input: placeholder_plan,
+            output_schema: empty_schema,
+        })
     }
 }
 
@@ -270,14 +276,19 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
     fn with_exprs_and_inputs(
         &self,
         _exprs: Vec<Expr>,
-        inputs: Vec<LogicalPlan>,
+        mut inputs: Vec<LogicalPlan>,
     ) -> DataFusionResult<Self> {
-        if inputs.is_empty() {
+        if inputs.len() != 1 {
             return Err(DataFusionError::Internal(
-                "RangeManipulate should have at least one input".to_string(),
+                "RangeManipulate should have at exact one input".to_string(),
             ));
         }
 
+        let input: LogicalPlan = inputs.pop().unwrap();
+        let input_schema = input.schema();
+        let output_schema =
+            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
+
         Ok(Self {
             start: self.start,
             end: self.end,
@@ -285,8 +296,8 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
             range: self.range,
             time_index: self.time_index.clone(),
             field_columns: self.field_columns.clone(),
-            input: inputs.into_iter().next().unwrap(),
-            output_schema: self.output_schema.clone(),
+            input,
+            output_schema,
         })
     }
 }
@@ -106,6 +106,10 @@ impl SeriesDivide {
         })
     }
 
+    pub fn tags(&self) -> &[String] {
+        &self.tag_columns
+    }
+
     pub fn serialize(&self) -> Vec<u8> {
         pb::SeriesDivide {
             tag_columns: self.tag_columns.clone(),
@@ -315,7 +319,9 @@ impl Stream for SeriesDivideStream {
                 let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                 let timer = std::time::Instant::now();
                 if let Some(next_batch) = next_batch {
-                    self.buffer.push(next_batch);
+                    if next_batch.num_rows() != 0 {
+                        self.buffer.push(next_batch);
+                    }
                     continue;
                 } else {
                     // input stream is ended
@@ -40,7 +40,7 @@ pub use holt_winters::HoltWinters;
 pub use idelta::IDelta;
 pub use predict_linear::PredictLinear;
 pub use quantile::QuantileOverTime;
-pub use quantile_aggr::quantile_udaf;
+pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
 pub use resets::Resets;
 pub use round::Round;
 
@@ -228,7 +228,7 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
 
 // delta
 impl ExtrapolatedRate<false, false> {
-    pub fn name() -> &'static str {
+    pub const fn name() -> &'static str {
         "prom_delta"
     }
 
@@ -239,7 +239,7 @@ impl ExtrapolatedRate<false, false> {
 
 // rate
 impl ExtrapolatedRate<true, true> {
-    pub fn name() -> &'static str {
+    pub const fn name() -> &'static str {
         "prom_rate"
     }
 
@@ -250,7 +250,7 @@ impl ExtrapolatedRate<true, true> {
 
 // increase
 impl ExtrapolatedRate<true, false> {
-    pub fn name() -> &'static str {
+    pub const fn name() -> &'static str {
         "prom_increase"
     }
 
@@ -27,7 +27,7 @@ use datatypes::arrow::datatypes::{DataType, Field, Float64Type};
 
 use crate::functions::quantile::quantile_impl;
 
-const QUANTILE_NAME: &str = "quantile";
+pub const QUANTILE_NAME: &str = "quantile";
 
 const VALUES_FIELD_NAME: &str = "values";
 const DEFAULT_LIST_FIELD_NAME: &str = "item";
@@ -55,12 +55,16 @@ impl Categorizer {
             LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
             LogicalPlan::Window(_) => Commutativity::Unimplemented,
             LogicalPlan::Aggregate(aggr) => {
-                if Self::check_partition(&aggr.group_expr, &partition_cols) {
-                    return Commutativity::Commutative;
+                if !Self::check_partition(&aggr.group_expr, &partition_cols) {
+                    return Commutativity::NonCommutative;
                 }
-                // check all children exprs and uses the strictest level
-                Commutativity::Unimplemented
+                for expr in &aggr.aggr_expr {
+                    let commutativity = Self::check_expr(expr);
+                    if !matches!(commutativity, Commutativity::Commutative) {
+                        return commutativity;
+                    }
+                }
+                Commutativity::Commutative
             }
             LogicalPlan::Sort(_) => {
                 if partition_cols.is_empty() {
@@ -94,7 +98,7 @@ impl Categorizer {
                 }
             }
             LogicalPlan::Extension(extension) => {
-                Self::check_extension_plan(extension.node.as_ref() as _)
+                Self::check_extension_plan(extension.node.as_ref() as _, &partition_cols)
             }
             LogicalPlan::Distinct(_) => {
                 if partition_cols.is_empty() {
@@ -116,13 +120,30 @@ impl Categorizer {
         }
     }
 
-    pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity {
+    pub fn check_extension_plan(
+        plan: &dyn UserDefinedLogicalNode,
+        partition_cols: &[String],
+    ) -> Commutativity {
         match plan.name() {
-            name if name == EmptyMetric::name()
+            name if name == SeriesDivide::name() => {
+                let series_divide = plan.as_any().downcast_ref::<SeriesDivide>().unwrap();
+                let tags = series_divide.tags().iter().collect::<HashSet<_>>();
+                for partition_col in partition_cols {
+                    if !tags.contains(partition_col) {
+                        return Commutativity::NonCommutative;
+                    }
+                }
+                Commutativity::Commutative
+            }
+            name if name == SeriesNormalize::name()
                 || name == InstantManipulate::name()
-                || name == SeriesNormalize::name()
-                || name == RangeManipulate::name()
-                || name == SeriesDivide::name()
+                || name == RangeManipulate::name() =>
+            {
+                // They should always follows Series Divide.
+                // Either all commutative or all non-commutative (which will be blocked by SeriesDivide).
+                Commutativity::Commutative
+            }
+            name if name == EmptyMetric::name()
                 || name == MergeScanLogicalPlan::name()
                 || name == MergeSortLogicalPlan::name() =>
             {
@@ -148,8 +169,9 @@ impl Categorizer {
             | Expr::Negative(_)
             | Expr::Between(_)
             | Expr::Exists(_)
-            | Expr::InList(_)
-            | Expr::ScalarFunction(_) => Commutativity::Commutative,
+            | Expr::InList(_) => Commutativity::Commutative,
+            Expr::ScalarFunction(_udf) => Commutativity::Commutative,
+            Expr::AggregateFunction(_udaf) => Commutativity::Commutative,
 
             Expr::Like(_)
             | Expr::SimilarTo(_)
@@ -158,7 +180,6 @@ impl Categorizer {
             | Expr::Case(_)
             | Expr::Cast(_)
             | Expr::TryCast(_)
-            | Expr::AggregateFunction(_)
             | Expr::WindowFunction(_)
             | Expr::InSubquery(_)
             | Expr::ScalarSubquery(_)
@@ -62,21 +62,28 @@ impl ParallelizeScan {
             } else if let Some(region_scan_exec) =
                 plan.as_any().downcast_ref::<RegionScanExec>()
             {
+                let expected_partition_num = config.execution.target_partitions;
                 if region_scan_exec.is_partition_set() {
                     return Ok(Transformed::no(plan));
                 }
 
-                // don't parallelize if we want per series distribution
                 if matches!(
                     region_scan_exec.distribution(),
                     Some(TimeSeriesDistribution::PerSeries)
                 ) {
-                    return Ok(Transformed::no(plan));
+                    let partition_range = region_scan_exec.get_partition_ranges();
+                    // HACK: Allocate expected_partition_num empty partitions to indicate
+                    // the expected partition number.
+                    let mut new_partitions = vec![vec![]; expected_partition_num];
+                    new_partitions[0] = partition_range;
+                    let new_plan = region_scan_exec
+                        .with_new_partitions(new_partitions, expected_partition_num)
+                        .map_err(|e| DataFusionError::External(e.into_inner()))?;
+                    return Ok(Transformed::yes(Arc::new(new_plan)));
                 }
 
                 let ranges = region_scan_exec.get_partition_ranges();
                 let total_range_num = ranges.len();
-                let expected_partition_num = config.execution.target_partitions;
 
                 // assign ranges to each partition
                 let mut partition_ranges =
@@ -131,26 +138,18 @@ impl ParallelizeScan {
     ) -> Vec<Vec<PartitionRange>> {
         if ranges.is_empty() {
             // Returns a single partition with no range.
-            return vec![vec![]];
+            return vec![vec![]; expected_partition_num];
         }
 
         if ranges.len() == 1 {
-            return vec![ranges];
+            let mut vec = vec![vec![]; expected_partition_num];
+            vec[0] = ranges;
+            return vec;
         }
 
         // Sort ranges by number of rows in descending order.
         ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
-        // Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available.
-        let max_rows = ranges[0].num_rows;
-        let total_rows = ranges.iter().map(|range| range.num_rows).sum::<usize>();
-        // Computes the partition num by the max row number. This eliminates the unbalance of the partitions.
-        let balanced_partition_num = if max_rows > 0 {
-            total_rows.div_ceil(max_rows)
-        } else {
-            ranges.len()
-        };
-        let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
-        let mut partition_ranges = vec![vec![]; actual_partition_num];
+        let mut partition_ranges = vec![vec![]; expected_partition_num];
 
         #[derive(Eq, PartialEq)]
         struct HeapNode {
@@ -172,7 +171,7 @@ impl ParallelizeScan {
         }
 
         let mut part_heap =
-            BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode {
+            BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
                 num_rows: 0,
                 partition_idx,
             }));
@@ -263,7 +262,7 @@ mod test {
         ];
         assert_eq!(result, expected);
 
-        // assign 4 ranges to 5 partitions. Only 4 partitions are returned.
+        // assign 4 ranges to 5 partitions.
        let expected_partition_num = 5;
         let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
         let expected = vec![
@@ -273,32 +272,31 @@ mod test {
                 num_rows: 250,
                 identifier: 4,
             }],
+            vec![PartitionRange {
+                start: Timestamp::new(0, TimeUnit::Second),
+                end: Timestamp::new(10, TimeUnit::Second),
+                num_rows: 100,
+                identifier: 1,
+            }],
             vec![PartitionRange {
                 start: Timestamp::new(10, TimeUnit::Second),
                 end: Timestamp::new(20, TimeUnit::Second),
                 num_rows: 200,
                 identifier: 2,
             }],
-            vec![
-                PartitionRange {
-                    start: Timestamp::new(20, TimeUnit::Second),
-                    end: Timestamp::new(30, TimeUnit::Second),
-                    num_rows: 150,
-                    identifier: 3,
-                },
-                PartitionRange {
-                    start: Timestamp::new(0, TimeUnit::Second),
-                    end: Timestamp::new(10, TimeUnit::Second),
-                    num_rows: 100,
-                    identifier: 1,
-                },
-            ],
+            vec![],
+            vec![PartitionRange {
+                start: Timestamp::new(20, TimeUnit::Second),
+                end: Timestamp::new(30, TimeUnit::Second),
+                num_rows: 150,
+                identifier: 3,
+            }],
         ];
         assert_eq!(result, expected);
 
-        // assign 0 ranges to 5 partitions. Only 1 partition is returned.
+        // assign 0 ranges to 5 partitions. Should return 5 empty ranges.
         let result = ParallelizeScan::assign_partition_range(vec![], 5);
-        assert_eq!(result.len(), 1);
+        assert_eq!(result.len(), 5);
     }
 
     #[test]
@@ -29,6 +29,11 @@ use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
 use datafusion::logical_expr::LogicalPlan;
 use datafusion_expr::UserDefinedLogicalNode;
 use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
+use promql::functions::{
+    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, IDelta, Increase,
+    LastOverTime, MaxOverTime, MinOverTime, PresentOverTime, Rate, Resets, StddevOverTime,
+    StdvarOverTime, SumOverTime,
+};
 use prost::Message;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
@@ -132,6 +137,26 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
             let _ = session_state.register_udaf(Arc::new(HllState::state_udf_impl()));
             let _ = session_state.register_udaf(Arc::new(HllState::merge_udf_impl()));
             let _ = session_state.register_udaf(Arc::new(GeoPathAccumulator::udf_impl()));
+
+            let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
+            let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
+            // TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
         }
         let logical_plan = DFLogicalSubstraitConvertor
             .decode(message, session_state)
@@ -31,6 +31,7 @@ use datafusion::error::Result as DfResult;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
@@ -142,6 +143,9 @@ impl QueryEngineState {
         physical_optimizer
             .rules
             .insert(1, Arc::new(PassDistribution));
+        physical_optimizer
+            .rules
+            .insert(2, Arc::new(EnforceSorting {}));
         // Add rule for windowed sort
         physical_optimizer
             .rules
@@ -14,6 +14,7 @@
 
 #![feature(assert_matches)]
 #![feature(try_blocks)]
+#![feature(let_chains)]
 
 pub mod dist_table;
 pub mod error;
@@ -82,11 +82,17 @@ impl RegionScanExec {
         if scanner.properties().is_logical_region() {
             pk_names.sort_unstable();
         }
-        let mut pk_columns: Vec<PhysicalSortExpr> = pk_names
-            .into_iter()
+        let pk_columns = pk_names
+            .iter()
+            .filter_map(
+                |col| Some(Arc::new(Column::new_with_schema(col, &arrow_schema).ok()?) as _),
+            )
+            .collect::<Vec<_>>();
+        let mut pk_sort_columns: Vec<PhysicalSortExpr> = pk_names
+            .iter()
             .filter_map(|col| {
                 Some(PhysicalSortExpr::new(
-                    Arc::new(Column::new_with_schema(&col, &arrow_schema).ok()?) as _,
+                    Arc::new(Column::new_with_schema(col, &arrow_schema).ok()?) as _,
                     SortOptions {
                         descending: false,
                         nulls_first: true,
@@ -113,28 +119,37 @@ impl RegionScanExec {
         let eq_props = match request.distribution {
             Some(TimeSeriesDistribution::PerSeries) => {
                 if let Some(ts) = ts_col {
-                    pk_columns.push(ts);
+                    pk_sort_columns.push(ts);
                 }
                 EquivalenceProperties::new_with_orderings(
                     arrow_schema.clone(),
-                    &[LexOrdering::new(pk_columns)],
+                    &[LexOrdering::new(pk_sort_columns)],
                 )
             }
             Some(TimeSeriesDistribution::TimeWindowed) => {
                 if let Some(ts_col) = ts_col {
-                    pk_columns.insert(0, ts_col);
+                    pk_sort_columns.insert(0, ts_col);
                 }
                 EquivalenceProperties::new_with_orderings(
                     arrow_schema.clone(),
-                    &[LexOrdering::new(pk_columns)],
+                    &[LexOrdering::new(pk_sort_columns)],
                 )
             }
             None => EquivalenceProperties::new(arrow_schema.clone()),
         };
 
+        let partitioning = match request.distribution {
+            Some(TimeSeriesDistribution::PerSeries) => {
+                Partitioning::Hash(pk_columns.clone(), num_output_partition)
+            }
+            Some(TimeSeriesDistribution::TimeWindowed) | None => {
+                Partitioning::UnknownPartitioning(num_output_partition)
+            }
+        };
+
         let properties = PlanProperties::new(
             eq_props,
-            Partitioning::UnknownPartitioning(num_output_partition),
+            partitioning,
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
@@ -188,9 +203,14 @@ impl RegionScanExec {
             warn!("Setting partition ranges more than once for RegionScanExec");
         }
 
-        let num_partitions = partitions.len();
         let mut properties = self.properties.clone();
-        properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);
+        let new_partitioning = match properties.partitioning {
+            Partitioning::Hash(ref columns, _) => {
+                Partitioning::Hash(columns.clone(), target_partitions)
+            }
+            _ => Partitioning::UnknownPartitioning(target_partitions),
+        };
+        properties.partitioning = new_partitioning;
 
         {
             let mut scanner = self.scanner.lock().unwrap();
@@ -130,8 +130,7 @@ tql eval (3000, 3000, '1s') label_replace(histogram_quantile(0.8, histogram_buck
 -- quantile with rate is covered in other cases
 tql eval (3000, 3000, '1s') histogram_quantile(0.2, rate(histogram_bucket[5m]));
 
-++
-++
+Error: 3001(EngineExecuteQuery), Unsupported arrow data type, type: Dictionary(Int64, Float64)
 
 drop table histogram_bucket;
 
@@ -17,11 +17,14 @@ tql analyze (1, 3, '1s') t1{ a = "a" };
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
+|_|_|_SortExec: expr=[a@0 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 3_|
 +-+-+-+
@@ -37,11 +40,14 @@ tql analyze (1, 3, '1s') t1{ a =~ ".*" };
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
+|_|_|_SortExec: expr=[a@0 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 6_|
 +-+-+-+
@@ -57,11 +63,14 @@ tql analyze (1, 3, '1s') t1{ a =~ "a.*" };
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
+|_|_|_SortExec: expr=[a@0 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 3_|
 +-+-+-+
@@ -19,11 +19,14 @@ TQL ANALYZE (0, 10, '5s') test;
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -41,11 +44,14 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -62,11 +68,14 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -85,11 +94,14 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
-|_|_|_MergeScanExec: REDACTED
+| 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -114,13 +126,23 @@ TQL ANALYZE (0, 10, '5s') test;
 +-+-+-+
 | stage | node | plan_|
 +-+-+-+
-| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
-|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+| 0_| 0_|_SortPreservingMergeExec: [k@2 ASC, l@3 ASC, j@1 ASC] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC, j@1 ASC], preserve_partitioning=[true] REDACTED
 |_|_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
-| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 1_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 0_|
 +-+-+-+
@@ -144,9 +166,21 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
 |_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
 |_|_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 0_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
+|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
-| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
+| 1_| 1_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
+|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
+|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
+|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
 |_|_|_|
 |_|_| Total rows: 0_|
 +-+-+-+
@@ -12,18 +12,13 @@ Affected Rows: 3
 -- SQLNESS REPLACE (peers.*) REDACTED
 TQL EXPLAIN (0, 10, '5s') test;
 
-+---------------+-----------------------------------------------------------------------------------------------+
++---------------+-------------------------------------------------+
 | plan_type | plan |
-+---------------+-----------------------------------------------------------------------------------------------+
-| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
-| | PromSeriesDivide: tags=["k"] |
-| | Projection: test.i, test.j, test.k |
-| | MergeScan [is_placeholder=false] |
-| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
-| | PromSeriesDivideExec: tags=["k"] |
-| | MergeScanExec: REDACTED
-| | |
-+---------------+-----------------------------------------------------------------------------------------------+
++---------------+-------------------------------------------------+
+| logical_plan | MergeScan [is_placeholder=false] |
+| physical_plan | MergeScanExec: REDACTED
+| | |
++---------------+-------------------------------------------------+
 
 -- 'lookback' parameter is not fully supported, the test has to be updated
 -- explain at 0s, 5s and 10s. No point at 0s.
@@ -31,36 +26,26 @@ TQL EXPLAIN (0, 10, '5s') test;
 -- SQLNESS REPLACE (peers.*) REDACTED
 TQL EXPLAIN (0, 10, '1s', '2s') test;
 
-+---------------+---------------------------------------------------------------------------------------------+
++---------------+-------------------------------------------------+
 | plan_type | plan |
-+---------------+---------------------------------------------------------------------------------------------+
-| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
-| | PromSeriesDivide: tags=["k"] |
-| | Projection: test.i, test.j, test.k |
-| | MergeScan [is_placeholder=false] |
-| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
-| | PromSeriesDivideExec: tags=["k"] |
-| | MergeScanExec: REDACTED
-| | |
-+---------------+---------------------------------------------------------------------------------------------+
++---------------+-------------------------------------------------+
+| logical_plan | MergeScan [is_placeholder=false] |
+| physical_plan | MergeScanExec: REDACTED
+| | |
++---------------+-------------------------------------------------+
 
 -- explain at 0s, 5s and 10s. No point at 0s.
 -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
 -- SQLNESS REPLACE (peers.*) REDACTED
 TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '5s') test;
 
-+---------------+-----------------------------------------------------------------------------------------------+
++---------------+-------------------------------------------------+
 | plan_type | plan |
-+---------------+-----------------------------------------------------------------------------------------------+
-| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
-| | PromSeriesDivide: tags=["k"] |
-| | Projection: test.i, test.j, test.k |
-| | MergeScan [is_placeholder=false] |
-| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
-| | PromSeriesDivideExec: tags=["k"] |
-| | MergeScanExec: REDACTED
-| | |
-+---------------+-----------------------------------------------------------------------------------------------+
++---------------+-------------------------------------------------+
+| logical_plan | MergeScan [is_placeholder=false] |
+| physical_plan | MergeScanExec: REDACTED
+| | |
++---------------+-------------------------------------------------+
 
 -- explain verbose at 0s, 5s and 10s. No point at 0s.
 -- SQLNESS REPLACE (-+) -
@@ -85,9 +70,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
 | logical_plan after expand_wildcard_rule_| SAME TEXT AS ABOVE_|
 | logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_|
 | logical_plan after type_coercion_| SAME TEXT AS ABOVE_|
-| logical_plan after DistPlannerAnalyzer_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivide: tags=["k"]_|
-|_|_Projection: test.i, test.j, test.k_|
+| logical_plan after DistPlannerAnalyzer_| Projection: test.i, test.j, test.k_|
 |_|_MergeScan [is_placeholder=false]_|
 | analyzed_logical_plan_| SAME TEXT AS ABOVE_|
 | logical_plan after eliminate_nested_union_| SAME TEXT AS ABOVE_|
@@ -114,37 +97,45 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
 | logical_plan after unwrap_cast_in_comparison_| SAME TEXT AS ABOVE_|
 | logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
 | logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
+| logical_plan after optimize_projections_| MergeScan [is_placeholder=false]_|
+| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_nested_union_| SAME TEXT AS ABOVE_|
+| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
+| logical_plan after unwrap_cast_in_comparison_| SAME TEXT AS ABOVE_|
+| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_|
+| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_|
+| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_|
+| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_|
+| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_|
+| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_one_union_| SAME TEXT AS ABOVE_|
+| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_|
+| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_|
+| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_|
+| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_|
+| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
+| logical_plan after unwrap_cast_in_comparison_| SAME TEXT AS ABOVE_|
+| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
+| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
 | logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
 | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
-| logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivide: tags=["k"]_|
-|_|_Projection: test.i, test.j, test.k_|
-|_|_MergeScan [is_placeholder=false]_|
-| initial_physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivideExec: tags=["k"]_|
-|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
-|_|_MergeScanExec: REDACTED
+| logical_plan_| MergeScan [is_placeholder=false]_|
+| initial_physical_plan_| MergeScanExec: REDACTED
 |_|_|
-| initial_physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] |
-|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
-|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
-|_|_MergeScanExec: REDACTED
+| initial_physical_plan_with_stats_| MergeScanExec: REDACTED
 |_|_|
-| initial_physical_plan_with_schema_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
-|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
-|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
-|_|_MergeScanExec: REDACTED
+| initial_physical_plan_with_schema_| MergeScanExec: REDACTED
 |_|_|
-| physical_plan after parallelize_scan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivideExec: tags=["k"]_|
-|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
-|_|_MergeScanExec: REDACTED
+| physical_plan after parallelize_scan_| MergeScanExec: REDACTED
 |_|_|
 | physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_|
 | physical_plan after OutputRequirements_| OutputRequirementExec_|
-|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivideExec: tags=["k"]_|
-|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
 |_|_MergeScanExec: REDACTED
 |_|_|
 | physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_|
@@ -154,15 +145,9 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
 | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
 | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
 | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
-| physical_plan after ProjectionPushdown_| OutputRequirementExec_|
-|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivideExec: tags=["k"]_|
-|_|_MergeScanExec: REDACTED
-|_|_|
+| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
 | physical_plan after coalesce_batches_| SAME TEXT AS ABOVE_|
-| physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivideExec: tags=["k"]_|
-|_|_MergeScanExec: REDACTED
+| physical_plan after OutputRequirements_| MergeScanExec: REDACTED
 |_|_|
 | physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_|
 | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
@@ -171,17 +156,11 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
 | physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_|
 | physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
 | physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_|
-| physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
-|_|_PromSeriesDivideExec: tags=["k"]_|
-|_|_MergeScanExec: REDACTED
+| physical_plan_| MergeScanExec: REDACTED
 |_|_|
-| physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] |
-|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
-|_|_MergeScanExec: REDACTED
+| physical_plan_with_stats_| MergeScanExec: REDACTED
 |_|_|
-| physical_plan_with_schema_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
-|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
-|_|_MergeScanExec: REDACTED
+| physical_plan_with_schema_| MergeScanExec: REDACTED
 |_|_|
 +-+-+
 
164
tests/cases/standalone/common/tql/partition.result
Normal file
164
tests/cases/standalone/common/tql/partition.result
Normal file
@@ -0,0 +1,164 @@
-- no partition
create table t (
  i double,
  j timestamp time index,
  k string primary key
);

Affected Rows: 0

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@3, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

drop table t;

Affected Rows: 0

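The `-- SQLNESS REPLACE` comments above are directives to the sqlness test runner rather than SQL: each one is a regex rewrite applied to the captured output before it is recorded, which is why the table above looks compressed. A hedged illustration follows; the raw line is hypothetical and only reconstructed from the usual shape of DataFusion analyze output.

-- Hypothetical raw analyze line before the replacements:
--   RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 metrics=[output_rows=0, ...]
-- After `(metrics.*) REDACTED` it is recorded as:
--   RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
-- `(-+) -` collapses the dashed table borders to `+-+-+-+`, `(\s\s+) _` collapses
-- the cell padding (hence the `_` separators in the plan column), and
-- `region=\d+\(\d+,\s+\d+\) region=REDACTED` hides concrete region ids in SeqScan lines.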
-- partition on tag
create table t (
  i double,
  j timestamp time index,
  k string,
  l string,
  primary key (k, l)
) partition on columns (k, l) (k < 'a', k >= 'a');

Affected Rows: 0

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))], ordering_mode=PartiallySorted([0]) REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))], ordering_mode=PartiallySorted([0]) REDACTED
|_|_|_ProjectionExec: expr=[j@0 as j, prom_irate(j_range,i)@1 as prom_irate(j_range,i), k@2 as k] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC, j@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k, l@3 as l] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k, l@3 as l] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

drop table t;

Affected Rows: 0

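In this case the partition columns (k, l) match the primary key tags, and the recorded plan shows the Prom* operators running inside stage 1 (one plan per region) while stage 0 only merges and finishes the `avg by (k)`. Not part of the diff, but a quick hedged way to confirm the partition rule the fixture creates is to have the server echo the table definition back:

-- Hedged sketch, not in the diff: inspect the partition rule after creation.
show create table t;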
-- partition on value
create table t (
  i double,
  j timestamp time index,
  k string,
  l string,
  primary key (k, l)
) partition on columns (i) (i < 1.0, i >= 1.0);

Affected Rows: 0

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=32 REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

drop table t;

Affected Rows: 0

tests/cases/standalone/common/tql/partition.sql (new file, 54 lines)
@@ -0,0 +1,54 @@
-- no partition
create table t (
  i double,
  j timestamp time index,
  k string primary key
);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

drop table t;

-- partition on tag
create table t (
  i double,
  j timestamp time index,
  k string,
  l string,
  primary key (k, l)
) partition on columns (k, l) (k < 'a', k >= 'a');

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

drop table t;

-- partition on value
create table t (
  i double,
  j timestamp time index,
  k string,
  l string,
  primary key (k, l)
) partition on columns (i) (i < 1.0, i >= 1.0);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

drop table t;
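Both new files only exercise empty tables, so every analyze above ends with `Total rows: 0`. A hypothetical extension, not part of the diff and with the column order (i, j, k, l) assumed from the last create table, would seed one row per value partition before re-running the same analyze so the recorded plans move real data:

-- Hypothetical addition, not in the diff: one row on each side of the i < 1.0 / i >= 1.0 split.
insert into t values
  (0.5, '1970-01-01 00:00:00', 'a', 'x'),
  (1.5, '1970-01-01 00:00:01', 'b', 'y');

tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);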