diff --git a/Cargo.lock b/Cargo.lock index 1c7ddf1b40..70b59d27ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1949,6 +1949,7 @@ dependencies = [ "serde_json", "session", "snafu 0.8.3", + "sql", "statrs", "store-api", "table", @@ -3896,6 +3897,7 @@ dependencies = [ "common-datasource", "common-error", "common-frontend", + "common-function", "common-grpc", "common-macro", "common-meta", @@ -4232,7 +4234,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5c801650435d464891114502539b701c77a1b914#5c801650435d464891114502539b701c77a1b914" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7ca323090b3ae8faf2c15036b7f41b7c5225cf5f#7ca323090b3ae8faf2c15036b7f41b7c5225cf5f" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index 884960bd62..1d171c95fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,7 +119,7 @@ etcd-client = { version = "0.13" } fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5c801650435d464891114502539b701c77a1b914" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7ca323090b3ae8faf2c15036b7f41b7c5225cf5f" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 66405212e6..6d7e211d7c 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -288,6 +288,7 @@ async fn create_query_engine(meta_addr: &str) -> Result { None, None, None, + None, false, plugins.clone(), )); diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index ae71af6d8f..67eda1c7a4 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -31,6 +31,7 @@ serde.workspace = true serde_json.workspace = true session.workspace = true snafu.workspace = true +sql.workspace = true statrs = "0.16" store-api.workspace = true table.workspace = true diff --git a/src/common/function/src/flush_flow.rs b/src/common/function/src/flush_flow.rs new file mode 100644 index 0000000000..27944547c3 --- /dev/null +++ b/src/common/function/src/flush_flow.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
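+
+//! Implements the `flush_flow` admin function: flush a flow's buffered input
+//! and write the results to its sink table on demand. Intended SQL usage
+//! (a sketch; the flow names below are placeholders):
+//!
+//!   SELECT flush_flow('my_flow');            -- resolved in the current catalog
+//!   SELECT flush_flow('my_catalog.my_flow'); -- explicitly qualified
+//!
+//! Returns the number of rows written back by the flush.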
+
+use common_error::ext::BoxedError;
+use common_macro::admin_fn;
+use common_query::error::{
+    ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
+    UnsupportedInputDataTypeSnafu,
+};
+use common_query::prelude::Signature;
+use datafusion::logical_expr::Volatility;
+use datatypes::value::{Value, ValueRef};
+use session::context::QueryContextRef;
+use snafu::{ensure, ResultExt};
+use sql::parser::ParserContext;
+use store_api::storage::ConcreteDataType;
+
+use crate::handlers::FlowServiceHandlerRef;
+
+fn flush_signature() -> Signature {
+    Signature::uniform(
+        1,
+        vec![ConcreteDataType::string_datatype()],
+        Volatility::Immutable,
+    )
+}
+
+#[admin_fn(
+    name = FlushFlowFunction,
+    display_name = flush_flow,
+    sig_fn = flush_signature,
+    ret = uint64
+)]
+pub(crate) async fn flush_flow(
+    flow_service_handler: &FlowServiceHandlerRef,
+    query_ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
+) -> Result<Value> {
+    let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
+
+    let res = flow_service_handler
+        .flush(&catalog_name, &flow_name, query_ctx.clone())
+        .await?;
+    let affected_rows = res.affected_rows;
+
+    Ok(Value::from(affected_rows))
+}
+
+fn parse_flush_flow(
+    params: &[ValueRef<'_>],
+    query_ctx: &QueryContextRef,
+) -> Result<(String, String)> {
+    ensure!(
+        params.len() == 1,
+        InvalidFuncArgsSnafu {
+            err_msg: format!(
+                "The length of the args is not correct, expect 1, have: {}",
+                params.len()
+            ),
+        }
+    );
+
+    let ValueRef::String(flow_name) = params[0] else {
+        return UnsupportedInputDataTypeSnafu {
+            function: "flush_flow",
+            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
+        }
+        .fail();
+    };
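+    // The argument is a (possibly catalog-qualified) object name: a bare name
+    // resolves against the session's current catalog, while a qualified name
+    // must be exactly `catalog.flow_name` (see the match below).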
or , actual: {}", + obj_name + ), + } + .fail() + } + }; + Ok((catalog_name, flow_name)) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use datatypes::scalars::ScalarVector; + use datatypes::vectors::StringVector; + use session::context::QueryContext; + + use super::*; + use crate::function::{Function, FunctionContext}; + + #[test] + fn test_flush_flow_metadata() { + let f = FlushFlowFunction; + assert_eq!("flush_flow", f.name()); + assert_eq!( + ConcreteDataType::uint64_datatype(), + f.return_type(&[]).unwrap() + ); + assert_eq!( + f.signature(), + Signature::uniform( + 1, + vec![ConcreteDataType::string_datatype()], + Volatility::Immutable, + ) + ); + } + + #[test] + fn test_missing_flow_service() { + let f = FlushFlowFunction; + + let args = vec!["flow_name"]; + let args = args + .into_iter() + .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _) + .collect::>(); + + let result = f.eval(FunctionContext::default(), &args).unwrap_err(); + assert_eq!( + "Missing FlowServiceHandler, not expected", + result.to_string() + ); + } + + #[test] + fn test_parse_flow_args() { + let testcases = [ + ("flow_name", ("greptime", "flow_name")), + ("catalog.flow_name", ("catalog", "flow_name")), + ]; + for (input, expected) in testcases.iter() { + let args = vec![*input]; + let args = args.into_iter().map(ValueRef::String).collect::>(); + + let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap(); + assert_eq!(*expected, (result.0.as_str(), result.1.as_str())); + } + } +} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index a685b0dbc0..ddc233ed7d 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -65,6 +65,19 @@ pub trait ProcedureServiceHandler: Send + Sync { async fn query_procedure_state(&self, pid: &str) -> Result; } +/// This flow service handler is only use for flush flow for now. +#[async_trait] +pub trait FlowServiceHandler: Send + Sync { + async fn flush( + &self, + catalog: &str, + flow: &str, + ctx: QueryContextRef, + ) -> Result; +} + pub type TableMutationHandlerRef = Arc; pub type ProcedureServiceHandlerRef = Arc; + +pub type FlowServiceHandlerRef = Arc; diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs index 204333601c..4a6a6844d5 100644 --- a/src/common/function/src/lib.rs +++ b/src/common/function/src/lib.rs @@ -15,6 +15,7 @@ #![feature(let_chains)] #![feature(try_blocks)] +mod flush_flow; mod macros; pub mod scalars; mod system; diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 0a740ddb4a..55953b6794 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef}; +use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef}; /// Shared state for SQL functions. /// The handlers in state may be `None` in cli command-line or test cases. 
@@ -22,6 +22,8 @@ pub struct FunctionState {
     pub table_mutation_handler: Option<TableMutationHandlerRef>,
     // The procedure service handler
     pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
+    // The flownode handler
+    pub flow_service_handler: Option<FlowServiceHandlerRef>,
 }
 
 impl FunctionState {
@@ -42,9 +44,10 @@ impl FunctionState {
             CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
         };
 
-        use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
+        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
         struct MockProcedureServiceHandler;
         struct MockTableMutationHandler;
+        struct MockFlowServiceHandler;
         const ROWS: usize = 42;
 
         #[async_trait]
@@ -116,9 +119,22 @@ impl FunctionState {
             }
         }
 
+        #[async_trait]
+        impl FlowServiceHandler for MockFlowServiceHandler {
+            async fn flush(
+                &self,
+                _catalog: &str,
+                _flow: &str,
+                _ctx: QueryContextRef,
+            ) -> Result<api::v1::flow::FlowResponse> {
+                todo!()
+            }
+        }
+
         Self {
             table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
             procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
+            flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
         }
     }
 }
diff --git a/src/common/function/src/system/procedure_state.rs b/src/common/function/src/system/procedure_state.rs
index 3a15cbd011..3b0ce0195b 100644
--- a/src/common/function/src/system/procedure_state.rs
+++ b/src/common/function/src/system/procedure_state.rs
@@ -12,26 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::fmt;
-
 use api::v1::meta::ProcedureStatus;
 use common_macro::admin_fn;
 use common_meta::rpc::procedure::ProcedureStateResponse;
-use common_query::error::Error::ThreadJoin;
 use common_query::error::{
     InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
     UnsupportedInputDataTypeSnafu,
 };
 use common_query::prelude::{Signature, Volatility};
-use common_telemetry::error;
 use datatypes::prelude::*;
-use datatypes::vectors::VectorRef;
 use serde::Serialize;
 use session::context::QueryContextRef;
-use snafu::{ensure, Location, OptionExt};
+use snafu::ensure;
 
-use crate::ensure_greptime;
-use crate::function::{Function, FunctionContext};
 use crate::handlers::ProcedureServiceHandlerRef;
 
 #[derive(Serialize)]
@@ -103,6 +96,7 @@ mod tests {
     use datatypes::vectors::StringVector;
 
     use super::*;
+    use crate::function::{Function, FunctionContext};
 
     #[test]
     fn test_procedure_state_misc() {
diff --git a/src/common/function/src/table.rs b/src/common/function/src/table.rs
index 244c395cb4..d61723375f 100644
--- a/src/common/function/src/table.rs
+++ b/src/common/function/src/table.rs
@@ -22,6 +22,7 @@ use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
 use flush_compact_table::{CompactTableFunction, FlushTableFunction};
 use migrate_region::MigrateRegionFunction;
 
+use crate::flush_flow::FlushFlowFunction;
 use crate::function_registry::FunctionRegistry;
 
 /// Table functions
@@ -35,5 +36,6 @@ impl TableFunction {
         registry.register(Arc::new(CompactRegionFunction));
         registry.register(Arc::new(FlushTableFunction));
         registry.register(Arc::new(CompactTableFunction));
+        registry.register(Arc::new(FlushFlowFunction));
     }
 }
diff --git a/src/common/function/src/table/flush_compact_region.rs b/src/common/function/src/table/flush_compact_region.rs
index 099fe3f86c..6d89c5c125 100644
--- a/src/common/function/src/table/flush_compact_region.rs
+++ b/src/common/function/src/table/flush_compact_region.rs
@@ -12,23 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the
License. -use std::fmt; - use common_macro::admin_fn; -use common_query::error::Error::ThreadJoin; use common_query::error::{ InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; use common_query::prelude::{Signature, Volatility}; -use common_telemetry::error; use datatypes::prelude::*; -use datatypes::vectors::VectorRef; use session::context::QueryContextRef; -use snafu::{ensure, Location, OptionExt}; +use snafu::ensure; use store_api::storage::RegionId; -use crate::ensure_greptime; -use crate::function::{Function, FunctionContext}; use crate::handlers::TableMutationHandlerRef; use crate::helper::cast_u64; @@ -84,6 +77,7 @@ mod tests { use datatypes::vectors::UInt64Vector; use super::*; + use crate::function::{Function, FunctionContext}; macro_rules! define_region_function_test { ($name: ident, $func: ident) => { diff --git a/src/common/function/src/table/flush_compact_table.rs b/src/common/function/src/table/flush_compact_table.rs index 52837184c2..5adfc25108 100644 --- a/src/common/function/src/table/flush_compact_table.rs +++ b/src/common/function/src/table/flush_compact_table.rs @@ -12,28 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; use std::str::FromStr; use api::v1::region::{compact_request, StrictWindow}; use common_error::ext::BoxedError; use common_macro::admin_fn; -use common_query::error::Error::ThreadJoin; use common_query::error::{ InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu, }; use common_query::prelude::{Signature, Volatility}; -use common_telemetry::{error, info}; +use common_telemetry::info; use datatypes::prelude::*; -use datatypes::vectors::VectorRef; use session::context::QueryContextRef; use session::table_name::table_name_to_full_name; -use snafu::{ensure, Location, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use table::requests::{CompactTableRequest, FlushTableRequest}; -use crate::ensure_greptime; -use crate::function::{Function, FunctionContext}; use crate::handlers::TableMutationHandlerRef; /// Compact type: strict window. @@ -209,6 +204,7 @@ mod tests { use session::context::QueryContext; use super::*; + use crate::function::{Function, FunctionContext}; macro_rules! define_table_function_test { ($name: ident, $func: ident) => { diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs index 88e9516943..4e416d0246 100644 --- a/src/common/function/src/table/migrate_region.rs +++ b/src/common/function/src/table/migrate_region.rs @@ -12,24 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::fmt::{self};
 use std::time::Duration;
 
 use common_macro::admin_fn;
 use common_meta::rpc::procedure::MigrateRegionRequest;
-use common_query::error::Error::ThreadJoin;
 use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
 use common_query::prelude::{Signature, TypeSignature, Volatility};
-use common_telemetry::error;
-use datatypes::data_type::DataType;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::{Value, ValueRef};
-use datatypes::vectors::VectorRef;
 use session::context::QueryContextRef;
-use snafu::{Location, OptionExt};
 
-use crate::ensure_greptime;
-use crate::function::{Function, FunctionContext};
 use crate::handlers::ProcedureServiceHandlerRef;
 use crate::helper::cast_u64;
 
@@ -128,9 +120,10 @@ mod tests {
     use std::sync::Arc;
 
     use common_query::prelude::TypeSignature;
-    use datatypes::vectors::{StringVector, UInt64Vector};
+    use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
 
     use super::*;
+    use crate::function::{Function, FunctionContext};
 
     #[test]
     fn test_migrate_region_misc() {
diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs
index be6b77a5eb..042343aec0 100644
--- a/src/common/macro/src/admin_fn.rs
+++ b/src/common/macro/src/admin_fn.rs
@@ -153,6 +153,7 @@ fn build_struct(
     let ret = Ident::new(&format!("{ret}_datatype"), ret.span());
     let uppcase_display_name = display_name.to_uppercase();
     // Get the handler name in function state by the argument ident
+    // TODO(discord9): consider simple dependency injection if more handlers are needed
     let (handler, snafu_type) = match handler_type.to_string().as_str() {
         "ProcedureServiceHandlerRef" => (
             Ident::new("procedure_service_handler", handler_type.span()),
@@ -163,6 +164,11 @@
             Ident::new("table_mutation_handler", handler_type.span()),
             Ident::new("MissingTableMutationHandlerSnafu", handler_type.span()),
         ),
+
+        "FlowServiceHandlerRef" => (
+            Ident::new("flow_service_handler", handler_type.span()),
+            Ident::new("MissingFlowServiceHandlerSnafu", handler_type.span()),
+        ),
         handler => ok!(error!(
             handler_type.span(),
             format!("Unknown handler type: {handler}")
@@ -174,29 +180,29 @@
         #[derive(Debug)]
         #vis struct #name;
 
-        impl fmt::Display for #name {
-            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        impl std::fmt::Display for #name {
+            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                 write!(f, #uppcase_display_name)
             }
         }
 
-        impl Function for #name {
+        impl crate::function::Function for #name {
             fn name(&self) -> &'static str {
                 #display_name
             }
 
-            fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
-                Ok(ConcreteDataType::#ret())
+            fn return_type(&self, _input_types: &[store_api::storage::ConcreteDataType]) -> common_query::error::Result<store_api::storage::ConcreteDataType> {
+                Ok(store_api::storage::ConcreteDataType::#ret())
             }
 
             fn signature(&self) -> Signature {
                 #sig_fn()
             }
 
-            fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+            fn eval(&self, func_ctx: crate::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result<datatypes::vectors::VectorRef> {
                 // Ensure under the `greptime` catalog for security
-                ensure_greptime!(func_ctx);
+                crate::ensure_greptime!(func_ctx);
 
                 let columns_num = columns.len();
                 let rows_num = if columns.is_empty() {
@@ -208,6 +214,9 @@
                 // TODO(dennis): DataFusion doesn't support async UDF currently
                 std::thread::spawn(move || {
+                    use snafu::OptionExt;
+                    use datatypes::data_type::DataType;
+
                     let query_ctx = &func_ctx.query_ctx;
 
                     let handler
= func_ctx
                        .state
                        .#handler
                        .as_ref()
                        .context(#snafu_type)?;
 
-                    let mut builder = ConcreteDataType::#ret()
+                    let mut builder = store_api::storage::ConcreteDataType::#ret()
                         .create_mutable_vector(rows_num);
 
                     if columns_num == 0 {
@@ -242,9 +251,9 @@
                 })
                 .join()
                 .map_err(|e| {
-                    error!(e; "Join thread error");
-                    ThreadJoin {
-                        location: Location::default(),
+                    common_telemetry::error!(e; "Join thread error");
+                    common_query::error::Error::ThreadJoin {
+                        location: snafu::Location::default(),
                     }
                 })?
diff --git a/src/common/macro/src/lib.rs b/src/common/macro/src/lib.rs
index 89b7599e26..8b10b83e86 100644
--- a/src/common/macro/src/lib.rs
+++ b/src/common/macro/src/lib.rs
@@ -73,7 +73,7 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
 /// Attribute macro to convert a normal function to SQL administration function. The annotated function
 /// should accept:
-/// - `&ProcedureServiceHandlerRef` or `&TableMutationHandlerRef` as the first argument,
+/// - `&ProcedureServiceHandlerRef`, `&TableMutationHandlerRef`, or `&FlowServiceHandlerRef` as the first argument,
 /// - `&QueryContextRef` as the second argument, and
 /// - `&[ValueRef<'_>]` as the third argument which is SQL function input values in each row.
 /// Return type must be `common_query::error::Result<Value>`.
@@ -85,6 +85,8 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
 /// - `ret`: The return type of the generated SQL function; it will be transformed into the `ConcreteDataType::{ret}_datatype()` result.
 /// - `display_name`: The display name of the generated SQL function.
 /// - `sig_fn`: the function that returns the `Signature` of the generated `Function`.
+///
+/// Note that this macro should only be used in the `common-function` crate for now.
 #[proc_macro_attribute]
 pub fn admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
     process_admin_fn(args, input)
diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs
index 8f80004d42..b8adeeba5c 100644
--- a/src/common/query/src/error.rs
+++ b/src/common/query/src/error.rs
@@ -239,6 +239,12 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Missing FlowServiceHandler, not expected"))]
+    MissingFlowServiceHandler {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Invalid function args: {}", err_msg))]
     InvalidFuncArgs {
         err_msg: String,
@@ -252,6 +258,12 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Can't find any alive flownode"))]
+    FlownodeNotFound {
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -269,7 +281,8 @@ impl ErrorExt for Error {
             | Error::BadAccumulatorImpl { .. }
             | Error::ToScalarValue { .. }
             | Error::GetScalarVector { .. }
-            | Error::ArrowCompute { .. } => StatusCode::EngineExecuteQuery,
+            | Error::ArrowCompute { .. }
+            | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
 
             Error::ExecuteFunction { error, .. } | Error::GeneralDataFusion { error, .. } => {
                 datafusion_status_code::<Self>(error, None)
             }
@@ -283,6 +296,7 @@
             Error::MissingTableMutationHandler { .. }
             | Error::MissingProcedureServiceHandler { .. }
+            | Error::MissingFlowServiceHandler { .. }
             | Error::ExecuteRepeatedly { .. }
             | Error::ThreadJoin { ..
} => StatusCode::Unexpected,
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index d4ba2f77cc..77c337e8fc 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -321,6 +321,7 @@ impl DatanodeBuilder {
             None,
             None,
             None,
+            None,
             false,
             self.plugins.clone(),
         );
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 6593785ac7..5ad69feb5f 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -129,6 +129,10 @@ pub struct FlowWorkerManager {
     src_send_buf_lens: RwLock>>,
     tick_manager: FlowTickManager,
     node_id: Option<u32>,
+    /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
+    ///
+    /// So that a series of events like `inserts -> flush` can be handled correctly
+    flush_lock: RwLock<()>,
 }
 
 /// Building FlownodeManager
@@ -161,6 +165,7 @@ impl FlowWorkerManager {
             src_send_buf_lens: Default::default(),
             tick_manager,
             node_id,
+            flush_lock: RwLock::new(()),
         }
     }
 
@@ -562,9 +567,9 @@ impl FlowWorkerManager {
         for worker in self.worker_handles.iter() {
             // TODO(discord9): consider how to handle error in individual worker
             if blocking {
-                worker.lock().await.run_available(now).await?;
+                worker.lock().await.run_available(now, blocking).await?;
            } else if let Ok(worker) = worker.try_lock() {
-                worker.run_available(now).await?;
+                worker.run_available(now, blocking).await?;
             } else {
                 return Ok(row_cnt);
             }
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index f780745f62..04a748de40 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -16,7 +16,9 @@
 use std::collections::HashMap;
 
-use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
+use api::v1::flow::{
+    flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
+};
 use api::v1::region::InsertRequests;
 use common_error::ext::BoxedError;
 use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
@@ -92,6 +94,34 @@ impl Flownode for FlowWorkerManager {
                     .map_err(to_meta_err)?;
                 Ok(Default::default())
             }
+            Some(flow_request::Body::Flush(FlushFlow {
+                flow_id: Some(flow_id),
+            })) => {
+                // TODO(discord9): impl individual flush
+                debug!("Starting to flush flow_id={:?}", flow_id);
+                // take the lock to make sure writes issued before the flush have reached the flow,
+                // then drop the guard immediately so later writes are not blocked
+                drop(self.flush_lock.write().await);
+                let flushed_input_rows = self
+                    .node_context
+                    .read()
+                    .await
+                    .flush_all_sender()
+                    .await
+                    .map_err(to_meta_err)?;
+                let rows_send = self.run_available(true).await.map_err(to_meta_err)?;
+                let row = self.send_writeback_requests().await.map_err(to_meta_err)?;
+
+                debug!(
+                    "Done flushing flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
+                    flow_id, flushed_input_rows, rows_send, row
+                );
+                Ok(FlowResponse {
+                    affected_flows: vec![flow_id],
+                    affected_rows: row as u64,
+                    ..Default::default()
+                })
+            }
             None => UnexpectedSnafu {
                 err_msg: "Missing request body",
             }
@@ -104,6 +134,10 @@
 }
 
     async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
+        // using try_read makes sure of two things:
+        // 1. a flush won't happen until the inserts issued before it have been applied
+        // 2.
inserts happening concurrently with a flush won't be blocked by the flush
+        let _flush_lock = self.flush_lock.try_read();
         for write_request in request.requests {
             let region_id = write_request.region_id;
             let table_id = RegionId::from(region_id).table_id();
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index 341923a7b4..72ac9df127 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -151,9 +151,23 @@ impl WorkerHandle {
     ///
     /// will set the current timestamp to `now` for all dataflows before running them
     ///
+    /// `blocking` indicates whether to wait until all dataflows have finished computing (if true)
+    /// or to just start computing and return immediately (if false)
+    ///
     /// the returned error is unrecoverable, and the worker should be shutdown/rebooted
-    pub async fn run_available(&self, now: repr::Timestamp) -> Result<(), Error> {
-        self.itc_client.call_no_resp(Request::RunAvail { now })
+    pub async fn run_available(&self, now: repr::Timestamp, blocking: bool) -> Result<(), Error> {
+        common_telemetry::debug!("Running available with blocking={}", blocking);
+        if blocking {
+            let resp = self
+                .itc_client
+                .call_with_resp(Request::RunAvail { now, blocking })
+                .await?;
+            common_telemetry::debug!("Running available with response={:?}", resp);
+            Ok(())
+        } else {
+            self.itc_client
+                .call_no_resp(Request::RunAvail { now, blocking })
+        }
     }
 
     pub async fn contains_flow(&self, flow_id: FlowId) -> Result<bool, Error> {
@@ -332,9 +346,13 @@ impl<'s> Worker<'s> {
                 let ret = self.remove_flow(flow_id);
                 Some(Response::Remove { result: ret })
             }
-            Request::RunAvail { now } => {
+            Request::RunAvail { now, blocking } => {
                 self.run_tick(now);
-                None
+                if blocking {
+                    Some(Response::RunAvail)
+                } else {
+                    None
+                }
             }
             Request::ContainTask { flow_id } => {
                 let ret = self.task_states.contains_key(&flow_id);
@@ -365,6 +383,7 @@ pub enum Request {
     /// Trigger the worker to run, useful after input buffer is full
     RunAvail {
         now: repr::Timestamp,
+        blocking: bool,
     },
     ContainTask {
         flow_id: FlowId,
@@ -384,6 +403,7 @@ enum Response {
     ContainTask {
         result: bool,
     },
+    RunAvail,
 }
 
 fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
@@ -504,7 +524,7 @@ mod test {
             Some(flow_id)
         );
         tx.send((Row::empty(), 0, 0)).unwrap();
-        handle.run_available(0).await.unwrap();
+        handle.run_available(0, true).await.unwrap();
         assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
         drop(handle);
         worker_thread_handle.join().unwrap();
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 8ee6efb1ee..c8a8a901c7 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -129,9 +129,14 @@ impl<'referred, 'df> Context<'referred, 'df> {
             collection.into_inner(),
             move |_ctx, recv| {
                 let data = recv.take_inner();
+                debug!(
+                    "render_unbounded_sink: send {} rows",
+                    data.iter().map(|i| i.len()).sum::<usize>()
+                );
                 for row in data.into_iter().flat_map(|i| i.into_iter()) {
                     // if the sender is closed, stop sending
                     if sender.is_closed() {
+                        common_telemetry::error!("UnboundedSink is closed");
                         break;
                     }
                     // TODO(discord9): handling tokio error
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 7b6fd3633d..3b8877ed86 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -150,6 +150,7 @@ pub enum Error {
     #[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
     Datafusion {
+        #[snafu(source)]
         raw: datafusion_common::DataFusionError,
         context: String,
         #[snafu(implicit)]
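The flush/insert ordering in `flownode_impl.rs` above comes down to a read-write lock handshake: each insert holds a read guard while it buffers rows, and a flush acquires and immediately drops the write guard, so the flush waits for every insert issued before it while later inserts proceed unblocked. A minimal standalone sketch of the pattern (assuming the `tokio` crate with the `sync` and `macros` features; all names here are illustrative, not part of this patch):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[tokio::main]
    async fn main() {
        let flush_lock = Arc::new(RwLock::new(()));

        // Insert path: hold a shared read guard while buffering rows. `try_read`
        // returns Err while a flush holds the write lock, in which case the
        // insert simply proceeds unguarded, so inserts are never blocked.
        let insert = {
            let lock = flush_lock.clone();
            tokio::spawn(async move {
                let _guard = lock.try_read();
                // ... buffer the incoming rows ...
            })
        };
        insert.await.unwrap();

        // Flush path: acquiring the write guard waits for all read guards taken
        // before it; dropping it immediately lets later inserts continue.
        drop(flush_lock.write().await);
        // ... run available dataflows and send writeback requests ...
    }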
diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs
index 7923f81493..5307a6aedb 100644
--- a/src/flow/src/expr/relation/func.rs
+++ b/src/flow/src/expr/relation/func.rs
@@ -21,7 +21,7 @@ use datatypes::prelude::ConcreteDataType;
 use datatypes::value::{OrderedF32, OrderedF64, Value};
 use serde::{Deserialize, Serialize};
 use smallvec::smallvec;
-use snafu::{OptionExt, ResultExt};
+use snafu::{IntoError, OptionExt, ResultExt};
 use strum::{EnumIter, IntoEnumIterator};
 
 use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
@@ -201,11 +201,10 @@ impl AggregateFunc {
                 }
                 .fail()
             } else {
-                DatafusionSnafu {
-                    raw: err,
+                Err(DatafusionSnafu {
                     context: "Error when parsing aggregate function",
                 }
-                .fail()
+                .into_error(err))
             }
         })?;
diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs
index 42ed61d4f5..8a3290a932 100644
--- a/src/flow/src/expr/scalar.rs
+++ b/src/flow/src/expr/scalar.rs
@@ -364,12 +364,10 @@ impl ScalarExpr {
             .fn_impl
             // TODO(discord9): get scheme from args instead?
             .data_type(df_scalar_fn.df_schema.as_arrow())
-            .map_err(|err| {
+            .context({
                 DatafusionSnafu {
-                    raw: err,
                     context: "Failed to get data type from datafusion scalar function",
                 }
-                .build()
             })?;
         let typ = ConcreteDataType::try_from(&arrow_typ)
             .map_err(BoxedError::new)
diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs
index 382c7a63c3..e470ad9dbd 100644
--- a/src/flow/src/repr/relation.rs
+++ b/src/flow/src/repr/relation.rs
@@ -376,12 +376,10 @@ impl RelationDesc {
             .collect();
         let arrow_schema = arrow_schema::Schema::new(fields);
 
-        DFSchema::try_from(arrow_schema.clone()).map_err(|err| {
+        DFSchema::try_from(arrow_schema.clone()).context({
             DatafusionSnafu {
-                raw: err,
                 context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
             }
-            .build()
         })
     }
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index cbe95143d6..51ebdda1d6 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -275,6 +275,7 @@ impl FlownodeBuilder {
             None,
             None,
             None,
+            None,
             false,
             Default::default(),
         );
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index ab3fdd87c0..f8075b5dc2 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;
 use bytes::buf::IntoIter;
 use common_error::ext::BoxedError;
 use common_telemetry::info;
+use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
+use datafusion::optimizer::{OptimizerContext, OptimizerRule};
 use datatypes::data_type::ConcreteDataType as CDT;
 use literal::{from_substrait_literal, from_substrait_type};
 use prost::Message;
@@ -39,8 +41,8 @@ use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
 use crate::adapter::FlownodeContext;
 use crate::error::{
-    Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu, TableNotFoundSnafu,
-    UnexpectedSnafu,
+    DatafusionSnafu, Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu,
+    TableNotFoundSnafu, UnexpectedSnafu,
 };
 use crate::expr::GlobalId;
 use crate::plan::TypedPlan;
@@ -135,6 +137,12 @@ pub async fn sql_to_flow_plan(
         .map_err(BoxedError::new)
         .context(ExternalSnafu)?;
     let LogicalPlan::DfPlan(plan) = plan;
+    let plan = SimplifyExpressions::new()
+        .rewrite(plan, &OptimizerContext::default())
+        .context(DatafusionSnafu {
+            context: "Failed to apply `SimplifyExpressions` optimization",
+        })?
+ .data; let sub_plan = DFLogicalSubstraitConvertor {} .to_sub_plan(&plan, DefaultSerializer) .map_err(BoxedError::new) @@ -292,7 +300,7 @@ mod test { }; catalog_list.register_table_sync(req_with_ts).unwrap(); - let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, false); + let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false); let engine = factory.query_engine(); engine.register_function(Arc::new(TumbleFunction {})); diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 0eb1460c49..5848dc66b6 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -96,21 +96,15 @@ pub(crate) async fn from_scalar_fn_to_df_fn_impl( ) .await ; - let expr = df_expr.map_err(|err| { + let expr = df_expr.context({ DatafusionSnafu { - raw: err, context: "Failed to convert substrait scalar function to datafusion scalar function", } - .build() })?; let phy_expr = datafusion::physical_expr::create_physical_expr(&expr, &schema, &Default::default()) - .map_err(|err| { - DatafusionSnafu { - raw: err, - context: "Failed to create physical expression from logical expression", - } - .build() + .context(DatafusionSnafu { + context: "Failed to create physical expression from logical expression", })?; Ok(phy_expr) } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 56f4ab904a..362d147d13 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -26,6 +26,7 @@ common-config.workspace = true common-datasource.workspace = true common-error.workspace = true common-frontend.workspace = true +common-function.workspace = true common-grpc.workspace = true common-macro.workspace = true common-meta.workspace = true diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 49ffd67e31..5450e55ce2 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -20,10 +20,12 @@ use common_base::Plugins; use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef}; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::ddl::ProcedureExecutorRef; +use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; use operator::delete::Deleter; +use operator::flow::FlowServiceOperator; use operator::insert::Inserter; use operator::procedure::ProcedureServiceOperator; use operator::request::Requester; @@ -153,11 +155,15 @@ impl FrontendBuilder { self.procedure_executor.clone(), )); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); + let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone()); + let query_engine = QueryEngineFactory::new_with_plugins( self.catalog_manager.clone(), Some(region_query_handler.clone()), Some(table_mutation_handler), Some(procedure_service_handler), + Some(Arc::new(flow_service)), true, plugins.clone(), ) diff --git a/src/operator/src/flow.rs b/src/operator/src/flow.rs new file mode 100644 index 0000000000..d6344e278d --- /dev/null +++ b/src/operator/src/flow.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::flow::FlowRequestHeader;
+use async_trait::async_trait;
+use common_error::ext::BoxedError;
+use common_function::handlers::FlowServiceHandler;
+use common_meta::key::flow::FlowMetadataManagerRef;
+use common_meta::node_manager::NodeManagerRef;
+use common_query::error::Result;
+use common_telemetry::tracing_context::TracingContext;
+use futures::stream::FuturesUnordered;
+use futures::{StreamExt, TryStreamExt};
+use session::context::QueryContextRef;
+use snafu::{OptionExt, ResultExt};
+
+/// The operator for flow service which implements [`FlowServiceHandler`].
+pub struct FlowServiceOperator {
+    flow_metadata_manager: FlowMetadataManagerRef,
+    node_manager: NodeManagerRef,
+}
+
+impl FlowServiceOperator {
+    pub fn new(
+        flow_metadata_manager: FlowMetadataManagerRef,
+        node_manager: NodeManagerRef,
+    ) -> Self {
+        Self {
+            flow_metadata_manager,
+            node_manager,
+        }
+    }
+}
+
+#[async_trait]
+impl FlowServiceHandler for FlowServiceOperator {
+    async fn flush(
+        &self,
+        catalog: &str,
+        flow: &str,
+        ctx: QueryContextRef,
+    ) -> Result<api::v1::flow::FlowResponse> {
+        self.flush_inner(catalog, flow, ctx).await
+    }
+}
+
+impl FlowServiceOperator {
+    /// Flush the flownodes according to the flow id.
+    async fn flush_inner(
+        &self,
+        catalog: &str,
+        flow: &str,
+        ctx: QueryContextRef,
+    ) -> Result<api::v1::flow::FlowResponse> {
+        let id = self
+            .flow_metadata_manager
+            .flow_name_manager()
+            .get(catalog, flow)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_query::error::ExecuteSnafu)?
+            .context(common_meta::error::FlowNotFoundSnafu {
+                flow_name: format!("{}.{}", catalog, flow),
+            })
+            .map_err(BoxedError::new)
+            .context(common_query::error::ExecuteSnafu)?
+            .flow_id();
+
+        let all_flownode_peers = self
+            .flow_metadata_manager
+            .flow_route_manager()
+            .routes(id)
+            .try_collect::<Vec<_>>()
+            .await
+            .map_err(BoxedError::new)
+            .context(common_query::error::ExecuteSnafu)?;
+
+        // order of flownodes doesn't matter here
+        let all_flow_nodes = FuturesUnordered::from_iter(
+            all_flownode_peers
+                .iter()
+                .map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
+        )
+        .collect::<Vec<_>>()
+        .await;
+
+        let mut final_result: Option<api::v1::flow::FlowResponse> = None;
+        for node in all_flow_nodes {
+            let res = {
+                use api::v1::flow::{flow_request, FlowRequest, FlushFlow};
+                let flush_req = FlowRequest {
+                    header: Some(FlowRequestHeader {
+                        tracing_context: TracingContext::from_current_span().to_w3c(),
+                        query_context: Some(
+                            common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
+                        ),
+                    }),
+                    body: Some(flow_request::Body::Flush(FlushFlow {
+                        flow_id: Some(api::v1::FlowId { id }),
+                    })),
+                };
+                node.handle(flush_req)
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(common_query::error::ExecuteSnafu)?
+            };
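+
+            // Merge per-node responses: affected_flows and extension entries
+            // accumulate across flownodes, while affected_rows keeps the count
+            // reported by the most recently handled node.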
+ }; + + if let Some(prev) = &mut final_result { + prev.affected_rows = res.affected_rows; + prev.affected_flows.extend(res.affected_flows); + prev.extension.extend(res.extension); + } else { + final_result = Some(res); + } + } + final_result.context(common_query::error::FlownodeNotFoundSnafu) + } +} diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index b3e7e3afb9..c3afa02e5f 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -18,6 +18,7 @@ pub mod delete; pub mod error; pub mod expr_factory; +pub mod flow; pub mod insert; pub mod metrics; pub mod procedure; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 93b9e0f920..a7dfd17d49 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -543,7 +543,7 @@ mod tests { }; catalog_manager.register_table_sync(req).unwrap(); - QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine() + QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine() } #[tokio::test] diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 1beea2a1c2..1eaff7b732 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -24,7 +24,9 @@ use catalog::CatalogManagerRef; use common_base::Plugins; use common_function::function::FunctionRef; use common_function::function_registry::FUNCTION_REGISTRY; -use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef}; +use common_function::handlers::{ + FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef, +}; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::prelude::ScalarUdf; use common_query::Output; @@ -103,6 +105,7 @@ impl QueryEngineFactory { region_query_handler: Option, table_mutation_handler: Option, procedure_service_handler: Option, + flow_service_handler: Option, with_dist_planner: bool, ) -> Self { Self::new_with_plugins( @@ -110,6 +113,7 @@ impl QueryEngineFactory { region_query_handler, table_mutation_handler, procedure_service_handler, + flow_service_handler, with_dist_planner, Default::default(), ) @@ -120,6 +124,7 @@ impl QueryEngineFactory { region_query_handler: Option, table_mutation_handler: Option, procedure_service_handler: Option, + flow_service_handler: Option, with_dist_planner: bool, plugins: Plugins, ) -> Self { @@ -128,6 +133,7 @@ impl QueryEngineFactory { region_query_handler, table_mutation_handler, procedure_service_handler, + flow_service_handler, with_dist_planner, plugins.clone(), )); @@ -161,7 +167,7 @@ mod tests { #[test] fn test_query_engine_factory() { let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); - let factory = QueryEngineFactory::new(catalog_list, None, None, None, false); + let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false); let engine = factory.query_engine(); diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index c527e9d405..433c39119e 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -82,6 +82,7 @@ impl QueryEngineContext { None, None, None, + None, false, Plugins::default(), )); diff --git a/src/query/src/query_engine/default_serializer.rs b/src/query/src/query_engine/default_serializer.rs index ff341a26ed..5c7e01db85 100644 --- a/src/query/src/query_engine/default_serializer.rs +++ b/src/query/src/query_engine/default_serializer.rs @@ -140,7 +140,7 @@ mod tests { #[tokio::test] 
async fn test_serializer_decode_plan() {
         let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
-        let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
+        let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
 
         let engine = factory.query_engine();
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index 2ba2ea85a9..f0e2ef53a5 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -20,7 +20,9 @@ use async_trait::async_trait;
 use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_function::function::FunctionRef;
-use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
+use common_function::handlers::{
+    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
+};
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::state::FunctionState;
 use common_query::prelude::ScalarUdf;
@@ -83,6 +85,7 @@ impl QueryEngineState {
         region_query_handler: Option<RegionQueryHandlerRef>,
         table_mutation_handler: Option<TableMutationHandlerRef>,
         procedure_service_handler: Option<ProcedureServiceHandlerRef>,
+        flow_service_handler: Option<FlowServiceHandlerRef>,
         with_dist_planner: bool,
         plugins: Plugins,
     ) -> Self {
@@ -138,6 +141,7 @@ impl QueryEngineState {
             function_state: Arc::new(FunctionState {
                 table_mutation_handler,
                 procedure_service_handler,
+                flow_service_handler,
             }),
             aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
             extension_rules,
diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs
index 8d6757a976..9eb16b359e 100644
--- a/src/query/src/range_select/plan_rewrite.rs
+++ b/src/query/src/range_select/plan_rewrite.rs
@@ -608,7 +608,7 @@ mod test {
                 table,
             })
             .is_ok());
-        QueryEngineFactory::new(catalog_list, None, None, None, false).query_engine()
+        QueryEngineFactory::new(catalog_list, None, None, None, None, false).query_engine()
     }
 
     async fn do_query(sql: &str) -> Result {
diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs
index e92fbba577..3ecd69a455 100644
--- a/src/query/src/tests.rs
+++ b/src/query/src/tests.rs
@@ -52,5 +52,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
 pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);
-    QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
+    QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine()
 }
diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs
index 99551dab0e..6359940d4f 100644
--- a/src/query/src/tests/query_engine_test.rs
+++ b/src/query/src/tests/query_engine_test.rs
@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
     let catalog_list = catalog::memory::new_memory_catalog_manager()
         .map_err(BoxedError::new)
         .context(QueryExecutionSnafu)?;
-    let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
+    let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
     let engine = factory.query_engine();
 
     let column_schemas = vec![ColumnSchema::new(
@@ -129,7 +129,7 @@ async fn test_query_validate() -> Result<()> {
     });
 
     let factory =
-        QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, false, plugins);
+        QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, None, false, plugins);
     let engine = factory.query_engine();
 
     let stmt =
@@ -159,7 +159,7 @@
async fn test_udf() -> Result<()> { common_telemetry::init_default_ut_logging(); let catalog_list = catalog_manager()?; - let factory = QueryEngineFactory::new(catalog_list, None, None, None, false); + let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false); let engine = factory.query_engine(); let pow = make_scalar_function(pow); diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 8c52d327c8..edb4042209 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -102,7 +102,8 @@ fn create_test_engine() -> TimeRangeTester { }; let _ = catalog_manager.register_table_sync(req).unwrap(); - let engine = QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine(); + let engine = + QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine(); TimeRangeTester { engine, filter } } diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs index 0e748bee3a..b5c3d590d4 100644 --- a/src/script/benches/py_benchmark.rs +++ b/src/script/benches/py_benchmark.rs @@ -53,7 +53,7 @@ pub(crate) fn sample_script_engine() -> PyEngine { let catalog_manager = MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID)); let query_engine = - QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine(); + QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine(); PyEngine::new(query_engine.clone()) } diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 58e54d4b8e..d7d3b1326a 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -398,7 +398,7 @@ mod tests { let catalog_manager = MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID)); let query_engine = - QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine(); + QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine(); PyEngine::new(query_engine.clone()) } diff --git a/src/script/src/test.rs b/src/script/src/test.rs index f4d3e4537d..ba05c435e3 100644 --- a/src/script/src/test.rs +++ b/src/script/src/test.rs @@ -56,7 +56,7 @@ pub async fn setup_scripts_manager( let catalog_manager = MemoryCatalogManager::new_with_table(table.clone()); - let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, None, false); + let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, None, None, false); let query_engine = factory.query_engine(); let mgr = ScriptManager::new(Arc::new(MockGrpcQueryHandler {}) as _, query_engine) .await diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 450ed86497..daf9382ec7 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -215,7 +215,7 @@ impl GrpcQueryHandler for DummyInstance { fn create_testing_instance(table: TableRef) -> DummyInstance { let catalog_manager = MemoryCatalogManager::new_with_table(table); let query_engine = - QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine(); + QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine(); DummyInstance::new(query_engine) } diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result new file mode 100644 index 0000000000..e5983ea8d4 --- /dev/null +++ 
b/tests/cases/standalone/common/flow/flow_basic.result
@@ -0,0 +1,216 @@
+CREATE TABLE numbers_input_basic (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+CREATE FLOW test_numbers_basic
+SINK TO out_num_cnt_basic
+AS
+SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+
+Affected Rows: 0
+
+-- TODO(discord9): confirm if it's necessary to flush flow here?
+-- because flush_flow result is at most 1
+select flush_flow('test_numbers_basic')<=1;
+
++----------------------------------------------------+
+| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
++----------------------------------------------------+
+| true |
++----------------------------------------------------+
+
+INSERT INTO numbers_input_basic
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+select flush_flow('test_numbers_basic')<=1;
+
++----------------------------------------------------+
+| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
++----------------------------------------------------+
+| true |
++----------------------------------------------------+
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
+
++-------+---------------------+---------------------+
+| col_0 | window_start | window_end |
++-------+---------------------+---------------------+
+| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
++-------+---------------------+---------------------+
+
+select flush_flow('test_numbers_basic')<=1;
+
++----------------------------------------------------+
+| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
++----------------------------------------------------+
+| true |
++----------------------------------------------------+
+
+INSERT INTO numbers_input_basic
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+Affected Rows: 2
+
+select flush_flow('test_numbers_basic')<=1;
+
++----------------------------------------------------+
+| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
++----------------------------------------------------+
+| true |
++----------------------------------------------------+
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
+
++-------+---------------------+---------------------+
+| col_0 | window_start | window_end |
++-------+---------------------+---------------------+
+| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
++-------+---------------------+---------------------+
+
+DROP FLOW test_numbers_basic;
+
+Affected Rows: 0
+
+DROP TABLE numbers_input_basic;
+
+Affected Rows: 0
+
+DROP TABLE out_num_cnt_basic;
+
+Affected Rows: 0
+
+-- test interpret interval
+CREATE TABLE numbers_input_basic (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+create table out_num_cnt_basic (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
+
+Affected Rows: 0
+
+CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10;
+
+Affected Rows: 0
+
+SHOW CREATE FLOW filter_numbers_basic;
+
++----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
+| Flow | Create Flow |
++----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | ++----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ + +drop flow filter_numbers_basic; + +Affected Rows: 0 + +drop table out_num_cnt_basic; + +Affected Rows: 0 + +drop table numbers_input_basic; + +Affected Rows: 0 + +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE TABLE approx_rate ( + rate FLOAT, + time_window TIMESTAMP, + update_at TIMESTAMP, + TIME INDEX(time_window) +); + +Affected Rows: 0 + +CREATE FLOW find_approx_rate +SINK TO approx_rate +AS +SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window; + +Affected Rows: 0 + +INSERT INTO bytes_log VALUES +(101, '2025-01-01 00:00:01'), +(300, '2025-01-01 00:00:29'); + +Affected Rows: 2 + +SELECT flush_flow('find_approx_rate')<=1; + ++--------------------------------------------------+ +| flush_flow(Utf8("find_approx_rate")) <= Int64(1) | ++--------------------------------------------------+ +| true | ++--------------------------------------------------+ + +SELECT rate, time_window FROM approx_rate; + ++----------+---------------------+ +| rate | time_window | ++----------+---------------------+ +| 6.633333 | 2025-01-01T00:00:00 | ++----------+---------------------+ + +INSERT INTO bytes_log VALUES +(450, '2025-01-01 00:00:32'), +(500, '2025-01-01 00:00:37'); + +Affected Rows: 2 + +SELECT flush_flow('find_approx_rate')<=1; + ++--------------------------------------------------+ +| flush_flow(Utf8("find_approx_rate")) <= Int64(1) | ++--------------------------------------------------+ +| true | ++--------------------------------------------------+ + +SELECT rate, time_window FROM approx_rate; + ++-----------+---------------------+ +| rate | time_window | ++-----------+---------------------+ +| 6.633333 | 2025-01-01T00:00:00 | +| 1.6666666 | 2025-01-01T00:00:30 | ++-----------+---------------------+ + +DROP TABLE bytes_log; + +Affected Rows: 0 + +DROP FLOW find_approx_rate; + +Affected Rows: 0 + +DROP TABLE approx_rate; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql new file mode 100644 index 0000000000..dab3d78f83 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -0,0 +1,99 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic +SINK TO out_num_cnt_basic +AS +SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1
+select flush_flow('test_numbers_basic')<=1;
+
+INSERT INTO numbers_input_basic
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+select flush_flow('test_numbers_basic')<=1;
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
+
+select flush_flow('test_numbers_basic')<=1;
+
+INSERT INTO numbers_input_basic
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+select flush_flow('test_numbers_basic')<=1;
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
+
+DROP FLOW test_numbers_basic;
+DROP TABLE numbers_input_basic;
+DROP TABLE out_num_cnt_basic;
+
+-- test interpret interval
+
+CREATE TABLE numbers_input_basic (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+create table out_num_cnt_basic (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
+
+CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10;
+
+SHOW CREATE FLOW filter_numbers_basic;
+
+drop flow filter_numbers_basic;
+
+drop table out_num_cnt_basic;
+
+drop table numbers_input_basic;
+
+CREATE TABLE bytes_log (
+    byte INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
+    TIME INDEX(ts)
+);
+
+CREATE TABLE approx_rate (
+    rate FLOAT,
+    time_window TIMESTAMP,
+    update_at TIMESTAMP,
+    TIME INDEX(time_window)
+);
+
+CREATE FLOW find_approx_rate
+SINK TO approx_rate
+AS
+SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
+
+INSERT INTO bytes_log VALUES
+(101, '2025-01-01 00:00:01'),
+(300, '2025-01-01 00:00:29');
+
+SELECT flush_flow('find_approx_rate')<=1;
+
+SELECT rate, time_window FROM approx_rate;
+
+INSERT INTO bytes_log VALUES
+(450, '2025-01-01 00:00:32'),
+(500, '2025-01-01 00:00:37');
+
+SELECT flush_flow('find_approx_rate')<=1;
+
+SELECT rate, time_window FROM approx_rate;
+
+DROP TABLE bytes_log;
+DROP FLOW find_approx_rate;
+DROP TABLE approx_rate;
\ No newline at end of file
diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.result b/tests/cases/standalone/common/flow/flow_call_df_func.result
new file mode 100644
index 0000000000..bb4357fa61
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_call_df_func.result
@@ -0,0 +1,370 @@
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt_df_func
+AS
+SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+
+Affected Rows: 0
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true |
++------------------------------------------------------+
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (-20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+-- flush flow to make sure that table is created and data is inserted
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.result b/tests/cases/standalone/common/flow/flow_call_df_func.result
new file mode 100644
index 0000000000..bb4357fa61
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_call_df_func.result
@@ -0,0 +1,370 @@
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt_df_func
+AS
+SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+
+Affected Rows: 0
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (-20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+-- flush flow to make sure that table is created and data is inserted
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
++-------+---------------------+---------------------+
+| col_0 | window_start        | window_end          |
++-------+---------------------+---------------------+
+| 42    | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
++-------+---------------------+---------------------+
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (-24,"2021-07-01 00:00:01.500");
+
+Affected Rows: 2
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
++-------+---------------------+---------------------+
+| col_0 | window_start        | window_end          |
++-------+---------------------+---------------------+
+| 42    | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 47    | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
++-------+---------------------+---------------------+
+
+DROP FLOW test_numbers_df_func;
+
+Affected Rows: 0
+
+DROP TABLE numbers_input_df_func;
+
+Affected Rows: 0
+
+DROP TABLE out_num_cnt_df_func;
+
+Affected Rows: 0
+
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+-- call `abs(sum(number))` to make sure that calling the `abs` function (implemented by DataFusion) on the `sum` function (implemented by flow) works
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt_df_func
+AS
+SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+
+Affected Rows: 0
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (-20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+-- flush flow to make sure that table is created and data is inserted
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
++-------+---------------------+---------------------+
+| col_0 | window_start        | window_end          |
++-------+---------------------+---------------------+
+| 2     | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
++-------+---------------------+---------------------+
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+ +| true | ++------------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')<=1; + ++------------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | ++------------------------------------------------------+ +| true | ++------------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + +-- test date_bin +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond); + +Affected Rows: 0 + +select flush_flow('test_numbers_df_func')<=1; + ++------------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | ++------------------------------------------------------+ +| true | ++------------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')<=1; + ++------------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | ++------------------------------------------------------+ +| true | ++------------------------------------------------------+ + +SELECT col_0, col_1 FROM out_num_cnt_df_func; + ++-------+---------------------+ +| col_0 | col_1 | ++-------+---------------------+ +| 2 | 2021-07-01T00:00:00 | ++-------+---------------------+ + +select flush_flow('test_numbers_df_func')<=1; + ++------------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | ++------------------------------------------------------+ +| true | ++------------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')<=1; + ++------------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | ++------------------------------------------------------+ +| true | ++------------------------------------------------------+ + +SELECT col_0, col_1 FROM out_num_cnt_df_func; + ++-------+---------------------+ +| col_0 | col_1 | ++-------+---------------------+ +| 2 | 2021-07-01T00:00:00 | +| 1 | 2021-07-01T00:00:01 | ++-------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + 
+Affected Rows: 0
+
+DROP TABLE out_num_cnt_df_func;
+
+Affected Rows: 0
+
+-- test date_trunc
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt
+AS
+SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
+
+Affected Rows: 0
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+SELECT col_0, col_1 FROM out_num_cnt;
+
++---------------------+-------+
+| col_0               | col_1 |
++---------------------+-------+
+| 2021-07-01T00:00:00 | 42    |
++---------------------+-------+
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+Affected Rows: 2
+
+select flush_flow('test_numbers_df_func')<=1;
+
++------------------------------------------------------+
+| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
++------------------------------------------------------+
+| true                                                 |
++------------------------------------------------------+
+
+SELECT col_0, col_1 FROM out_num_cnt;
+
++---------------------+-------+
+| col_0               | col_1 |
++---------------------+-------+
+| 2021-07-01T00:00:00 | 42    |
+| 2021-07-01T00:00:01 | 47    |
++---------------------+-------+
+
+DROP FLOW test_numbers_df_func;
+
+Affected Rows: 0
+
+DROP TABLE numbers_input_df_func;
+
+Affected Rows: 0
+
+DROP TABLE out_num_cnt;
+
+Affected Rows: 0
+
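-- The two tumble cases in the results above pin down evaluation order when
-- DataFusion scalar functions mix with flow aggregates: `sum(abs(number))`
-- applies the scalar per row before aggregation, while `abs(sum(number))`
-- applies it once to the aggregate. With the inserted rows -20 and 22, the
-- first flow yields abs(-20) + abs(22) = 42 and the second abs(-20 + 22) = 2,
-- matching the expected output. The same contrast as a plain query against
-- the input table (a sketch for illustration, not part of the patch):
SELECT
    sum(abs(number)) AS scalar_then_agg, -- abs(-20) + abs(22) = 42
    abs(sum(number)) AS agg_then_scalar  -- abs(-20 + 22) = 2
FROM numbers_input_df_func;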
diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.sql b/tests/cases/standalone/common/flow/flow_call_df_func.sql
new file mode 100644
index 0000000000..45c316ecee
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_call_df_func.sql
@@ -0,0 +1,158 @@
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt_df_func
+AS
+SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (-20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+-- flush flow to make sure that table is created and data is inserted
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (-24,"2021-07-01 00:00:01.500");
+
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
+DROP FLOW test_numbers_df_func;
+DROP TABLE numbers_input_df_func;
+DROP TABLE out_num_cnt_df_func;
+
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+-- call `abs(sum(number))` to make sure that calling the `abs` function (implemented by DataFusion) on the `sum` function (implemented by flow) works
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt_df_func
+AS
+SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (-20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+-- flush flow to make sure that table is created and data is inserted
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (-24,"2021-07-01 00:00:01.500");
+
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+
+DROP FLOW test_numbers_df_func;
+DROP TABLE numbers_input_df_func;
+DROP TABLE out_num_cnt_df_func;
+
+-- test date_bin
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt_df_func
+AS
+SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, col_1 FROM out_num_cnt_df_func;
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, col_1 FROM out_num_cnt_df_func;
+
+DROP FLOW test_numbers_df_func;
+DROP TABLE numbers_input_df_func;
+DROP TABLE out_num_cnt_df_func;
+
+
+-- test date_trunc
+CREATE TABLE numbers_input_df_func (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+CREATE FLOW test_numbers_df_func
+SINK TO out_num_cnt
+AS
+SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, col_1 FROM out_num_cnt;
+
+select flush_flow('test_numbers_df_func')<=1;
+
+INSERT INTO numbers_input_df_func
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+select flush_flow('test_numbers_df_func')<=1;
+
+SELECT col_0, col_1 FROM out_num_cnt;
+
+DROP FLOW test_numbers_df_func;
+DROP TABLE numbers_input_df_func;
+DROP TABLE out_num_cnt;
diff --git 
a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 004d614f97..a91930ee76 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -1,4 +1,4 @@ -CREATE TABLE numbers_input ( +CREATE TABLE numbers_input_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), @@ -7,75 +7,71 @@ CREATE TABLE numbers_input ( Affected Rows: 0 -create table out_num_cnt ( +create table out_num_cnt_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); Affected Rows: 0 -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; - -+-----------+---------------+-----------------+ -| flow_name | table_catalog | flow_definition | -+-----------+---------------+-----------------+ -+-----------+---------------+-----------------+ - -SHOW FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; ++ ++ -CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10; - -Affected Rows: 0 - -SHOW CREATE FLOW filter_numbers; - -+----------------+-------------------------------------------------------+ -| Flow | Create Flow | -+----------------+-------------------------------------------------------+ -| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers | -| | SINK TO out_num_cnt | -| | AS SELECT number FROM numbers_input WHERE number > 10 | -+----------------+-------------------------------------------------------+ - -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; - -+----------------+---------------+----------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+----------------+---------------+----------------------------------------------------+ -| filter_numbers | greptime | SELECT number FROM numbers_input WHERE number > 10 | -+----------------+---------------+----------------------------------------------------+ - -SHOW FLOWS; - -+----------------+ -| Flows | -+----------------+ -| filter_numbers | -+----------------+ - -drop flow filter_numbers; - -Affected Rows: 0 - -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; - -+-----------+---------------+-----------------+ -| flow_name | table_catalog | flow_definition | -+-----------+---------------+-----------------+ -+-----------+---------------+-----------------+ - -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; ++ ++ -drop table out_num_cnt; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; Affected Rows: 0 -drop table numbers_input; +SHOW CREATE FLOW filter_numbers_show; + ++---------------------+------------------------------------------------------------+ +| Flow | Create Flow | ++---------------------+------------------------------------------------------------+ +| filter_numbers_show | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show | +| | SINK TO out_num_cnt_show | +| | AS SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+------------------------------------------------------------+ + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | 
table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ + +SHOW FLOWS LIKE 'filter_numbers_show'; + ++---------------------+ +| Flows | ++---------------------+ +| filter_numbers_show | ++---------------------+ + +drop flow filter_numbers_show; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +++ +++ + +SHOW FLOWS LIKE 'filter_numbers_show'; + +++ +++ + +drop table out_num_cnt_show; + +Affected Rows: 0 + +drop table numbers_input_show; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 0f5907877f..3cf84ed0f3 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -1,32 +1,32 @@ -CREATE TABLE numbers_input ( +CREATE TABLE numbers_input_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); -create table out_num_cnt ( +create table out_num_cnt_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; -SHOW CREATE FLOW filter_numbers; +SHOW CREATE FLOW filter_numbers_show; -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -drop flow filter_numbers; +drop flow filter_numbers_show; -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -drop table out_num_cnt; +drop table out_num_cnt_show; -drop table numbers_input; +drop table numbers_input_show;
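-- The show_create_flow changes above follow the same isolation pattern as the
-- renamed tables in the other suites: give every test file uniquely suffixed
-- table and flow names, and scope introspection queries to them, so flows
-- created by concurrently running test files cannot leak into the expected
-- output. The scoped form of both queries, with a hypothetical flow name:
SELECT flow_name, table_catalog, flow_definition
FROM INFORMATION_SCHEMA.FLOWS
WHERE flow_name = 'my_flow';

SHOW FLOWS LIKE 'my_flow';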