Compare commits


11 Commits

discord9 · 365421c452 · 2025-07-10 17:23:49 +08:00
    Revert "feat: stream drop record metrics"
    This reverts commit 3eda4a2257928d95cf9c1328ae44fae84cfbb017.
    Signed-off-by: discord9 <discord9@163.com>

discord9 · b361c5f50a · 2025-07-10 17:23:49 +08:00
    chore: more dbg
    Signed-off-by: discord9 <discord9@163.com>

discord9 · 5d78bc1efa · 2025-07-10 17:23:49 +08:00
    test: update sqlness
    Signed-off-by: discord9 <discord9@163.com>

discord9 · 99c78b2f97 · 2025-07-10 17:23:49 +08:00
    fix: expand differently
    Signed-off-by: discord9 <discord9@163.com>

discord9 · bdecdb869e · 2025-07-10 17:23:49 +08:00
    feat: stream drop record metrics
    Signed-off-by: discord9 <discord9@163.com>
    refactor: move logging to drop too
    Signed-off-by: discord9 <discord9@163.com>
    fix: drop input stream before collect metrics
    Signed-off-by: discord9 <discord9@163.com>

discord9 · df4cd157e1 · 2025-07-10 17:23:49 +08:00
    Revert "feat: stream drop record metrics"
    This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500.
    Signed-off-by: discord9 <discord9@163.com>

discord9 · 60fbe54f90 · 2025-07-10 17:23:49 +08:00
    feat: stream drop record metrics
    Signed-off-by: discord9 <discord9@163.com>

discord9 · 7c15e71407 · 2025-07-10 17:23:49 +08:00
    revert
    Signed-off-by: discord9 <discord9@163.com>

discord9 · 13f20b5a40 · 2025-07-10 17:23:49 +08:00
    add logging to figure test failure
    Signed-off-by: discord9 <discord9@163.com>

Ruihang Xia · 2b802e45f5 · 2025-07-10 17:23:49 +08:00
    update sqlness result
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
    Signed-off-by: discord9 <discord9@163.com>

Ruihang Xia · def9b7c01d · 2025-07-10 17:23:49 +08:00
    fix: expand on conditional commutative as well
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
    Signed-off-by: discord9 <discord9@163.com>
3 changed files with 127 additions and 1 deletion

View File

@@ -155,7 +155,23 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
/// Whether to expand on the next call.
/// This is used to handle the case where a plan is transformed but needs to be expanded from its
/// parent node. For example, an Aggregate plan is split into two parts, one for the frontend and
/// one for the datanode, and needs to be expanded from the parent node of the Aggregate plan.
expand_on_next_call: bool,
/// Whether to expand on the next partial/conditional/transformed commutative plan.
/// This is used to handle the case where a plan is transformed but we still
/// need to push down as many nodes as possible before the next partial/conditional/transformed
/// commutative plan, e.g.
/// ```
/// Limit:
///   Sort:
/// ```
/// where `Limit` is partial commutative and `Sort` is conditional commutative.
/// In this case, we need to expand the `Limit` plan
/// so that the `Sort` plan can be pushed down as far as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
}
@@ -177,15 +193,38 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// a small difference is that for partial commutative, we still need to
// expand on the next call (so `Limit` can be pushed down)
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
_ => (),
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -194,6 +233,7 @@ impl PlanRewriter {
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -202,7 +242,7 @@ impl PlanRewriter {
&& let Some(transformer_actions) = transformer(plan)
{
debug!(
"PlanRewriter: transformed plan: {:#?}\n from {plan}",
"PlanRewriter: transformed plan: {:?}\n from {plan}",
transformer_actions.extra_parent_plans
);
if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -226,6 +266,10 @@ impl PlanRewriter {
}
fn update_column_requirements(&mut self, plan: &LogicalPlan) {
debug!(
"PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
self.column_requirements
);
let mut container = HashSet::new();
for expr in plan.expressions() {
// this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
for col in container {
self.column_requirements.insert(col);
}
debug!(
"PlanRewriter: updated column requirements: {:?}",
self.column_requirements
);
}
fn is_expanded(&self) -> bool {
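
The `expand_on_next_part_cond_trans_commutative` flag added above delays expansion: after a partial commutative node (e.g. `Limit`) is staged, the rewriter keeps descending; a following conditional/transformed commutative node (e.g. `Sort`) then triggers expansion right away, while another partial commutative node is pushed down once more with expansion deferred to the call after it. Below is a minimal, self-contained sketch of that control flow; `Rewriter` and `Commutativity` here are simplified stand-ins for the real `PlanRewriter` and `Categorizer::check_plan`, not the actual GreptimeDB implementation.

```rust
// Simplified model of the two-flag expansion logic in this diff.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Commutativity {
    Commutative,
    PartialCommutative,
    ConditionalCommutative,
    NonCommutative,
}

struct Rewriter {
    expand_on_next_call: bool,
    expand_on_next_part_cond_trans_commutative: bool,
}

impl Rewriter {
    fn new() -> Self {
        Self {
            expand_on_next_call: false,
            expand_on_next_part_cond_trans_commutative: false,
        }
    }

    /// Returns true when push-down should stop and the plan should be
    /// expanded at the current node (mirrors `should_expand` above).
    fn should_expand(&mut self, category: Commutativity) -> bool {
        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            return true;
        }
        if self.expand_on_next_part_cond_trans_commutative {
            match category {
                // Partial commutative (e.g. `Limit`): push it down once
                // more and expand on the call after it.
                Commutativity::PartialCommutative => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                // Conditional commutative (e.g. `Sort`): expand right away.
                Commutativity::ConditionalCommutative => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    return true;
                }
                _ => {}
            }
        }
        match category {
            Commutativity::Commutative => false,
            // Staging a partial/conditional commutative node arms the flag
            // instead of expanding immediately.
            Commutativity::PartialCommutative | Commutativity::ConditionalCommutative => {
                self.expand_on_next_part_cond_trans_commutative = true;
                false
            }
            Commutativity::NonCommutative => true,
        }
    }
}

fn main() {
    // Walk `Limit -> Sort -> TableScan` top-down, as the rewriter would.
    let mut rewriter = Rewriter::new();
    for (name, category) in [
        ("Limit", Commutativity::PartialCommutative),
        ("Sort", Commutativity::ConditionalCommutative),
        ("TableScan", Commutativity::Commutative),
    ] {
        println!("{name}: expand = {}", rewriter.should_expand(category));
    }
    // Prints: Limit: expand = false, Sort: expand = true, TableScan: expand = false.
    // `Limit` is staged without expanding, and expansion fires at `Sort`,
    // i.e. only after both nodes have been staged for push-down.
}
```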

View File

@@ -234,3 +234,57 @@ drop table test;
Affected Rows: 0
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
Affected Rows: 0
TQL EVAL sum(test2);
++
++
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE test2;
Affected Rows: 0
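
The `-- SQLNESS REPLACE` directives above are regex rewrites that the sqlness runner applies to the query output before comparing it against this expected block; they redact run-dependent details (metrics, peers, region ids) and collapse whitespace and dash runs, which is why the plan table is rendered with `_` and `-`. A rough, illustrative sketch of that normalization in Rust, assuming the `regex` crate (the rule list is copied from the directives; the sample line and the `normalize` helper are hypothetical):

```rust
use regex::Regex;

/// Apply the test's REPLACE rules, in order, to one line of raw output.
fn normalize(raw: &str) -> String {
    let rules = [
        (r"metrics.*", "REDACTED"),
        (r"RoundRobinBatch.*", "REDACTED"),
        (r"-+", "-"),
        (r"\s\s+", "_"),
        (r"peers.*", "REDACTED"),
        (r"region=\d+\(\d+,\s+\d+\)", "region=REDACTED"),
    ];
    let mut out = raw.to_string();
    for (pattern, replacement) in rules {
        out = Regex::new(pattern)
            .unwrap()
            .replace_all(&out, replacement)
            .into_owned();
    }
    out
}

fn main() {
    // Hypothetical raw line as the engine might print it.
    let raw = "| 1    | 0    |    SeriesScan: region=4398046511104(1024, 0), metrics=[elapsed_compute=1ms] |";
    println!("{}", normalize(raw));
    // -> "| 1_| 0_|_SeriesScan: region=REDACTED, REDACTED"
}
```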

View File

@@ -95,3 +95,27 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;
drop table test;
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
TQL EVAL sum(test2);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
DROP TABLE test2;