Mirror of https://github.com/GreptimeTeam/greptimedb.git

Compare commits: feature/df...fix-dist-p (11 commits)
| SHA1 |
|---|
| 365421c452 |
| b361c5f50a |
| 5d78bc1efa |
| 99c78b2f97 |
| bdecdb869e |
| df4cd157e1 |
| 60fbe54f90 |
| 7c15e71407 |
| 13f20b5a40 |
| 2b802e45f5 |
| def9b7c01d |
@@ -155,7 +155,23 @@ struct PlanRewriter {

```rust
    /// Partition columns of the table in the current pass
    partition_cols: Option<Vec<String>>,
    column_requirements: HashSet<Column>,
    /// Whether to expand on the next call.
    /// This is used to handle the case where a plan is transformed but needs to be expanded from its
    /// parent node. For example, an Aggregate plan is split into two parts in frontend and datanode, and
    /// needs to be expanded from the parent node of the Aggregate plan.
    expand_on_next_call: bool,
    /// Expand on the next partial/conditional/transformed commutative plan.
    /// This is used to handle the case where a plan is transformed but we still
    /// need to push down as many nodes as possible before the next partial/conditional/transformed
    /// commutative plan. I.e.
    /// ```text
    /// Limit:
    ///     Sort:
    /// ```
    /// where `Limit` is partial commutative and `Sort` is conditional commutative.
    /// In this case, we need to expand the `Limit` plan,
    /// so that we can push down the `Sort` plan as much as possible.
    expand_on_next_part_cond_trans_commutative: bool,
    new_child_plan: Option<LogicalPlan>,
}
```
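The `Limit`/`Sort` shape referenced in the doc comment is easiest to see as an actual plan tree. Below is a minimal sketch that builds it with DataFusion's `LogicalPlanBuilder`; the exact `sort`/`limit` signatures and crate paths vary across DataFusion versions, so treat this as illustrative rather than the code used in the PR:

```rust
use datafusion_expr::{col, lit, LogicalPlanBuilder};

fn main() -> datafusion_common::Result<()> {
    // Build the `Limit -> Sort` shape from the doc comment above:
    // Limit is the parent, Sort is its child.
    let plan = LogicalPlanBuilder::values(vec![vec![lit(1i64)]])?
        .sort(vec![col("column1").sort(true, false)])?
        .limit(0, Some(10))?
        .build()?;
    // Prints the tree with Limit above Sort.
    println!("{}", plan.display_indent());
    Ok(())
}
```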
@@ -177,15 +193,38 @@ impl PlanRewriter {

```rust
        {
            return true;
        }

        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            return true;
        }

        if self.expand_on_next_part_cond_trans_commutative {
            let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
            match comm {
                Commutativity::PartialCommutative => {
                    // a small difference is that for partial commutative, we still need to
                    // expand on the next call (so `Limit` can be pushed down)
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                Commutativity::ConditionalCommutative(_)
                | Commutativity::TransformedCommutative { .. } => {
                    // for conditional commutative and transformed commutative, we can
                    // expand now
                    self.expand_on_next_part_cond_trans_commutative = false;
                    return true;
                }
                _ => (),
            }
        }

        match Categorizer::check_plan(plan, self.partition_cols.clone()) {
            Commutativity::Commutative => {}
            Commutativity::PartialCommutative => {
                if let Some(plan) = partial_commutative_transformer(plan) {
                    self.update_column_requirements(&plan);
                    self.expand_on_next_part_cond_trans_commutative = true;
                    self.stage.push(plan)
                }
            }
```
@@ -194,6 +233,7 @@ impl PlanRewriter {

```rust
                && let Some(plan) = transformer(plan)
            {
                self.update_column_requirements(&plan);
                self.expand_on_next_part_cond_trans_commutative = true;
                self.stage.push(plan)
            }
        }
```
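To make the flag hand-off above concrete, here is a small self-contained simulation of just the two booleans, using simplified stand-ins for `Commutativity` and ignoring the real transformers, stage bookkeeping, and traversal machinery (the child-before-parent visit order is an assumption for illustration):

```rust
// Simplified stand-ins for the real Commutativity categories.
enum Commutativity {
    Commutative,
    PartialCommutative,
    ConditionalCommutative,
}

#[derive(Default)]
struct Flags {
    expand_on_next_call: bool,
    expand_on_next_part_cond_trans_commutative: bool,
}

impl Flags {
    // Mirrors the flag bookkeeping in should_expand: returns true when the
    // rewriter should stop pushing down and expand from the current node.
    fn should_expand(&mut self, comm: &Commutativity) -> bool {
        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            return true;
        }
        if self.expand_on_next_part_cond_trans_commutative {
            match comm {
                Commutativity::PartialCommutative => {
                    // Defer once more so the partial-commutative node itself
                    // (e.g. `Limit`) is still pushed down before expanding.
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                Commutativity::ConditionalCommutative => {
                    // Conditional/transformed commutative: expand right away.
                    self.expand_on_next_part_cond_trans_commutative = false;
                    return true;
                }
                _ => {}
            }
        }
        match comm {
            Commutativity::PartialCommutative | Commutativity::ConditionalCommutative => {
                // Pushing this node down arms the flag for the next
                // partial/conditional/transformed-commutative node.
                self.expand_on_next_part_cond_trans_commutative = true;
            }
            _ => {}
        }
        false
    }
}

fn main() {
    // The doc-comment shape, visited child-first: `Sort` (conditional
    // commutative) below `Limit` (partial commutative), then Limit's parent.
    let mut flags = Flags::default();
    for (name, comm) in [
        ("Sort", Commutativity::ConditionalCommutative),
        ("Limit", Commutativity::PartialCommutative),
        ("parent of Limit", Commutativity::Commutative),
    ] {
        println!("{name}: expand = {}", flags.should_expand(&comm));
    }
}
```

Both `Sort` and `Limit` report `expand = false` (they keep getting pushed down), and expansion only fires at the parent, matching the doc comment's intent.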
@@ -202,7 +242,7 @@ impl PlanRewriter {

```diff
             && let Some(transformer_actions) = transformer(plan)
         {
             debug!(
-                "PlanRewriter: transformed plan: {:#?}\n from {plan}",
+                "PlanRewriter: transformed plan: {:?}\n from {plan}",
                 transformer_actions.extra_parent_plans
             );
             if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
```
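For context on the one-line change above: `{:#?}` pretty-prints `Debug` output across many lines, while `{:?}` keeps it on a single line, which reads better in log streams. A quick illustration:

```rust
fn main() {
    let extra_parent_plans = vec!["Projection", "Sort"];
    // `{:?}` -> compact, single-line Debug output (what the log line now uses).
    println!("{:?}", extra_parent_plans);
    // `{:#?}` -> pretty-printed, multi-line Debug output (the old format).
    println!("{:#?}", extra_parent_plans);
}
```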
@@ -226,6 +266,10 @@ impl PlanRewriter {

```rust
    }

    fn update_column_requirements(&mut self, plan: &LogicalPlan) {
        debug!(
            "PlanRewriter: update column requirements for plan: {plan}\n with column_requirements: {:?}",
            self.column_requirements
        );
        let mut container = HashSet::new();
        for expr in plan.expressions() {
            // this method won't fail
```
@@ -235,6 +279,10 @@ impl PlanRewriter {

```rust
        for col in container {
            self.column_requirements.insert(col);
        }
        debug!(
            "PlanRewriter: updated column requirements: {:?}",
            self.column_requirements
        );
    }

    fn is_expanded(&self) -> bool {
```
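The `// this method won't fail` comment refers to collecting the columns referenced by each of the plan's expressions into `container`. A minimal sketch of that collection step follows; the use of DataFusion's `datafusion_expr::utils::expr_to_columns` here is an assumption based on the comment, not something visible in the diff:

```rust
use std::collections::HashSet;

use datafusion_common::{Column, Result};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{col, lit, Expr};

fn main() -> Result<()> {
    // Stand-ins for `plan.expressions()`.
    let exprs: Vec<Expr> = vec![col("greptime_value").gt(lit(0.0)), col("shard")];

    let mut container: HashSet<Column> = HashSet::new();
    for expr in &exprs {
        // Walks the expression tree and inserts every referenced column;
        // for plain column/literal expressions this cannot fail.
        expr_to_columns(expr, &mut container)?;
    }
    println!("column requirements: {container:?}");
    Ok(())
}
```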
@@ -234,3 +234,57 @@ drop table test;

```
Affected Rows: 0

CREATE TABLE test2 (
    "greptime_timestamp" TIMESTAMP(3) NOT NULL,
    "greptime_value" DOUBLE NULL,
    "shard" STRING NULL INVERTED INDEX,
    TIME INDEX ("greptime_timestamp"),
    PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
    shard <= '2',
    shard > '2'
);

Affected Rows: 0

TQL EVAL sum(test2);

++
++

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

DROP TABLE test2;

Affected Rows: 0
```
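Each `SQLNESS REPLACE` directive above is a regex substitution applied to the actual output before it is compared against this expected block; that is why volatile details such as region ids and metrics appear as `REDACTED`, and why runs of whitespace collapse to `_`. Note also how the two partitions of `test2` surface as the two `stage 1` nodes in the ANALYZE table. A minimal sketch of one such substitution, assuming the `regex` crate (the sample line is illustrative, not taken from a real run):

```rust
use regex::Regex;

fn main() {
    // Mirrors: -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
    let re = Regex::new(r"region=\d+\(\d+,\s+\d+\)").unwrap();
    let line = r#"SeriesScan: region=4398046511104(1024, 0), "distribution":"PerSeries""#;
    // Region ids vary between runs, so the expected output stores them redacted.
    assert_eq!(
        re.replace_all(line, "region=REDACTED"),
        r#"SeriesScan: region=REDACTED, "distribution":"PerSeries""#
    );
}
```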
@@ -95,3 +95,27 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;

```sql
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;

drop table test;

CREATE TABLE test2 (
    "greptime_timestamp" TIMESTAMP(3) NOT NULL,
    "greptime_value" DOUBLE NULL,
    "shard" STRING NULL INVERTED INDEX,
    TIME INDEX ("greptime_timestamp"),
    PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
    shard <= '2',
    shard > '2'
);

TQL EVAL sum(test2);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);

DROP TABLE test2;
```