perf: evolve promql execution engine (#5691)

* use the same sort option across every prom plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix merge compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Revert "wip"

This reverts commit db58884236.

* tweak merge scan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* pass distribution rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reverse sort order

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refine plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more optimizations for plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* check logical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wierd tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test for series_divide

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix scalar calculation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: workaround join partition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update proto

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-08 16:12:15 +08:00
committed by GitHub
parent ee4fe9d273
commit c16bae32c4
25 changed files with 702 additions and 334 deletions

2
Cargo.lock generated
View File

@@ -4691,7 +4691,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2be0f36b3264e28ab0e1c22a980d0bb634eb3a77#2be0f36b3264e28ab0e1c22a980d0bb634eb3a77"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=dd4a1996982534636734674db66e44464b0c0d83#dd4a1996982534636734674db66e44464b0c0d83"
dependencies = [
"prost 0.13.3",
"serde",

View File

@@ -130,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2be0f36b3264e28ab0e1c22a980d0bb634eb3a77" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dd4a1996982534636734674db66e44464b0c0d83" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -168,8 +168,8 @@ fn test_load_metasrv_example_config() {
tracing_sample_ratio: Some(Default::default()),
slow_query: SlowQueryOptions {
enable: false,
threshold: Some(Duration::from_secs(10)),
sample_ratio: Some(1.0),
threshold: None,
sample_ratio: None,
},
..Default::default()
},

View File

@@ -301,7 +301,7 @@ impl ExecutionPlan for HistogramFoldExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition; self.children().len()]
self.input.required_input_distribution()
}
fn maintains_input_order(&self) -> Vec<bool> {

View File

@@ -352,9 +352,9 @@ impl Stream for InstantManipulateStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = std::time::Instant::now();
let poll = match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = std::time::Instant::now();
self.num_series.add(1);
let result = Ok(batch).and_then(|batch| self.manipulate(batch));
self.metric.elapsed_compute().add_elapsed(timer);

View File

@@ -23,6 +23,7 @@ use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Stat
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
@@ -32,7 +33,6 @@ use datafusion::physical_plan::{
};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::error::Result as ArrowResult;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
@@ -55,6 +55,7 @@ pub struct SeriesNormalize {
offset: Millisecond,
time_index_column_name: String,
need_filter_out_nan: bool,
tag_columns: Vec<String>,
input: LogicalPlan,
}
@@ -100,6 +101,7 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: inputs.into_iter().next().unwrap(),
tag_columns: self.tag_columns.clone(),
})
}
}
@@ -109,12 +111,14 @@ impl SeriesNormalize {
offset: Millisecond,
time_index_column_name: N,
need_filter_out_nan: bool,
tag_columns: Vec<String>,
input: LogicalPlan,
) -> Self {
Self {
offset,
time_index_column_name: time_index_column_name.as_ref().to_string(),
need_filter_out_nan,
tag_columns,
input,
}
}
@@ -129,6 +133,7 @@ impl SeriesNormalize {
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: exec_input,
tag_columns: self.tag_columns.clone(),
metric: ExecutionPlanMetricsSet::new(),
})
}
@@ -138,6 +143,7 @@ impl SeriesNormalize {
offset: self.offset,
time_index: self.time_index_column_name.clone(),
filter_nan: self.need_filter_out_nan,
tag_columns: self.tag_columns.clone(),
}
.encode_to_vec()
}
@@ -152,6 +158,7 @@ impl SeriesNormalize {
pb_normalize.offset,
pb_normalize.time_index,
pb_normalize.filter_nan,
pb_normalize.tag_columns,
placeholder_plan,
))
}
@@ -162,6 +169,7 @@ pub struct SeriesNormalizeExec {
offset: Millisecond,
time_index_column_name: String,
need_filter_out_nan: bool,
tag_columns: Vec<String>,
input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
@@ -177,7 +185,14 @@ impl ExecutionPlan for SeriesNormalizeExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
let schema = self.input.schema();
vec![Distribution::HashPartitioned(
self.tag_columns
.iter()
// Safety: the tag column names is verified in the planning phase
.map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
.collect(),
)]
}
fn properties(&self) -> &PlanProperties {
@@ -198,6 +213,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: children[0].clone(),
tag_columns: self.tag_columns.clone(),
metric: self.metric.clone(),
}))
}
@@ -288,31 +304,24 @@ impl SeriesNormalizeStream {
// bias the timestamp column by offset
let ts_column_biased = if self.offset == 0 {
ts_column.clone()
Arc::new(ts_column.clone()) as _
} else {
TimestampMillisecondArray::from_iter(
Arc::new(TimestampMillisecondArray::from_iter(
ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
)
))
};
let mut columns = input.columns().to_vec();
columns[self.time_index] = Arc::new(ts_column_biased);
// sort the record batch
let ordered_indices = compute::sort_to_indices(&columns[self.time_index], None, None)?;
let ordered_columns = columns
.iter()
.map(|array| compute::take(array, &ordered_indices, None))
.collect::<ArrowResult<Vec<_>>>()?;
let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?;
columns[self.time_index] = ts_column_biased;
let result_batch = RecordBatch::try_new(input.schema(), columns)?;
if !self.need_filter_out_nan {
return Ok(ordered_batch);
return Ok(result_batch);
}
// TODO(ruihang): consider the "special NaN"
// filter out NaN
let mut filter = vec![true; input.num_rows()];
for column in ordered_batch.columns() {
for column in result_batch.columns() {
if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
for (i, flag) in filter.iter_mut().enumerate() {
if float_column.value(i).is_nan() {
@@ -322,7 +331,7 @@ impl SeriesNormalizeStream {
}
}
let result = compute::filter_record_batch(&ordered_batch, &BooleanArray::from(filter))
let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
.map_err(|e| DataFusionError::ArrowError(e, None))?;
Ok(result)
}
@@ -338,10 +347,10 @@ impl Stream for SeriesNormalizeStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = std::time::Instant::now();
let poll = match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.num_series.add(1);
let timer = std::time::Instant::now();
let result = Ok(batch).and_then(|batch| self.normalize(batch));
self.metric.elapsed_compute().add_elapsed(timer);
Poll::Ready(Some(result))
@@ -399,6 +408,7 @@ mod test {
time_index_column_name: TIME_INDEX_COLUMN.to_string(),
need_filter_out_nan: true,
input: memory_exec,
tag_columns: vec!["path".to_string()],
metric: ExecutionPlanMetricsSet::new(),
});
let session_context = SessionContext::default();
@@ -413,11 +423,11 @@ mod test {
"+---------------------+--------+------+\
\n| timestamp | value | path |\
\n+---------------------+--------+------+\
\n| 1970-01-01T00:01:00 | 0.0 | foo |\
\n| 1970-01-01T00:02:00 | 1.0 | foo |\
\n| 1970-01-01T00:00:00 | 10.0 | foo |\
\n| 1970-01-01T00:00:30 | 100.0 | foo |\
\n| 1970-01-01T00:01:00 | 0.0 | foo |\
\n| 1970-01-01T00:01:30 | 1000.0 | foo |\
\n| 1970-01-01T00:02:00 | 1.0 | foo |\
\n+---------------------+--------+------+",
);
@@ -428,11 +438,12 @@ mod test {
async fn test_offset_record_batch() {
let memory_exec = Arc::new(prepare_test_data());
let normalize_exec = Arc::new(SeriesNormalizeExec {
offset: 1_000, // offset 1s
offset: 1_000,
time_index_column_name: TIME_INDEX_COLUMN.to_string(),
need_filter_out_nan: true,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
tag_columns: vec!["path".to_string()],
});
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
@@ -446,11 +457,11 @@ mod test {
"+---------------------+--------+------+\
\n| timestamp | value | path |\
\n+---------------------+--------+------+\
\n| 1970-01-01T00:01:01 | 0.0 | foo |\
\n| 1970-01-01T00:02:01 | 1.0 | foo |\
\n| 1970-01-01T00:00:01 | 10.0 | foo |\
\n| 1970-01-01T00:00:31 | 100.0 | foo |\
\n| 1970-01-01T00:01:01 | 0.0 | foo |\
\n| 1970-01-01T00:01:31 | 1000.0 | foo |\
\n| 1970-01-01T00:02:01 | 1.0 | foo |\
\n+---------------------+--------+------+",
);

View File

@@ -327,7 +327,7 @@ impl ExecutionPlan for RangeManipulateExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
self.input.required_input_distribution()
}
fn with_new_children(
@@ -564,18 +564,24 @@ impl RangeManipulateStream {
let mut ranges = vec![];
// calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
let mut range_start_index = 0usize;
for curr_ts in (self.start..=self.end).step_by(self.interval as _) {
let mut range_start = ts_column.len();
let mut range_end = 0;
for (index, ts) in ts_column.values().iter().enumerate() {
let mut cursor = range_start_index;
while cursor < ts_column.len() {
let ts = ts_column.value(cursor);
if ts + self.range >= curr_ts {
range_start = range_start.min(index);
range_start = range_start.min(cursor);
range_start_index = range_start;
}
if *ts <= curr_ts {
range_end = range_end.max(index);
if ts <= curr_ts {
range_end = range_end.max(cursor);
} else {
range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
break;
}
cursor += 1;
}
if range_start > range_end {
ranges.push((0, 0));

View File

@@ -504,7 +504,10 @@ impl Stream for ScalarCalculateStream {
None => {
self.done = true;
return match self.batch.take() {
Some(batch) if !self.have_multi_series => Poll::Ready(Some(Ok(batch))),
Some(batch) if !self.have_multi_series => {
self.metric.record_output(batch.num_rows());
Poll::Ready(Some(Ok(batch)))
}
_ => {
let time_array = (self.start..=self.end)
.step_by(self.interval as _)
@@ -517,6 +520,7 @@ impl Stream for ScalarCalculateStream {
Arc::new(Float64Array::from(vec![f64::NAN; nums])),
],
)?;
self.metric.record_output(nan_batch.num_rows());
Poll::Ready(Some(Ok(nan_batch)))
}
};

View File

@@ -34,6 +34,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
@@ -146,7 +147,14 @@ impl ExecutionPlan for SeriesDivideExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
let schema = self.input.schema();
vec![Distribution::HashPartitioned(
self.tag_columns
.iter()
// Safety: the tag column names is verified in the planning phase
.map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
.collect(),
)]
}
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
@@ -157,7 +165,10 @@ impl ExecutionPlan for SeriesDivideExec {
.map(|tag| PhysicalSortRequirement {
// Safety: the tag column names is verified in the planning phase
expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
options: None,
options: Some(SortOptions {
descending: false,
nulls_first: true,
}),
})
.collect();
if !exprs.is_empty() {
@@ -267,9 +278,9 @@ impl Stream for SeriesDivideStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = std::time::Instant::now();
loop {
if !self.buffer.is_empty() {
let timer = std::time::Instant::now();
let cut_at = match self.find_first_diff_row() {
Ok(cut_at) => cut_at,
Err(e) => return Poll::Ready(Some(Err(e))),
@@ -287,7 +298,11 @@ impl Stream for SeriesDivideStream {
.drain(0..batch_index)
.chain([half_batch_of_first_series])
.collect::<Vec<_>>();
self.buffer[0] = half_batch_of_second_series;
if half_batch_of_second_series.num_rows() > 0 {
self.buffer[0] = half_batch_of_second_series;
} else {
self.buffer.remove(0);
}
let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
self.inspect_start = 0;
@@ -295,8 +310,10 @@ impl Stream for SeriesDivideStream {
self.metric.elapsed_compute().add_elapsed(timer);
return Poll::Ready(Some(Ok(result_batch)));
} else {
self.metric.elapsed_compute().add_elapsed(timer);
// continue to fetch next batch as the current buffer only contains one time series.
let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
let timer = std::time::Instant::now();
if let Some(next_batch) = next_batch {
self.buffer.push(next_batch);
continue;
@@ -350,7 +367,7 @@ impl SeriesDivideStream {
let mut result_index = num_rows;
// check if the first row is the same with last batch's last row
if resumed_batch_index > self.inspect_start {
if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
let last_batch = &self.buffer[resumed_batch_index - 1];
let last_row = last_batch.num_rows() - 1;
for index in &self.tag_indices {
@@ -375,11 +392,34 @@ impl SeriesDivideStream {
let current_value = current_string_array.value(0);
let last_value = last_string_array.value(last_row);
if current_value != last_value {
return Ok(Some((resumed_batch_index, 0)));
return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
}
}
}
// quick check if all rows are the same by comparing the first and last row in this batch
let mut all_same = true;
for index in &self.tag_indices {
let array = batch.column(*index);
let string_array =
array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
if string_array.value(0) != string_array.value(num_rows - 1) {
all_same = false;
break;
}
}
if all_same {
resumed_batch_index += 1;
continue;
}
// check column by column
for index in &self.tag_indices {
let array = batch.column(*index);
@@ -595,4 +635,187 @@ mod test {
assert_eq!(formatted, expected);
}
}
#[tokio::test]
async fn test_all_batches_same_combination() {
// Create a schema with host and path columns, same as prepare_test_data
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new("path", DataType::Utf8, true),
]));
// Create batches with three different combinations
// Each batch contains only one combination
// Batches with the same combination are adjacent
// First combination: "server1", "/var/log"
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
],
)
.unwrap();
// Second combination: "server2", "/var/data"
let batch3 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
Arc::new(StringArray::from(vec![
"/var/data",
"/var/data",
"/var/data",
])) as _,
],
)
.unwrap();
let batch4 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["server2"])) as _,
Arc::new(StringArray::from(vec!["/var/data"])) as _,
],
)
.unwrap();
// Third combination: "server3", "/opt/logs"
let batch5 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
],
)
.unwrap();
let batch6 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
Arc::new(StringArray::from(vec![
"/opt/logs",
"/opt/logs",
"/opt/logs",
])) as _,
],
)
.unwrap();
// Create MemoryExec with these batches, keeping same combinations adjacent
let memory_exec = Arc::new(
MemoryExec::try_new(
&[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
schema.clone(),
None,
)
.unwrap(),
);
// Create SeriesDivideExec
let divide_exec = Arc::new(SeriesDivideExec {
tag_columns: vec!["host".to_string(), "path".to_string()],
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
// Execute the division
let session_context = SessionContext::default();
let result =
datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
.await
.unwrap();
// Verify that we got 3 batches (one for each combination)
assert_eq!(result.len(), 3);
// First batch should have 5 rows (3 + 2 from the "server1" combination)
assert_eq!(result[0].num_rows(), 5);
// Second batch should have 4 rows (3 + 1 from the "server2" combination)
assert_eq!(result[1].num_rows(), 4);
// Third batch should have 5 rows (2 + 3 from the "server3" combination)
assert_eq!(result[2].num_rows(), 5);
// Verify values in first batch (server1, /var/log)
let host_array1 = result[0]
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let path_array1 = result[0]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..5 {
assert_eq!(host_array1.value(i), "server1");
assert_eq!(path_array1.value(i), "/var/log");
}
// Verify values in second batch (server2, /var/data)
let host_array2 = result[1]
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let path_array2 = result[1]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..4 {
assert_eq!(host_array2.value(i), "server2");
assert_eq!(path_array2.value(i), "/var/data");
}
// Verify values in third batch (server3, /opt/logs)
let host_array3 = result[2]
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let path_array3 = result[2]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..5 {
assert_eq!(host_array3.value(i), "server3");
assert_eq!(path_array3.value(i), "/opt/logs");
}
// Also verify streaming behavior
let mut divide_stream = divide_exec
.execute(0, SessionContext::default().task_ctx())
.unwrap();
// Should produce three batches, one for each combination
let batch1 = divide_stream.next().await.unwrap().unwrap();
assert_eq!(batch1.num_rows(), 5); // server1 combination
let batch2 = divide_stream.next().await.unwrap().unwrap();
assert_eq!(batch2.num_rows(), 4); // server2 combination
let batch3 = divide_stream.next().await.unwrap().unwrap();
assert_eq!(batch3.num_rows(), 5); // server3 combination
// No more batches should be produced
assert!(divide_stream.next().await.is_none());
}
}

View File

@@ -279,7 +279,14 @@ impl PlanRewriter {
on_node = on_node.rewrite(&mut rewriter)?.data;
// add merge scan as the new root
let mut node = MergeScanLogicalPlan::new(on_node, false).into_logical_plan();
let mut node = MergeScanLogicalPlan::new(
on_node,
false,
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
self.partition_cols.clone().unwrap_or_default(),
)
.into_logical_plan();
// expand stages
for new_stage in self.stage.drain(..) {

View File

@@ -16,7 +16,8 @@ use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use ahash::HashSet;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
@@ -28,16 +29,19 @@ use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::TaskContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion_common::Result;
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_common::{Column as ColumnExpr, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
@@ -59,6 +63,7 @@ pub struct MergeScanLogicalPlan {
input: LogicalPlan,
/// If this plan is a placeholder
is_placeholder: bool,
partition_cols: Vec<String>,
}
impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
@@ -95,10 +100,11 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
}
impl MergeScanLogicalPlan {
pub fn new(input: LogicalPlan, is_placeholder: bool) -> Self {
pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
Self {
input,
is_placeholder,
partition_cols,
}
}
@@ -120,7 +126,12 @@ impl MergeScanLogicalPlan {
pub fn input(&self) -> &LogicalPlan {
&self.input
}
pub fn partition_cols(&self) -> &[String] {
&self.partition_cols
}
}
pub struct MergeScanExec {
table: TableName,
regions: Vec<RegionId>,
@@ -134,6 +145,7 @@ pub struct MergeScanExec {
sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: Vec<String>,
}
impl std::fmt::Debug for MergeScanExec {
@@ -147,7 +159,9 @@ impl std::fmt::Debug for MergeScanExec {
}
impl MergeScanExec {
#[allow(clippy::too_many_arguments)]
pub fn new(
session_state: &SessionState,
table: TableName,
regions: Vec<RegionId>,
plan: LogicalPlan,
@@ -155,16 +169,60 @@ impl MergeScanExec {
region_query_handler: RegionQueryHandlerRef,
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: Vec<String>,
) -> Result<Self> {
// TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
// keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
// Reconsider if it's possible to remove it.
let arrow_schema = Arc::new(arrow_schema.clone());
let properties = PlanProperties::new(
EquivalenceProperties::new(arrow_schema.clone()),
Partitioning::UnknownPartitioning(target_partition),
ExecutionMode::Bounded,
);
// States the output ordering of the plan.
//
// When the input plan is a sort, we can use the sort ordering as the output ordering
// if the target partition is greater than the number of regions, which means we won't
// break the ordering on merging (of MergeScan).
//
// Otherwise, we need to use the default ordering.
let eq_properties = if let LogicalPlan::Sort(sort) = &plan
&& target_partition >= regions.len()
{
let lex_ordering = sort
.expr
.iter()
.map(|sort_expr| {
let physical_expr = session_state
.create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
Ok(PhysicalSortExpr::new(
physical_expr,
SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
))
})
.collect::<Result<Vec<_>>>()?;
EquivalenceProperties::new_with_orderings(
arrow_schema.clone(),
&[LexOrdering::new(lex_ordering)],
)
} else {
EquivalenceProperties::new(arrow_schema.clone())
};
let partition_exprs = partition_cols
.iter()
.filter_map(|col| {
session_state
.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(col)),
plan.schema(),
)
.ok()
})
.collect();
let partitioning = Partitioning::Hash(partition_exprs, target_partition);
let properties = PlanProperties::new(eq_properties, partitioning, ExecutionMode::Bounded);
let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
Ok(Self {
table,
@@ -178,6 +236,7 @@ impl MergeScanExec {
properties,
query_ctx,
target_partition,
partition_cols,
})
}
@@ -291,6 +350,52 @@ impl MergeScanExec {
}))
}
pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
let Distribution::HashPartitioned(hash_exprs) = distribution else {
// not applicable
return None;
};
if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
&& curr_dist == &hash_exprs
{
// No need to change the distribution
return None;
}
let mut hash_cols = HashSet::default();
for expr in &hash_exprs {
if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
hash_cols.insert(col_expr.name());
}
}
for col in &self.partition_cols {
if !hash_cols.contains(col.as_str()) {
// The partitioning columns are not the same
return None;
}
}
Some(Self {
table: self.table.clone(),
regions: self.regions.clone(),
plan: self.plan.clone(),
schema: self.schema.clone(),
arrow_schema: self.arrow_schema.clone(),
region_query_handler: self.region_query_handler.clone(),
metric: self.metric.clone(),
properties: PlanProperties::new(
self.properties.eq_properties.clone(),
Partitioning::Hash(hash_exprs, self.target_partition),
self.properties.execution_mode,
),
sub_stage_metrics: self.sub_stage_metrics.clone(),
query_ctx: self.query_ctx.clone(),
target_partition: self.target_partition,
partition_cols: self.partition_cols.clone(),
})
}
fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
Ok(Arc::new(schema))

View File

@@ -162,6 +162,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
.get_extension()
.unwrap_or_else(QueryContext::arc);
let merge_scan_plan = MergeScanExec::new(
session_state,
table_name,
regions,
input_plan.clone(),
@@ -169,6 +170,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
self.region_query_handler.clone(),
query_ctx,
session_state.config().target_partitions(),
merge_scan.partition_cols().to_vec(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}

View File

@@ -14,6 +14,7 @@
pub mod count_wildcard;
pub mod parallelize_scan;
pub mod pass_distribution;
pub mod remove_duplicate;
pub mod scan_hint;
pub mod string_normalization;

View File

@@ -0,0 +1,81 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result as DfResult;
use datafusion_physical_expr::Distribution;
use crate::dist_plan::MergeScanExec;
/// This is a [`PhysicalOptimizerRule`] to pass distribution requirement to
/// [`MergeScanExec`] to avoid unnecessary shuffling.
///
/// This rule is expected to be run before [`EnforceDistribution`].
///
/// [`EnforceDistribution`]: datafusion::physical_optimizer::enforce_distribution::EnforceDistribution
/// [`MergeScanExec`]: crate::dist_plan::MergeScanExec
#[derive(Debug)]
pub struct PassDistribution;
impl PhysicalOptimizerRule for PassDistribution {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan, config)
}
fn name(&self) -> &str {
"PassDistributionRule"
}
fn schema_check(&self) -> bool {
false
}
}
impl PassDistribution {
fn do_optimize(
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let mut distribution_requirement = None;
let result = plan.transform_down(|plan| {
if let Some(distribution) = plan.required_input_distribution().first()
&& !matches!(distribution, Distribution::UnspecifiedDistribution)
// incorrect workaround, doesn't fix the actual issue
&& plan.name() != "HashJoinExec"
{
distribution_requirement = Some(distribution.clone());
}
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>()
&& let Some(distribution) = distribution_requirement.as_ref()
&& let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone())
{
Ok(Transformed::yes(Arc::new(new_plan) as _))
} else {
Ok(Transformed::no(plan))
}
})?;
Ok(result.data)
}
}

View File

@@ -1038,6 +1038,9 @@ impl PromPlanner {
});
// make series_normalize plan
if !is_range_selector && offset_duration == 0 {
return Ok(divide_plan);
}
let series_normalize = SeriesNormalize::new(
offset_duration,
self.ctx
@@ -1047,6 +1050,7 @@ impl PromPlanner {
table: table_ref.to_quoted_string(),
})?,
is_range_selector,
self.ctx.tag_columns.clone(),
divide_plan,
);
let logical_plan = LogicalPlan::Extension(Extension {
@@ -1894,9 +1898,9 @@ impl PromPlanner {
.ctx
.tag_columns
.iter()
.map(|col| DfExpr::Column(Column::from_name(col)).sort(false, false))
.map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
.collect::<Vec<_>>();
result.push(self.create_time_index_column_expr()?.sort(false, false));
result.push(self.create_time_index_column_expr()?.sort(true, true));
Ok(result)
}
@@ -1904,7 +1908,7 @@ impl PromPlanner {
self.ctx
.field_columns
.iter()
.map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, false))
.map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
.collect::<Vec<_>>()
}
@@ -2089,7 +2093,7 @@ impl PromPlanner {
let tag_sort_exprs = self
.create_tag_column_exprs()?
.into_iter()
.map(|expr| expr.sort(asc, false));
.map(|expr| expr.sort(asc, true));
// perform window operation to each value column
let exprs: Vec<DfExpr> = self
@@ -2099,7 +2103,7 @@ impl PromPlanner {
.map(|col| {
let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
// Order by value in the specific order
sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, false));
sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
// Then tags if the values are equal,
// Try to ensure the relative stability of the output results.
sort_exprs.extend(tag_sort_exprs.clone());
@@ -3130,11 +3134,10 @@ mod test {
"Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
).replace("TEMPLATE", plan_name);
assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -3339,11 +3342,10 @@ mod test {
"Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
).replace("TEMPLATE", plan_name);
assert_eq!(
plan.display_indent_schema().to_string(),
@@ -3369,11 +3371,10 @@ mod test {
"Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
).replace("TEMPLATE", plan_name);
assert_eq!(plan.display_indent_schema().to_string(), expected_without);
}
@@ -3472,18 +3473,16 @@ mod test {
\n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -3526,11 +3525,10 @@ mod test {
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
@@ -3550,11 +3548,10 @@ mod test {
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
@@ -3566,11 +3563,10 @@ mod test {
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
@@ -3582,11 +3578,10 @@ mod test {
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
@@ -3601,7 +3596,7 @@ mod test {
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
@@ -3615,11 +3610,10 @@ mod test {
let expected = String::from(
"Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
@@ -3634,7 +3628,7 @@ mod test {
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
@@ -3676,22 +3670,20 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
SubqueryAlias: http_server_requests_seconds_sum
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_sum
SubqueryAlias: http_server_requests_seconds_count
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_count"#;
let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
\n Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
\n SubqueryAlias: http_server_requests_seconds_sum\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
\n PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
\n Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
\n Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
\n TableScan: http_server_requests_seconds_sum\
\n SubqueryAlias: http_server_requests_seconds_count\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
\n PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
\n Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
\n Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
\n TableScan: http_server_requests_seconds_count";
assert_eq!(plan.to_string(), expected);
}
@@ -3951,20 +3943,41 @@ mod test {
async fn custom_schema() {
let query = "some_alt_metric{__schema__=\"greptime_private\"}";
let expected = String::from(
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
let query = "some_alt_metric{__database__=\"greptime_private\"}";
let expected = String::from(
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\n Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");
let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
\n Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");
indie_query_plan_compare(query, expected).await;
}
@@ -4077,12 +4090,11 @@ mod test {
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(
@@ -4111,7 +4123,7 @@ mod test {
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
@@ -4169,14 +4181,13 @@ mod test {
.await
.unwrap();
let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.tag_1 DESC NULLS LAST, up.tag_2 DESC NULLS LAST, up.tag_3 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
\n Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(\",\"), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\", \"tag_3\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: up.tag_0 = Utf8(\"api-server\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -4204,14 +4215,13 @@ mod test {
.await
.unwrap();
let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8("(.*):.*"), Utf8("$1")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
\n Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8(\"(.*):.*\"), Utf8(\"$1\")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: up.tag_0 = Utf8(\"a:c\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -4243,14 +4253,13 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]
PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]
Sort: prometheus_tsdb_head_series.tag_0 DESC NULLS LAST, prometheus_tsdb_head_series.tag_1 DESC NULLS LAST, prometheus_tsdb_head_series.tag_2 DESC NULLS LAST, prometheus_tsdb_head_series.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]
Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]
TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]"#;
let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
\n Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
\n Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
\n TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -4287,18 +4296,17 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
\n Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
\n Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
\n WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -4336,16 +4344,15 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
\n Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -4383,16 +4390,15 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]
Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
\n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

View File

@@ -47,6 +47,7 @@ use table::TableRef;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer, MergeSortExtensionPlanner};
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
@@ -128,6 +129,10 @@ impl QueryEngineState {
physical_optimizer
.rules
.insert(0, Arc::new(ParallelizeScan));
// Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
physical_optimizer
.rules
.insert(1, Arc::new(PassDistribution));
// Add rule for windowed sort
physical_optimizer
.rules

View File

@@ -73,12 +73,19 @@ impl RegionScanExec {
}
let metadata = scanner.metadata();
let mut pk_columns: Vec<PhysicalSortExpr> = metadata
let mut pk_names = metadata
.primary_key_columns()
.map(|col| col.column_schema.name.clone())
.collect::<Vec<_>>();
// workaround for logical table
if scanner.properties().is_logical_region() {
pk_names.sort_unstable();
}
let mut pk_columns: Vec<PhysicalSortExpr> = pk_names
.into_iter()
.filter_map(|col| {
Some(PhysicalSortExpr::new(
Arc::new(Column::new_with_schema(&col.column_schema.name, &arrow_schema).ok()?)
as _,
Arc::new(Column::new_with_schema(&col, &arrow_schema).ok()?) as _,
SortOptions {
descending: false,
nulls_first: true,

View File

@@ -28,7 +28,6 @@ explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY hos
| logical_plan_| MergeSort: demo.host ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -17,10 +17,7 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortPreservingMergeExec: [integers.i % Int64(2)@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[integers.i % Int64(2)@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[] |
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_|
|_|_AggregateExec: mode=SinglePartitioned, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[] |
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
@@ -63,10 +60,7 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]_|
|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] |
|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]_|
|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -64,24 +64,21 @@ Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Order by colu
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY -1;
+---------------+-----------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: Int64(-1) ASC NULLS LAST |
| | Aggregate: groupBy=[[test.a % Int64(2), test.b]], aggr=[[]] |
| | Union |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| physical_plan | CoalescePartitionsExec |
| | AggregateExec: mode=FinalPartitioned, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | AggregateExec: mode=Partial, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | UnionExec |
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: Int64(-1) ASC NULLS LAST |
| | Aggregate: groupBy=[[test.a % Int64(2), test.b]], aggr=[[]] |
| | Union |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| physical_plan | CoalescePartitionsExec |
| | AggregateExec: mode=SinglePartitioned, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | InterleaveExec |
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------+
SELECT a % 2, b FROM test UNION SELECT a % 2 AS k FROM test ORDER BY -1;

View File

@@ -18,14 +18,10 @@ tql analyze (1, 3, '1s') t1{ a = "a" };
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[b], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 3_|
+-+-+-+
@@ -42,14 +38,10 @@ tql analyze (1, 3, '1s') t1{ a =~ ".*" };
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[b], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 6_|
+-+-+-+
@@ -66,14 +58,10 @@ tql analyze (1, 3, '1s') t1{ a =~ "a.*" };
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[b], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 3_|
+-+-+-+

View File

@@ -20,14 +20,10 @@ TQL ANALYZE (0, 10, '5s') test;
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -46,14 +42,10 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -71,14 +63,10 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -98,14 +86,10 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -131,17 +115,12 @@ TQL ANALYZE (0, 10, '5s') test;
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST, l@3 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST, l@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, l@3 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
| 1_| 1_|_SortExec: expr=[k@2 DESC NULLS LAST, l@3 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

View File

@@ -16,16 +16,12 @@ TQL EXPLAIN (0, 10, '5s') test;
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| | PromSeriesDivide: tags=["k"] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
@@ -39,16 +35,12 @@ TQL EXPLAIN (0, 10, '1s', '2s') test;
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| | PromSeriesDivide: tags=["k"] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------+
@@ -61,16 +53,12 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| | PromSeriesDivide: tags=["k"] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
@@ -86,9 +74,8 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| plan_type_| plan_|
+-+-+
| initial_logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST_|
|_|_Sort: test.k ASC NULLS FIRST, test.j ASC NULLS FIRST_|
|_|_Filter: test.j >= TimestampMillisecond(-300000, None) AND test.j <= TimestampMillisecond(300000, None)_|
|_|_TableScan: test_|
| logical_plan after count_wildcard_to_time_index_rule_| SAME TEXT AS ABOVE_|
@@ -98,7 +85,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_|
| logical_plan after type_coercion_| SAME TEXT AS ABOVE_|
| logical_plan after DistPlannerAnalyzer_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Projection: test.i, test.j, test.k_|
|_|_MergeScan [is_placeholder=false]_|
@@ -130,37 +116,32 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Projection: test.i, test.j, test.k_|
|_|_MergeScan [is_placeholder=false]_|
| initial_physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] |
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_schema_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after parallelize_scan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
@@ -168,39 +149,18 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_|
| physical_plan after join_selection_| SAME TEXT AS ABOVE_|
| physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_CoalescePartitionsExec_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after coalesce_batches_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_|
@@ -210,24 +170,15 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_|
| physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] |
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_schema_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -152,15 +152,16 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
Affected Rows: 3
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') test{__field__="Field_I"};
+---------+-------+---------------------+
| Field_I | Tag_K | Ts_J |
+---------+-------+---------------------+
| 2.0 | a | 1970-01-01T00:00:05 |
| 2.0 | a | 1970-01-01T00:00:10 |
| 1.0 | b | 1970-01-01T00:00:05 |
| 1.0 | b | 1970-01-01T00:00:10 |
| 2.0 | a | 1970-01-01T00:00:05 |
| 2.0 | a | 1970-01-01T00:00:10 |
+---------+-------+---------------------+
TQL EVAL (0, 10, '5s') test{__field__="field_i"};

View File

@@ -45,6 +45,7 @@ CREATE TABLE test (`Field_I` DOUBLE, `Ts_J` TIMESTAMP TIME INDEX, `Tag_K` STRING
INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') test{__field__="Field_I"};
TQL EVAL (0, 10, '5s') test{__field__="field_i"};