From 0ee41339aa8f927226289c905208a9003bb564b1 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Wed, 8 Jan 2025 14:45:56 +0800
Subject: [PATCH] feat(flow): flow refill state (Part 1) (#5295)

* feat(flow): (Part 1) refill utils
* chore: after rebase fix
* chore: more rebase
* rm refill.rs to reduce pr size
* chore: simpler args
* refactor: per review
* docs: more explain for instant requests
* refactor: per review
---
 src/common/meta/src/key/table_info.rs     |   7 +
 src/common/recordbatch/src/recordbatch.rs |   2 +-
 src/flow/src/adapter.rs                   |   2 +
 src/flow/src/adapter/node_context.rs      |  40 +++
 src/flow/src/adapter/table_source.rs      |  33 +++
 src/flow/src/error.rs                     |   8 +-
 src/flow/src/expr.rs                      |  11 +
 src/flow/src/expr/linear.rs               |  24 ++
 src/flow/src/expr/scalar.rs               |   3 +
 src/flow/src/expr/utils.rs                | 340 ++++++++++++++++++++++
 src/flow/src/lib.rs                       |   3 +
 src/flow/src/plan.rs                      |  79 ++++-
 src/flow/src/plan/reduce.rs               |  12 +-
 src/flow/src/server.rs                    |  40 +++
 src/flow/src/test_utils.rs                | 142 +++++++++
 src/operator/src/insert.rs                |   1 +
 tests/runner/src/env.rs                   |  24 +-
 17 files changed, 756 insertions(+), 15 deletions(-)
 create mode 100644 src/flow/src/expr/utils.rs
 create mode 100644 src/flow/src/test_utils.rs

diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs
index 3ea40d4f1c..0a25a60899 100644
--- a/src/common/meta/src/key/table_info.rs
+++ b/src/common/meta/src/key/table_info.rs
@@ -190,6 +190,13 @@ impl TableInfoManager {
         ))
     }
 
+    /// Checks if the table exists.
+    pub async fn exists(&self, table_id: TableId) -> Result<bool> {
+        let key = TableInfoKey::new(table_id);
+        let raw_key = key.to_bytes();
+        self.kv_backend.exists(&raw_key).await
+    }
+
     pub async fn get(
         &self,
         table_id: TableId,
diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs
index 4641cc0d9a..901c5bd12b 100644
--- a/src/common/recordbatch/src/recordbatch.rs
+++ b/src/common/recordbatch/src/recordbatch.rs
@@ -35,7 +35,7 @@ use crate::DfRecordBatch;
 #[derive(Clone, Debug, PartialEq)]
 pub struct RecordBatch {
     pub schema: SchemaRef,
-    columns: Vec<VectorRef>,
+    pub columns: Vec<VectorRef>,
     df_record_batch: DfRecordBatch,
 }
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 085096fd95..6e16b7340a 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -804,6 +804,8 @@ impl FlowWorkerManager {
             }
         }
 
+        node_ctx.add_flow_plan(flow_id, flow_plan.clone());
+
         let _ = comment;
         let _ = flow_options;
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index 1896534e86..070a3946c2 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use common_recordbatch::RecordBatch;
 use common_telemetry::trace;
 use datatypes::prelude::ConcreteDataType;
 use session::context::QueryContext;
@@ -31,6 +32,7 @@ use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
 use crate::expr::error::InternalSnafu;
 use crate::expr::{Batch, GlobalId};
 use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
+use crate::plan::TypedPlan;
 use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};
 
 /// A context that holds the information of the dataflow
@@ -40,6 +42,7 @@ pub struct FlownodeContext {
     pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
     /// mapping from task to sink table, useful for sending data back to the client when a task is done running
     pub flow_to_sink: BTreeMap<FlowId, TableName>,
+    pub flow_plans: BTreeMap<FlowId, TypedPlan>,
     pub sink_to_flow: BTreeMap<TableName, FlowId>,
     /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
     ///
@@ -63,6 +66,7 @@ impl FlownodeContext {
         Self {
             source_to_tasks: Default::default(),
             flow_to_sink: Default::default(),
+            flow_plans: Default::default(),
             sink_to_flow: Default::default(),
             source_sender: Default::default(),
             sink_receiver: Default::default(),
@@ -179,6 +183,22 @@ impl SourceSender {
 
         Ok(0)
     }
+
+    /// send record batch
+    pub async fn send_record_batch(&self, batch: RecordBatch) -> Result<usize, Error> {
+        let row_cnt = batch.num_rows();
+        let batch = Batch::from(batch);
+
+        self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst);
+
+        self.send_buf_tx.send(batch).await.map_err(|e| {
+            crate::error::InternalSnafu {
+                reason: format!("Failed to send batch, error = {:?}", e),
+            }
+            .build()
+        })?;
+        Ok(row_cnt)
+    }
 }
 
 impl FlownodeContext {
@@ -200,6 +220,16 @@ impl FlownodeContext {
         sender.send_rows(rows, batch_datatypes).await
     }
 
+    pub async fn send_rb(&self, table_id: TableId, batch: RecordBatch) -> Result<usize, Error> {
+        let sender = self
+            .source_sender
+            .get(&table_id)
+            .with_context(|| TableNotFoundSnafu {
+                name: table_id.to_string(),
+            })?;
+        sender.send_record_batch(batch).await
+    }
+
     /// flush all sender's buf
     ///
     /// return numbers being sent
@@ -235,6 +265,15 @@ impl FlownodeContext {
         self.sink_to_flow.insert(sink_table_name, task_id);
     }
 
+    /// add flow plan to worker context
+    pub fn add_flow_plan(&mut self, task_id: FlowId, plan: TypedPlan) {
+        self.flow_plans.insert(task_id, plan);
+    }
+
+    pub fn get_flow_plan(&self, task_id: &FlowId) -> Option<TypedPlan> {
+        self.flow_plans.get(task_id).cloned()
+    }
+
     /// remove flow from worker context
     pub fn remove_flow(&mut self, task_id: FlowId) {
         if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) {
@@ -246,6 +285,7 @@ impl FlownodeContext {
                 self.source_sender.remove(source_table_id);
             }
         }
+        self.flow_plans.remove(&task_id);
     }
 
     /// try add source sender, if already exist, do nothing
diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs
index bc2e9ec2d5..41ef218feb 100644
--- a/src/flow/src/adapter/table_source.rs
+++ b/src/flow/src/adapter/table_source.rs
@@ -82,6 +82,31 @@ impl ManagedTableSource {
         }
     }
 
+    /// Get the time index column from table id
+    pub async fn get_time_index_column_from_table_id(
+        &self,
+        table_id: TableId,
+    ) -> Result<(usize, datatypes::schema::ColumnSchema), Error> {
+        let info = self
+            .table_info_manager
+            .get(table_id)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?
+            .context(UnexpectedSnafu {
+                reason: format!("Table id = {:?}, couldn't find table info", table_id),
+            })?;
+        let raw_schema = &info.table_info.meta.schema;
+        let Some(ts_index) = raw_schema.timestamp_index else {
+            UnexpectedSnafu {
+                reason: format!("Table id = {:?}, couldn't find timestamp index", table_id),
+            }
+            .fail()?
+        };
+        let col_schema = raw_schema.column_schemas[ts_index].clone();
+        Ok((ts_index, col_schema))
+    }
+
     pub async fn get_table_id_from_proto_name(
         &self,
         name: &greptime_proto::v1::TableName,
@@ -168,6 +193,14 @@ impl ManagedTableSource {
         let desc = table_info_value_to_relation_desc(table_info_value)?;
         Ok((table_name, desc))
     }
+
+    pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
+        self.table_info_manager
+            .exists(*table_id)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)
+    }
 }
 
 impl std::fmt::Debug for ManagedTableSource {
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 0374e00506..703b47641c 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -21,7 +21,7 @@ use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
 use common_macro::stack_trace_debug;
 use common_telemetry::common_error::ext::ErrorExt;
 use common_telemetry::common_error::status_code::StatusCode;
-use snafu::{Location, Snafu};
+use snafu::{Location, ResultExt, Snafu};
 use tonic::metadata::MetadataMap;
 
 use crate::adapter::FlowId;
@@ -259,3 +259,9 @@ impl ErrorExt for Error {
 }
 
 define_into_tonic_status!(Error);
+
+impl From<EvalError> for Error {
+    fn from(e: EvalError) -> Self {
+        Err::<(), _>(e).context(EvalSnafu).unwrap_err()
+    }
+}
diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs
index 5dde62b43a..2696959f5f 100644
--- a/src/flow/src/expr.rs
+++ b/src/flow/src/expr.rs
@@ -22,6 +22,7 @@ mod linear;
 pub(crate) mod relation;
 mod scalar;
 mod signature;
+pub(crate) mod utils;
 
 use arrow::compute::FilterBuilder;
 use datatypes::prelude::{ConcreteDataType, DataType};
@@ -54,6 +55,16 @@ pub struct Batch {
     diffs: Option<VectorRef>,
 }
 
+impl From<common_recordbatch::RecordBatch> for Batch {
+    fn from(value: common_recordbatch::RecordBatch) -> Self {
+        Self {
+            row_count: value.num_rows(),
+            batch: value.columns,
+            diffs: None,
+        }
+    }
+}
+
 impl PartialEq for Batch {
     fn eq(&self, other: &Self) -> bool {
         let mut batch_eq = true;
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index f96d7827b6..807119a032 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -94,6 +94,30 @@ impl MapFilterProject {
         }
     }
 
+    pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
+        let idx = *self.projection.get(n)?;
+        if idx < self.input_arity {
+            Some(ScalarExpr::Column(idx))
+        } else {
+            // find direct ref to input's expr
+
+            let mut expr = self.expressions.get(idx - self.input_arity)?;
+            loop {
+                match expr {
+                    ScalarExpr::Column(prev) => {
+                        if *prev < self.input_arity {
+                            return Some(ScalarExpr::Column(*prev));
+                        } else {
+                            expr = self.expressions.get(*prev - self.input_arity)?;
+                            continue;
+                        }
+                    }
+                    _ => return Some(expr.clone()),
+                }
+            }
+        }
+    }
+
     /// The number of columns expected in the output row.
     pub fn output_arity(&self) -> usize {
         self.projection.len()
diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs
index a6e00cce5b..94af5e0e42 100644
--- a/src/flow/src/expr/scalar.rs
+++ b/src/flow/src/expr/scalar.rs
@@ -311,6 +311,9 @@ impl ScalarExpr {
     }
 
     /// Eval this expression with the given values.
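+    ///
+    /// For example, `ScalarExpr::Column(0).eval(&[Value::from(42i64)])`
+    /// returns `Ok(Value::from(42i64))`: a column reference simply indexes
+    /// into `values`.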
+    ///
+    /// TODO(discord9): add tests to make sure `eval_batch` is the same as `eval` in
+    /// most cases
     pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
         match self {
             ScalarExpr::Column(index) => Ok(values[*index].clone()),
diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs
new file mode 100644
index 0000000000..636d83b72d
--- /dev/null
+++ b/src/flow/src/expr/utils.rs
@@ -0,0 +1,340 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! This module contains utility functions for expressions.
+
+use std::cmp::Ordering;
+use std::collections::BTreeMap;
+
+use datatypes::value::Value;
+use snafu::{ensure, OptionExt};
+
+use crate::error::UnexpectedSnafu;
+use crate::expr::ScalarExpr;
+use crate::plan::TypedPlan;
+use crate::Result;
+
+/// Find the lower bound for time `current` in the given `plan` for the time window expr.
+///
+/// i.e. for a time window expr of `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
+/// return `Some("2021-07-01 00:00:00.000")`.
+///
+/// If `plan` doesn't contain a `TIME INDEX` column, return `None`.
+pub fn find_plan_time_window_expr_lower_bound(
+    plan: &TypedPlan,
+    current: common_time::Timestamp,
+) -> Result<Option<common_time::Timestamp>> {
+    let typ = plan.schema.typ();
+    let Some(mut time_index) = typ.time_index else {
+        return Ok(None);
+    };
+
+    let mut cur_plan = plan;
+    let mut expr_time_index;
+
+    loop {
+        // follow upward and find the deepest time index expr that is not a column ref
+        expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context(
+            UnexpectedSnafu {
+                reason: "Failed to find time index expr",
+            },
+        )?);
+
+        if let Some(ScalarExpr::Column(i)) = expr_time_index {
+            time_index = i;
+        } else {
+            break;
+        }
+        if let Some(input) = cur_plan.plan.get_first_input_plan() {
+            cur_plan = input;
+        } else {
+            break;
+        }
+    }
+
+    let expr_time_index = expr_time_index.context(UnexpectedSnafu {
+        reason: "Failed to find time index expr",
+    })?;
+
+    let ts_col = expr_time_index
+        .get_all_ref_columns()
+        .first()
+        .cloned()
+        .context(UnexpectedSnafu {
+            reason: "Failed to find time index column",
+        })?;
+
+    find_time_window_lower_bound(&expr_time_index, ts_col, current)
+}
+
+/// Find the lower bound of the time window for the given `expr` and `current` timestamp.
+///
for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`, +/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound +/// of current time window given the current timestamp +/// +/// if return None, meaning this time window have no lower bound +pub fn find_time_window_lower_bound( + expr: &ScalarExpr, + ts_col_idx: usize, + current: common_time::Timestamp, +) -> Result> { + let all_ref_columns = expr.get_all_ref_columns(); + + ensure!( + all_ref_columns.contains(&ts_col_idx), + UnexpectedSnafu { + reason: format!( + "Expected column {} to be referenced in expression {expr:?}", + ts_col_idx + ), + } + ); + + ensure!(all_ref_columns.len() == 1, UnexpectedSnafu { + reason: format!( + "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}" + ), + }); + + let permute_map = BTreeMap::from([(ts_col_idx, 0usize)]); + + let mut rewrote_expr = expr.clone(); + + rewrote_expr.permute_map(&permute_map)?; + + fn eval_to_timestamp(expr: &ScalarExpr, values: &[Value]) -> Result { + let val = expr.eval(values)?; + if let Value::Timestamp(ts) = val { + Ok(ts) + } else { + UnexpectedSnafu { + reason: format!("Expected timestamp in expression {expr:?} but got {val:?}"), + } + .fail()? + } + } + + let cur_time_window = eval_to_timestamp(&rewrote_expr, &[current.into()])?; + + // search to find the lower bound + let mut offset: i64 = 1; + let lower_bound; + let mut upper_bound = Some(current); + // first expontial probe to found a range for binary search + loop { + let Some(next_val) = current.value().checked_sub(offset) else { + // no lower bound + return Ok(None); + }; + + let prev_time_probe = common_time::Timestamp::new(next_val, current.unit()); + + let prev_time_window = eval_to_timestamp(&rewrote_expr, &[prev_time_probe.into()])?; + + match prev_time_window.cmp(&cur_time_window) { + Ordering::Less => { + lower_bound = Some(prev_time_probe); + break; + } + Ordering::Equal => { + upper_bound = Some(prev_time_probe); + } + Ordering::Greater => { + UnexpectedSnafu { + reason: format!( + "Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}" + ), + } + .fail()? 
+ } + } + + let Some(new_offset) = offset.checked_mul(2) else { + // no lower bound + return Ok(None); + }; + offset = new_offset; + } + + // binary search for the lower bound + + ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{ + reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"), + }); + + let output_unit = lower_bound.expect("should have lower bound").unit(); + + let mut low = lower_bound.expect("should have lower bound").value(); + let mut high = upper_bound.expect("should have upper bound").value(); + while low < high { + let mid = (low + high) / 2; + let mid_probe = common_time::Timestamp::new(mid, output_unit); + let mid_time_window = eval_to_timestamp(&rewrote_expr, &[mid_probe.into()])?; + + match mid_time_window.cmp(&cur_time_window) { + Ordering::Less => low = mid + 1, + Ordering::Equal => high = mid, + Ordering::Greater => UnexpectedSnafu { + reason: format!("Binary search failed for time window expression {expr:?}"), + } + .fail()?, + } + } + + let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit); + + Ok(Some(final_lower_bound_for_time_window)) +} + +#[cfg(test)] +mod test { + use pretty_assertions::assert_eq; + + use super::*; + use crate::plan::{Plan, TypedPlan}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + + #[tokio::test] + async fn test_plan_time_window_lower_bound() { + let testcases = [ + // no time index + ( + "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;", + "2021-07-01 00:01:01.000", + None, + ), + // time index + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + // time index with other fields + ( + "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + // time index with other pks + ( + "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + ]; + let engine = create_test_query_engine(); + + for (sql, current, expected) in &testcases { + let plan = sql_to_substrait(engine.clone(), sql).await; + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) + .await + .unwrap(); + + let current = common_time::Timestamp::from_str(current, None).unwrap(); + + let expected = + expected.map(|expected| common_time::Timestamp::from_str(expected, None).unwrap()); + + assert_eq!( + find_plan_time_window_expr_lower_bound(&flow_plan, current).unwrap(), + expected + ); + } + } + + #[tokio::test] + async fn test_timewindow_lower_bound() { + let testcases = [ + ( + ("'5 minutes'", "ts", Some("2021-07-01 00:00:00.000")), + "2021-07-01 00:01:01.000", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:01:01.000", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:00:00.000", + "2021-07-01 00:00:00.000", + ), + // test edge cases + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:05:00.000", + "2021-07-01 00:05:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999", + "2021-07-01 00:00:00.000", + ), + ( 
+ ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999999999", + "2021-07-01 00:00:00.000", + ), + ]; + let engine = create_test_query_engine(); + + for (args, current, expected) in testcases { + let sql = if let Some(origin) = args.2 { + format!( + "SELECT date_bin({}, {}, '{origin}') FROM numbers_with_ts;", + args.0, args.1 + ) + } else { + format!( + "SELECT date_bin({}, {}) FROM numbers_with_ts;", + args.0, args.1 + ) + }; + let plan = sql_to_substrait(engine.clone(), &sql).await; + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) + .await + .unwrap(); + + let expr = { + let mfp = flow_plan.plan; + let Plan::Mfp { mfp, .. } = mfp else { + unreachable!() + }; + mfp.expressions[0].clone() + }; + + let current = common_time::Timestamp::from_str(current, None).unwrap(); + + let res = find_time_window_lower_bound(&expr, 1, current).unwrap(); + + let expected = Some(common_time::Timestamp::from_str(expected, None).unwrap()); + + assert_eq!(res, expected); + } + } +} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 8d6a881fa0..8d07369afa 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -38,6 +38,9 @@ mod server; mod transform; mod utils; +#[cfg(test)] +mod test_utils; + pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use error::{Error, Result}; pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker}; diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index e1cf22e621..099954dc12 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -18,8 +18,10 @@ mod join; mod reduce; +use std::collections::BTreeSet; + use crate::error::Error; -use crate::expr::{Id, LocalId, MapFilterProject, SafeMfpPlan, TypedExpr}; +use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr}; use crate::plan::join::JoinPlan; pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan}; use crate::repr::{DiffRow, RelationDesc}; @@ -189,3 +191,78 @@ impl Plan { TypedPlan { schema, plan: self } } } + +impl Plan { + /// Get nth expr using column ref + pub fn get_nth_expr(&self, n: usize) -> Option { + match self { + Self::Mfp { mfp, .. } => mfp.get_nth_expr(n), + Self::Reduce { key_val_plan, .. } => key_val_plan.get_nth_expr(n), + _ => None, + } + } + + /// Get the first input plan if exists + pub fn get_first_input_plan(&self) -> Option<&TypedPlan> { + match self { + Plan::Let { value, .. } => Some(value), + Plan::Mfp { input, .. } => Some(input), + Plan::Reduce { input, .. } => Some(input), + Plan::Join { inputs, .. } => inputs.first(), + Plan::Union { inputs, .. } => inputs.first(), + _ => None, + } + } + + /// Get mutable ref to the first input plan if exists + pub fn get_mut_first_input_plan(&mut self) -> Option<&mut TypedPlan> { + match self { + Plan::Let { value, .. } => Some(value), + Plan::Mfp { input, .. } => Some(input), + Plan::Reduce { input, .. } => Some(input), + Plan::Join { inputs, .. } => inputs.first_mut(), + Plan::Union { inputs, .. 
+            _ => None,
+        }
+    }
+
+    /// Find all the used collections in the plan
+    pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
+        fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
+            match plan {
+                Plan::Get { id } => {
+                    match id {
+                        Id::Local(_) => (),
+                        Id::Global(g) => {
+                            used.insert(*g);
+                        }
+                    };
+                }
+                Plan::Let { value, body, .. } => {
+                    recur_find_use(&value.plan, used);
+                    recur_find_use(&body.plan, used);
+                }
+                Plan::Mfp { input, .. } => {
+                    recur_find_use(&input.plan, used);
+                }
+                Plan::Reduce { input, .. } => {
+                    recur_find_use(&input.plan, used);
+                }
+                Plan::Join { inputs, .. } => {
+                    for input in inputs {
+                        recur_find_use(&input.plan, used);
+                    }
+                }
+                Plan::Union { inputs, .. } => {
+                    for input in inputs {
+                        recur_find_use(&input.plan, used);
+                    }
+                }
+                _ => {}
+            }
+        }
+        let mut ret = Default::default();
+        recur_find_use(self, &mut ret);
+        ret
+    }
+}
diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs
index 1edd0c40dd..65a83756b5 100644
--- a/src/flow/src/plan/reduce.rs
+++ b/src/flow/src/plan/reduce.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::expr::{AggregateExpr, SafeMfpPlan};
+use crate::expr::{AggregateExpr, SafeMfpPlan, ScalarExpr};
 
 /// Describe how to extract key-value pair from a `Row`
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
@@ -23,6 +23,16 @@ pub struct KeyValPlan {
     pub val_plan: SafeMfpPlan,
 }
 
+impl KeyValPlan {
+    /// Get nth expr using column ref
+    pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
+        self.key_plan.get_nth_expr(n).or_else(|| {
+            self.val_plan
+                .get_nth_expr(n - self.key_plan.projection.len())
+        })
+    }
+}
+
 /// TODO(discord9): def&impl of Hierarchical aggregates(for min/max with support to deletion) and
 /// basic aggregates(for other aggregate functions) and mixed aggregate
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index 3ed8b7efa9..2cc2f5644f 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -440,6 +440,7 @@ impl FlownodeBuilder {
     }
 }
 
+#[derive(Clone)]
 pub struct FrontendInvoker {
     inserter: Arc<Inserter>,
     deleter: Arc<Deleter>,
@@ -549,3 +550,42 @@ impl FrontendInvoker {
         self.statement_executor.clone()
     }
 }
+
+/// Get all flow ids in this flownode
+pub(crate) async fn get_all_flow_ids(
+    flow_metadata_manager: &FlowMetadataManagerRef,
+    catalog_manager: &CatalogManagerRef,
+    nodeid: Option<u64>,
+) -> Result<Vec<u32>, Error> {
+    let ret = if let Some(nodeid) = nodeid {
+        let flow_ids_one_node = flow_metadata_manager
+            .flownode_flow_manager()
+            .flows(nodeid)
+            .try_collect::<Vec<_>>()
+            .await
+            .context(ListFlowsSnafu { id: Some(nodeid) })?;
+        flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
+    } else {
+        let all_catalogs = catalog_manager
+            .catalog_names()
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+        let mut all_flow_ids = vec![];
+        for catalog in all_catalogs {
+            let flows = flow_metadata_manager
+                .flow_name_manager()
+                .flow_names(&catalog)
+                .await
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?;
+
+            all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
+        }
+        all_flow_ids
+    };
+
+    Ok(ret)
+}
diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs
new file mode 100644
index 0000000000..3cc42efda5
--- /dev/null
+++ b/src/flow/src/test_utils.rs
@@ -0,0 +1,142 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use catalog::RegisterTableRequest; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; +use datatypes::data_type::ConcreteDataType as CDT; +use datatypes::prelude::*; +use datatypes::schema::Schema; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef}; +use itertools::Itertools; +use prost::Message; +use query::parser::QueryLanguageParser; +use query::query_engine::DefaultSerializer; +use query::QueryEngine; +use session::context::QueryContext; +/// note here we are using the `substrait_proto_df` crate from the `substrait` module and +/// rename it to `substrait_proto` +use substrait::substrait_proto_df as substrait_proto; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; +use substrait_proto::proto; +use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; +use table::test_util::MemTable; + +use crate::adapter::node_context::IdToNameMap; +use crate::adapter::table_source::test::FlowDummyTableSource; +use crate::adapter::FlownodeContext; +use crate::df_optimizer::apply_df_optimizer; +use crate::expr::GlobalId; +use crate::transform::register_function_to_query_engine; + +pub fn create_test_ctx() -> FlownodeContext { + let mut tri_map = IdToNameMap::new(); + { + let gid = GlobalId::User(0); + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ]; + tri_map.insert(Some(name.clone()), Some(1024), gid); + } + + { + let gid = GlobalId::User(1); + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]; + tri_map.insert(Some(name.clone()), Some(1025), gid); + } + + let dummy_source = FlowDummyTableSource::default(); + + let mut ctx = FlownodeContext::new(Box::new(dummy_source)); + ctx.table_repr = tri_map; + ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public"))); + + ctx +} + +pub fn create_test_query_engine() -> Arc { + let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); + let req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: NUMBERS_TABLE_NAME.to_string(), + table_id: NUMBERS_TABLE_ID, + table: NumbersTable::table(NUMBERS_TABLE_ID), + }; + catalog_list.register_table_sync(req).unwrap(); + + let schema = vec![ + datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false), + datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false), + ]; + let mut columns = vec![]; + let numbers = (1..=10).collect_vec(); + let column: VectorRef = Arc::new(::VectorType::from_vec(numbers)); + columns.push(column); + + let ts = (1..=10).collect_vec(); + let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10); + ts.into_iter() + .map(|v| builder.push(Some(TimestampMillisecond::new(v)))) + .count(); + let column: VectorRef = builder.to_vector_cloned(); + columns.push(column); + + let schema = 
+    let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
+    let table = MemTable::table("numbers_with_ts", recordbatch);
+
+    let req_with_ts = RegisterTableRequest {
+        catalog: DEFAULT_CATALOG_NAME.to_string(),
+        schema: DEFAULT_SCHEMA_NAME.to_string(),
+        table_name: "numbers_with_ts".to_string(),
+        table_id: 1024,
+        table,
+    };
+    catalog_list.register_table_sync(req_with_ts).unwrap();
+
+    let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
+
+    let engine = factory.query_engine();
+    register_function_to_query_engine(&engine);
+
+    assert_eq!("datafusion", engine.name());
+    engine
+}
+
+pub async fn sql_to_substrait(engine: Arc<dyn QueryEngine>, sql: &str) -> proto::Plan {
+    let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
+    let plan = engine
+        .planner()
+        .plan(&stmt, QueryContext::arc())
+        .await
+        .unwrap();
+    let plan = apply_df_optimizer(plan).await.unwrap();
+
+    // encode then decode so as to rely on the existing conversion from logical plan to substrait plan
+    let bytes = DFLogicalSubstraitConvertor {}
+        .encode(&plan, DefaultSerializer)
+        .unwrap();
+
+    proto::Plan::decode(bytes).unwrap()
+}
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 2dc5d98e41..1f7964bf8c 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -105,6 +105,7 @@ pub struct InstantAndNormalInsertRequests {
     /// Requests with normal ttl.
     pub normal_requests: RegionInsertRequests,
     /// Requests with ttl=instant.
+    /// These are discarded immediately at the frontend, never inserted into the memtable, and only sent to the flownode if needed.
     pub instant_requests: RegionInsertRequests,
 }
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index 2782661c25..7c35e79fba 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -498,6 +498,7 @@ impl Env {
                 Env::stop_server(server_process);
             }
         }
+
         if is_full_restart {
             if let Some(mut metasrv_process) =
                 db.metasrv_process.lock().expect("poisoned lock").take()
@@ -509,11 +510,12 @@
             {
                 Env::stop_server(&mut frontend_process);
             }
-            if let Some(mut flownode_process) =
-                db.flownode_process.lock().expect("poisoned lock").take()
-            {
-                Env::stop_server(&mut flownode_process);
-            }
+        }
+
+        if let Some(mut flownode_process) =
+            db.flownode_process.lock().expect("poisoned lock").take()
+        {
+            Env::stop_server(&mut flownode_process);
         }
     }
@@ -547,13 +549,13 @@
                 .lock()
                 .expect("lock poisoned")
                 .replace(frontend);
-
-            let flownode = self.start_server("flownode", &db.ctx, false).await;
-            db.flownode_process
-                .lock()
-                .expect("lock poisoned")
-                .replace(flownode);
         }
+        let flownode = self.start_server("flownode", &db.ctx, false).await;
+        db.flownode_process
+            .lock()
+            .expect("lock poisoned")
+            .replace(flownode);
+
         processes
     };
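
How the Part 1 pieces are intended to compose: a minimal sketch, assuming a hypothetical helper `refill_start_hint` with assumed inputs `node_ctx`, `flow_id`, and `now`; only `get_flow_plan` and `find_plan_time_window_expr_lower_bound` come from this patch.

    use common_time::Timestamp;

    use crate::adapter::{FlowId, FlownodeContext};
    use crate::error::Error;
    use crate::expr::utils::find_plan_time_window_expr_lower_bound;

    /// Hypothetical helper: where a refill scan for `flow_id` could start from.
    fn refill_start_hint(
        node_ctx: &FlownodeContext,
        flow_id: FlowId,
        now: Timestamp,
    ) -> Result<Option<Timestamp>, Error> {
        // The flow's plan was stashed at creation time via `add_flow_plan`.
        let Some(plan) = node_ctx.get_flow_plan(&flow_id) else {
            return Ok(None);
        };
        // Evaluate the plan's time window expression (e.g. `date_bin(...)`) at
        // `now`; `None` means the plan has no TIME INDEX, hence no natural
        // refill start point.
        find_plan_time_window_expr_lower_bound(&plan, now)
    }

A later part could then scan the source tables from the returned timestamp onward and replay the rows through `FlownodeContext::send_rb`.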