feat(flow): flush_flow function (#4416)

* refactor: df err variant

* WIP

* chore: update proto version

* chore: revert mistaken rust-toolchain

* feat(WIP): added FlowService to QueryEngine

* refactor: move flow service to operator

* refactor: flush use flow name not id

* refactor: use full path in macro

* feat: flush flow

* feat: impl flush flow

* chore: remove unused

* chore: meaningful response

* chore: remove unused

* chore: clippy

* fix: flush_flow with proper blocking

* test: sqlness tests added back for flow

* test: better predicate for flush_flow

* refactor: rwlock

* fix: flush lock

* fix: flush lock write then drop

* test: add a new flow sqlness test

* fix: sqlness testcase

* chore: style

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
discord9
2024-07-27 07:04:13 +08:00
committed by GitHub
parent 0710e6ff36
commit 021ec7b6ac
51 changed files with 1415 additions and 161 deletions

Cargo.lock generated
View File

@@ -1949,6 +1949,7 @@ dependencies = [
"serde_json",
"session",
"snafu 0.8.3",
"sql",
"statrs",
"store-api",
"table",
@@ -3896,6 +3897,7 @@ dependencies = [
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-macro",
"common-meta",
@@ -4232,7 +4234,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5c801650435d464891114502539b701c77a1b914#5c801650435d464891114502539b701c77a1b914"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7ca323090b3ae8faf2c15036b7f41b7c5225cf5f#7ca323090b3ae8faf2c15036b7f41b7c5225cf5f"
dependencies = [
"prost 0.12.6",
"serde",

View File

@@ -119,7 +119,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5c801650435d464891114502539b701c77a1b914" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7ca323090b3ae8faf2c15036b7f41b7c5225cf5f" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -288,6 +288,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
None,
None,
None,
None,
false,
plugins.clone(),
));

View File

@@ -31,6 +31,7 @@ serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true

View File

@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
fn flush_signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
#[admin_fn(
name = FlushFlowFunction,
display_name = flush_flow,
sig_fn = flush_signature,
ret = uint64
)]
pub(crate) async fn flush_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::StringVector;
use session::context::QueryContext;
use super::*;
use crate::function::{Function, FunctionContext};
#[test]
fn test_flush_flow_metadata() {
let f = FlushFlowFunction;
assert_eq!("flush_flow", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert_eq!(
f.signature(),
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
);
}
#[test]
fn test_missing_flow_service() {
let f = FlushFlowFunction;
let args = vec!["flow_name"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing FlowServiceHandler, not expected",
result.to_string()
);
}
#[test]
fn test_parse_flow_args() {
let testcases = [
("flow_name", ("greptime", "flow_name")),
("catalog.flow_name", ("catalog", "flow_name")),
];
for (input, expected) in testcases.iter() {
let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
}
}
}

View File

@@ -65,6 +65,19 @@ pub trait ProcedureServiceHandler: Send + Sync {
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}
/// This flow service handler is only used for flushing flows for now.
#[async_trait]
pub trait FlowServiceHandler: Send + Sync {
async fn flush(
&self,
catalog: &str,
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
pub type FlowServiceHandlerRef = Arc<dyn FlowServiceHandler>;

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(try_blocks)]
mod flush_flow;
mod macros;
pub mod scalars;
mod system;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
@@ -22,6 +22,8 @@ pub struct FunctionState {
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The procedure service handler
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
// The flownode handler
pub flow_service_handler: Option<FlowServiceHandlerRef>,
}
impl FunctionState {
@@ -42,9 +44,10 @@ impl FunctionState {
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
struct MockFlowServiceHandler;
const ROWS: usize = 42;
#[async_trait]
@@ -116,9 +119,22 @@ impl FunctionState {
}
}
#[async_trait]
impl FlowServiceHandler for MockFlowServiceHandler {
async fn flush(
&self,
_catalog: &str,
_flow: &str,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}
Self {
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
}
}
}

View File

@@ -12,26 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use api::v1::meta::ProcedureStatus;
use common_macro::admin_fn;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use serde::Serialize;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use snafu::ensure;
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;
#[derive(Serialize)]
@@ -103,6 +96,7 @@ mod tests {
use datatypes::vectors::StringVector;
use super::*;
use crate::function::{Function, FunctionContext};
#[test]
fn test_procedure_state_misc() {

View File

@@ -22,6 +22,7 @@ use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
/// Table functions
@@ -35,5 +36,6 @@ impl TableFunction {
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
registry.register(Arc::new(FlushFlowFunction));
}
}

View File

@@ -12,23 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use snafu::ensure;
use store_api::storage::RegionId;
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
use crate::helper::cast_u64;
@@ -84,6 +77,7 @@ mod tests {
use datatypes::vectors::UInt64Vector;
use super::*;
use crate::function::{Function, FunctionContext};
macro_rules! define_region_function_test {
($name: ident, $func: ident) => {

View File

@@ -12,28 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::str::FromStr;
use api::v1::region::{compact_request, StrictWindow};
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::{error, info};
use common_telemetry::info;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::{ensure, Location, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use table::requests::{CompactTableRequest, FlushTableRequest};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
/// Compact type: strict window.
@@ -209,6 +204,7 @@ mod tests {
use session::context::QueryContext;
use super::*;
use crate::function::{Function, FunctionContext};
macro_rules! define_table_function_test {
($name: ident, $func: ident) => {

View File

@@ -12,24 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self};
use std::time::Duration;
use common_macro::admin_fn;
use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::error;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{Location, OptionExt};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
@@ -128,9 +120,10 @@ mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use super::*;
use crate::function::{Function, FunctionContext};
#[test]
fn test_migrate_region_misc() {

View File

@@ -153,6 +153,7 @@ fn build_struct(
let ret = Ident::new(&format!("{ret}_datatype"), ret.span());
let uppcase_display_name = display_name.to_uppercase();
// Get the handler name in function state by the argument ident
// TODO(discord9): consider simple dependency injection if more handlers are needed
let (handler, snafu_type) = match handler_type.to_string().as_str() {
"ProcedureServiceHandlerRef" => (
Ident::new("procedure_service_handler", handler_type.span()),
@@ -163,6 +164,11 @@ fn build_struct(
Ident::new("table_mutation_handler", handler_type.span()),
Ident::new("MissingTableMutationHandlerSnafu", handler_type.span()),
),
"FlowServiceHandlerRef" => (
Ident::new("flow_service_handler", handler_type.span()),
Ident::new("MissingFlowServiceHandlerSnafu", handler_type.span()),
),
handler => ok!(error!(
handler_type.span(),
format!("Unknown handler type: {handler}")
@@ -174,29 +180,29 @@ fn build_struct(
#[derive(Debug)]
#vis struct #name;
impl fmt::Display for #name {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
impl std::fmt::Display for #name {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, #uppcase_display_name)
}
}
impl Function for #name {
impl crate::function::Function for #name {
fn name(&self) -> &'static str {
#display_name
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::#ret())
fn return_type(&self, _input_types: &[store_api::storage::ConcreteDataType]) -> common_query::error::Result<store_api::storage::ConcreteDataType> {
Ok(store_api::storage::ConcreteDataType::#ret())
}
fn signature(&self) -> Signature {
#sig_fn()
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, func_ctx: crate::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result<datatypes::vectors::VectorRef> {
// Ensure under the `greptime` catalog for security
ensure_greptime!(func_ctx);
crate::ensure_greptime!(func_ctx);
let columns_num = columns.len();
let rows_num = if columns.is_empty() {
@@ -208,6 +214,9 @@ fn build_struct(
// TODO(dennis): DataFusion doesn't support async UDF currently
std::thread::spawn(move || {
use snafu::OptionExt;
use datatypes::data_type::DataType;
let query_ctx = &func_ctx.query_ctx;
let handler = func_ctx
.state
@@ -215,7 +224,7 @@ fn build_struct(
.as_ref()
.context(#snafu_type)?;
let mut builder = ConcreteDataType::#ret()
let mut builder = store_api::storage::ConcreteDataType::#ret()
.create_mutable_vector(rows_num);
if columns_num == 0 {
@@ -242,9 +251,9 @@ fn build_struct(
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
common_telemetry::error!(e; "Join thread error");
common_query::error::Error::ThreadJoin {
location: snafu::Location::default(),
}
})?

View File

@@ -73,7 +73,7 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
/// Attribute macro to convert a normal function to SQL administration function. The annotated function
/// should accept:
/// - `&ProcedureServiceHandlerRef` or `&TableMutationHandlerRef` as the first argument,
/// - `&ProcedureServiceHandlerRef`, `&TableMutationHandlerRef`, or `&FlowServiceHandlerRef` as the first argument,
/// - `&QueryContextRef` as the second argument, and
/// - `&[ValueRef<'_>]` as the third argument which is SQL function input values in each row.
/// Return type must be `common_query::error::Result<Value>`.
@@ -85,6 +85,8 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
/// - `ret`: The return type of the generated SQL function; it will be transformed into the `ConcreteDataType::{ret}_datatype()` result.
/// - `display_name`: The display name of the generated SQL function.
/// - `sig_fn`: The function that returns the `Signature` of the generated `Function`.
///
/// Note that this macro should only be used in the `common-function` crate for now.
#[proc_macro_attribute]
pub fn admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
process_admin_fn(args, input)
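
A minimal sketch of the contract documented above, using hypothetical names (`NoopAdminFunction`, `noop_admin`, `noop_signature`); the real, working example is `flush_flow` earlier in this diff, and this sketch assumes it lives inside the `common-function` crate next to the other admin functions.

use common_macro::admin_fn;
use common_query::error::{MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use store_api::storage::ConcreteDataType;

use crate::handlers::ProcedureServiceHandlerRef;

fn noop_signature() -> Signature {
    Signature::uniform(
        1,
        vec![ConcreteDataType::string_datatype()],
        Volatility::Immutable,
    )
}

// Hypothetical admin function: `admin_fn` expands it into a `NoopAdminFunction`
// struct implementing `Function`, pulls the handler out of the shared
// `FunctionState`, and builds a uint64 result column from the returned value.
#[admin_fn(
    name = NoopAdminFunction,
    display_name = noop_admin,
    sig_fn = noop_signature,
    ret = uint64
)]
pub(crate) async fn noop_admin(
    _handler: &ProcedureServiceHandlerRef,
    _query_ctx: &QueryContextRef,
    _params: &[ValueRef<'_>],
) -> Result<Value> {
    Ok(Value::from(0u64))
}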

View File

@@ -239,6 +239,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Missing FlowServiceHandler, not expected"))]
MissingFlowServiceHandler {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid function args: {}", err_msg))]
InvalidFuncArgs {
err_msg: String,
@@ -252,6 +258,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Can't found alive flownode"))]
FlownodeNotFound {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -269,7 +281,8 @@ impl ErrorExt for Error {
| Error::BadAccumulatorImpl { .. }
| Error::ToScalarValue { .. }
| Error::GetScalarVector { .. }
| Error::ArrowCompute { .. } => StatusCode::EngineExecuteQuery,
| Error::ArrowCompute { .. }
| Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
Error::ExecuteFunction { error, .. } | Error::GeneralDataFusion { error, .. } => {
datafusion_status_code::<Self>(error, None)
@@ -283,6 +296,7 @@ impl ErrorExt for Error {
Error::MissingTableMutationHandler { .. }
| Error::MissingProcedureServiceHandler { .. }
| Error::MissingFlowServiceHandler { .. }
| Error::ExecuteRepeatedly { .. }
| Error::ThreadJoin { .. } => StatusCode::Unexpected,

View File

@@ -321,6 +321,7 @@ impl DatanodeBuilder {
None,
None,
None,
None,
false,
self.plugins.clone(),
);

View File

@@ -129,6 +129,10 @@ pub struct FlowWorkerManager {
src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
tick_manager: FlowTickManager,
node_id: Option<u32>,
/// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
///
/// So that a series of events like `inserts -> flush` can be handled correctly
flush_lock: RwLock<()>,
}
/// Building FlownodeManager
@@ -161,6 +165,7 @@ impl FlowWorkerManager {
src_send_buf_lens: Default::default(),
tick_manager,
node_id,
flush_lock: RwLock::new(()),
}
}
@@ -562,9 +567,9 @@ impl FlowWorkerManager {
for worker in self.worker_handles.iter() {
// TODO(discord9): consider how to handle error in individual worker
if blocking {
worker.lock().await.run_available(now).await?;
worker.lock().await.run_available(now, blocking).await?;
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now).await?;
worker.run_available(now, blocking).await?;
} else {
return Ok(row_cnt);
}

View File

@@ -16,7 +16,9 @@
use std::collections::HashMap;
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
@@ -92,6 +94,34 @@ impl Flownode for FlowWorkerManager {
.map_err(to_meta_err)?;
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
// TODO(discord9): impl individual flush
debug!("Starting to flush flow_id={:?}", flow_id);
// lock to make sure writes before the flush are written to the flow
// and immediately drop it to prevent subsequent writes from being blocked
drop(self.flush_lock.write().await);
let flushed_input_rows = self
.node_context
.read()
.await
.flush_all_sender()
.await
.map_err(to_meta_err)?;
let rows_send = self.run_available(true).await.map_err(to_meta_err)?;
let row = self.send_writeback_requests().await.map_err(to_meta_err)?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
flow_id, flushed_input_rows, rows_send, row
);
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
None => UnexpectedSnafu {
err_msg: "Missing request body",
}
@@ -104,6 +134,10 @@ impl Flownode for FlowWorkerManager {
}
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
// using try_read makes sure two things:
// 1. a flush won't happen until the inserts issued before it have been applied
// 2. inserts happening concurrently with a flush won't be blocked by the flush
let _flush_lock = self.flush_lock.try_read();
for write_request in request.requests {
let region_id = write_request.region_id;
let table_id = RegionId::from(region_id).table_id();
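
A minimal, self-contained sketch of the lock discipline described in the comments above; this is not the actual `FlowWorkerManager` code, only an illustration of the `flush_lock` pattern with `tokio::sync::RwLock`: the flusher grabs the write lock and drops it immediately as a barrier for in-flight inserts, while inserts use `try_read` so they are never blocked by a flush.

use std::sync::Arc;
use tokio::sync::RwLock;

/// Illustrative only: a barrier-style lock mirroring `flush_lock`.
struct Manager {
    flush_lock: RwLock<()>,
}

impl Manager {
    /// Inserts take a non-blocking read guard: concurrent inserts proceed,
    /// and a flush that already holds the write lock is not waited on.
    async fn handle_insert(&self, _rows: usize) {
        let _guard = self.flush_lock.try_read();
        // ... write rows into the flow's input buffers ...
    }

    /// Flush acquires the write lock and drops it immediately: it waits for
    /// all read guards taken by earlier inserts, then releases so later
    /// inserts are not blocked while the flush computation runs.
    async fn flush(&self) {
        drop(self.flush_lock.write().await);
        // ... run the dataflow to completion and write results back ...
    }
}

#[tokio::main]
async fn main() {
    let m = Arc::new(Manager { flush_lock: RwLock::new(()) });
    m.handle_insert(2).await;
    m.flush().await;
}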

View File

@@ -151,9 +151,23 @@ impl WorkerHandle {
///
/// will set the current timestamp to `now` for all dataflows before running them
///
/// `blocking` indicates whether to wait until all dataflows have finished computing (if true) or
/// just start computing and return immediately (if false)
///
/// the returned error is unrecoverable, and the worker should be shut down/rebooted
pub async fn run_available(&self, now: repr::Timestamp) -> Result<(), Error> {
self.itc_client.call_no_resp(Request::RunAvail { now })
pub async fn run_available(&self, now: repr::Timestamp, blocking: bool) -> Result<(), Error> {
common_telemetry::debug!("Running available with blocking={}", blocking);
if blocking {
let resp = self
.itc_client
.call_with_resp(Request::RunAvail { now, blocking })
.await?;
common_telemetry::debug!("Running available with response={:?}", resp);
Ok(())
} else {
self.itc_client
.call_no_resp(Request::RunAvail { now, blocking })
}
}
pub async fn contains_flow(&self, flow_id: FlowId) -> Result<bool, Error> {
@@ -332,9 +346,13 @@ impl<'s> Worker<'s> {
let ret = self.remove_flow(flow_id);
Some(Response::Remove { result: ret })
}
Request::RunAvail { now } => {
Request::RunAvail { now, blocking } => {
self.run_tick(now);
None
if blocking {
Some(Response::RunAvail)
} else {
None
}
}
Request::ContainTask { flow_id } => {
let ret = self.task_states.contains_key(&flow_id);
@@ -365,6 +383,7 @@ pub enum Request {
/// Trigger the worker to run, useful after input buffer is full
RunAvail {
now: repr::Timestamp,
blocking: bool,
},
ContainTask {
flow_id: FlowId,
@@ -384,6 +403,7 @@ enum Response {
ContainTask {
result: bool,
},
RunAvail,
}
fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
@@ -504,7 +524,7 @@ mod test {
Some(flow_id)
);
tx.send((Row::empty(), 0, 0)).unwrap();
handle.run_available(0).await.unwrap();
handle.run_available(0, true).await.unwrap();
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
drop(handle);
worker_thread_handle.join().unwrap();
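
A rough illustration of the `blocking` flag added above, not the real inter-thread-call client: a blocking call sends the request together with a oneshot channel and awaits the worker's ack, while a non-blocking call is fire-and-forget. All names here are hypothetical.

use tokio::sync::{mpsc, oneshot};

enum Request {
    RunAvail { now: i64, resp: Option<oneshot::Sender<()>> },
}

// Caller side: mirrors `run_available(now, blocking)`.
async fn run_available(tx: &mpsc::UnboundedSender<Request>, now: i64, blocking: bool) {
    if blocking {
        // Wait until the worker acknowledges that the tick has been processed.
        let (ack_tx, ack_rx) = oneshot::channel();
        tx.send(Request::RunAvail { now, resp: Some(ack_tx) }).unwrap();
        let _ = ack_rx.await;
    } else {
        // Fire-and-forget: start computing and return immediately.
        tx.send(Request::RunAvail { now, resp: None }).unwrap();
    }
}

// Worker side: processes the tick, then acks only when asked to.
async fn worker_loop(mut rx: mpsc::UnboundedReceiver<Request>) {
    while let Some(Request::RunAvail { now, resp }) = rx.recv().await {
        let _ = now; // run_tick(now) would go here
        if let Some(ack) = resp {
            let _ = ack.send(());
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let worker = tokio::spawn(worker_loop(rx));
    run_available(&tx, 0, true).await;
    drop(tx);
    worker.await.unwrap();
}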

View File

@@ -129,9 +129,14 @@ impl<'referred, 'df> Context<'referred, 'df> {
collection.into_inner(),
move |_ctx, recv| {
let data = recv.take_inner();
debug!(
"render_unbounded_sink: send {} rows",
data.iter().map(|i| i.len()).sum::<usize>()
);
for row in data.into_iter().flat_map(|i| i.into_iter()) {
// if the sender is closed, stop sending
if sender.is_closed() {
common_telemetry::error!("UnboundedSink is closed");
break;
}
// TODO(discord9): handling tokio error

View File

@@ -150,6 +150,7 @@ pub enum Error {
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
Datafusion {
#[snafu(source)]
raw: datafusion_common::DataFusionError,
context: String,
#[snafu(implicit)]

View File

@@ -21,7 +21,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::value::{OrderedF32, OrderedF64, Value};
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{OptionExt, ResultExt};
use snafu::{IntoError, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
@@ -201,11 +201,10 @@ impl AggregateFunc {
}
.fail()
} else {
DatafusionSnafu {
raw: err,
Err(DatafusionSnafu {
context: "Error when parsing aggregate function",
}
.fail()
.into_error(err))
}
})?;

View File

@@ -364,12 +364,10 @@ impl ScalarExpr {
.fn_impl
// TODO(discord9): get scheme from args instead?
.data_type(df_scalar_fn.df_schema.as_arrow())
.map_err(|err| {
.context({
DatafusionSnafu {
raw: err,
context: "Failed to get data type from datafusion scalar function",
}
.build()
})?;
let typ = ConcreteDataType::try_from(&arrow_typ)
.map_err(BoxedError::new)

View File

@@ -376,12 +376,10 @@ impl RelationDesc {
.collect();
let arrow_schema = arrow_schema::Schema::new(fields);
DFSchema::try_from(arrow_schema.clone()).map_err(|err| {
DFSchema::try_from(arrow_schema.clone()).context({
DatafusionSnafu {
raw: err,
context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
}
.build()
})
}

View File

@@ -275,6 +275,7 @@ impl FlownodeBuilder {
None,
None,
None,
None,
false,
Default::default(),
);

View File

@@ -19,6 +19,8 @@ use std::sync::Arc;
use bytes::buf::IntoIter;
use common_error::ext::BoxedError;
use common_telemetry::info;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::{OptimizerContext, OptimizerRule};
use datatypes::data_type::ConcreteDataType as CDT;
use literal::{from_substrait_literal, from_substrait_type};
use prost::Message;
@@ -39,8 +41,8 @@ use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
use crate::adapter::FlownodeContext;
use crate::error::{
Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu, TableNotFoundSnafu,
UnexpectedSnafu,
DatafusionSnafu, Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu,
TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
@@ -135,6 +137,12 @@ pub async fn sql_to_flow_plan(
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
let plan = SimplifyExpressions::new()
.rewrite(plan, &OptimizerContext::default())
.context(DatafusionSnafu {
context: "Fail to apply `SimplifyExpressions` optimization",
})?
.data;
let sub_plan = DFLogicalSubstraitConvertor {}
.to_sub_plan(&plan, DefaultSerializer)
.map_err(BoxedError::new)
@@ -292,7 +300,7 @@ mod test {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, false);
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();
engine.register_function(Arc::new(TumbleFunction {}));

View File

@@ -96,21 +96,15 @@ pub(crate) async fn from_scalar_fn_to_df_fn_impl(
)
.await
;
let expr = df_expr.map_err(|err| {
let expr = df_expr.context({
DatafusionSnafu {
raw: err,
context: "Failed to convert substrait scalar function to datafusion scalar function",
}
.build()
})?;
let phy_expr =
datafusion::physical_expr::create_physical_expr(&expr, &schema, &Default::default())
.map_err(|err| {
DatafusionSnafu {
raw: err,
context: "Failed to create physical expression from logical expression",
}
.build()
.context(DatafusionSnafu {
context: "Failed to create physical expression from logical expression",
})?;
Ok(phy_expr)
}

View File

@@ -26,6 +26,7 @@ common-config.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true

View File

@@ -20,10 +20,12 @@ use common_base::Plugins;
use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use operator::delete::Deleter;
use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
@@ -153,11 +155,15 @@ impl FrontendBuilder {
self.procedure_executor.clone(),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
let query_engine = QueryEngineFactory::new_with_plugins(
self.catalog_manager.clone(),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
Some(procedure_service_handler),
Some(Arc::new(flow_service)),
true,
plugins.clone(),
)

src/operator/src/flow.rs Normal file
View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::FlowRequestHeader;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::FlowServiceHandler;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
/// The operator for flow service which implements [`FlowServiceHandler`].
pub struct FlowServiceOperator {
flow_metadata_manager: FlowMetadataManagerRef,
node_manager: NodeManagerRef,
}
impl FlowServiceOperator {
pub fn new(
flow_metadata_manager: FlowMetadataManagerRef,
node_manager: NodeManagerRef,
) -> Self {
Self {
flow_metadata_manager,
node_manager,
}
}
}
#[async_trait]
impl FlowServiceHandler for FlowServiceOperator {
async fn flush(
&self,
catalog: &str,
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
self.flush_inner(catalog, flow, ctx).await
}
}
impl FlowServiceOperator {
/// Flush the flow on all flownodes that host it, located by the flow id.
async fn flush_inner(
&self,
catalog: &str,
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
let id = self
.flow_metadata_manager
.flow_name_manager()
.get(catalog, flow)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.context(common_meta::error::FlowNotFoundSnafu {
flow_name: format!("{}.{}", catalog, flow),
})
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.flow_id();
let all_flownode_peers = self
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;
// order of flownodes doesn't matter here
let all_flow_nodes = FuturesUnordered::from_iter(
all_flownode_peers
.iter()
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
)
.collect::<Vec<_>>()
.await;
let mut final_result: Option<api::v1::flow::FlowResponse> = None;
for node in all_flow_nodes {
let res = {
use api::v1::flow::{flow_request, FlowRequest, FlushFlow};
let flush_req = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
),
}),
body: Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(api::v1::FlowId { id }),
})),
};
node.handle(flush_req)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
};
if let Some(prev) = &mut final_result {
prev.affected_rows = res.affected_rows;
prev.affected_flows.extend(res.affected_flows);
prev.extension.extend(res.extension);
} else {
final_result = Some(res);
}
}
final_result.context(common_query::error::FlownodeNotFoundSnafu)
}
}

View File

@@ -18,6 +18,7 @@
pub mod delete;
pub mod error;
pub mod expr_factory;
pub mod flow;
pub mod insert;
pub mod metrics;
pub mod procedure;

View File

@@ -543,7 +543,7 @@ mod tests {
};
catalog_manager.register_table_sync(req).unwrap();
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine()
}
#[tokio::test]

View File

@@ -24,7 +24,9 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
@@ -103,6 +105,7 @@ impl QueryEngineFactory {
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
with_dist_planner: bool,
) -> Self {
Self::new_with_plugins(
@@ -110,6 +113,7 @@ impl QueryEngineFactory {
region_query_handler,
table_mutation_handler,
procedure_service_handler,
flow_service_handler,
with_dist_planner,
Default::default(),
)
@@ -120,6 +124,7 @@ impl QueryEngineFactory {
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
) -> Self {
@@ -128,6 +133,7 @@ impl QueryEngineFactory {
region_query_handler,
table_mutation_handler,
procedure_service_handler,
flow_service_handler,
with_dist_planner,
plugins.clone(),
));
@@ -161,7 +167,7 @@ mod tests {
#[test]
fn test_query_engine_factory() {
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();

View File

@@ -82,6 +82,7 @@ impl QueryEngineContext {
None,
None,
None,
None,
false,
Plugins::default(),
));

View File

@@ -140,7 +140,7 @@ mod tests {
#[tokio::test]
async fn test_serializer_decode_plan() {
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();

View File

@@ -20,7 +20,9 @@ use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::state::FunctionState;
use common_query::prelude::ScalarUdf;
@@ -83,6 +85,7 @@ impl QueryEngineState {
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
) -> Self {
@@ -138,6 +141,7 @@ impl QueryEngineState {
function_state: Arc::new(FunctionState {
table_mutation_handler,
procedure_service_handler,
flow_service_handler,
}),
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
extension_rules,

View File

@@ -608,7 +608,7 @@ mod test {
table,
})
.is_ok());
QueryEngineFactory::new(catalog_list, None, None, None, false).query_engine()
QueryEngineFactory::new(catalog_list, None, None, None, None, false).query_engine()
}
async fn do_query(sql: &str) -> Result<crate::plan::LogicalPlan> {

View File

@@ -52,5 +52,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine()
}

View File

@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
let catalog_list = catalog::memory::new_memory_catalog_manager()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();
let column_schemas = vec![ColumnSchema::new(
@@ -129,7 +129,7 @@ async fn test_query_validate() -> Result<()> {
});
let factory =
QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, false, plugins);
QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, None, false, plugins);
let engine = factory.query_engine();
let stmt =
@@ -159,7 +159,7 @@ async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog_manager()?;
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();
let pow = make_scalar_function(pow);

View File

@@ -102,7 +102,8 @@ fn create_test_engine() -> TimeRangeTester {
};
let _ = catalog_manager.register_table_sync(req).unwrap();
let engine = QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
let engine =
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();
TimeRangeTester { engine, filter }
}

View File

@@ -53,7 +53,7 @@ pub(crate) fn sample_script_engine() -> PyEngine {
let catalog_manager =
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();
PyEngine::new(query_engine.clone())
}

View File

@@ -398,7 +398,7 @@ mod tests {
let catalog_manager =
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();
PyEngine::new(query_engine.clone())
}

View File

@@ -56,7 +56,7 @@ pub async fn setup_scripts_manager(
let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, None, false);
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, None, None, false);
let query_engine = factory.query_engine();
let mgr = ScriptManager::new(Arc::new(MockGrpcQueryHandler {}) as _, query_engine)
.await

View File

@@ -215,7 +215,7 @@ impl GrpcQueryHandler for DummyInstance {
fn create_testing_instance(table: TableRef) -> DummyInstance {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();
DummyInstance::new(query_engine)
}

View File

@@ -0,0 +1,216 @@
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic
SINK TO out_num_cnt_basic
AS
SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
Affected Rows: 0
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
select flush_flow('test_numbers_basic')<=1;
+----------------------------------------------------+
| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
+----------------------------------------------------+
| true |
+----------------------------------------------------+
INSERT INTO numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
select flush_flow('test_numbers_basic')<=1;
+----------------------------------------------------+
| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
+----------------------------------------------------+
| true |
+----------------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+
select flush_flow('test_numbers_basic')<=1;
+----------------------------------------------------+
| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
+----------------------------------------------------+
| true |
+----------------------------------------------------+
INSERT INTO numbers_input_basic
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
select flush_flow('test_numbers_basic')<=1;
+----------------------------------------------------+
| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
+----------------------------------------------------+
| true |
+----------------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0
-- test interpreting interval
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
create table out_num_cnt_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10;
Affected Rows: 0
SHOW CREATE FLOW filter_numbers_basic;
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
drop flow filter_numbers_basic;
Affected Rows: 0
drop table out_num_cnt_basic;
Affected Rows: 0
drop table numbers_input_basic;
Affected Rows: 0
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE approx_rate (
rate FLOAT,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate
SINK TO approx_rate
AS
SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
Affected Rows: 0
INSERT INTO bytes_log VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');
Affected Rows: 2
SELECT flush_flow('find_approx_rate')<=1;
+--------------------------------------------------+
| flush_flow(Utf8("find_approx_rate")) <= Int64(1) |
+--------------------------------------------------+
| true |
+--------------------------------------------------+
SELECT rate, time_window FROM approx_rate;
+----------+---------------------+
| rate | time_window |
+----------+---------------------+
| 6.633333 | 2025-01-01T00:00:00 |
+----------+---------------------+
INSERT INTO bytes_log VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');
Affected Rows: 2
SELECT flush_flow('find_approx_rate')<=1;
+--------------------------------------------------+
| flush_flow(Utf8("find_approx_rate")) <= Int64(1) |
+--------------------------------------------------+
| true |
+--------------------------------------------------+
SELECT rate, time_window FROM approx_rate;
+-----------+---------------------+
| rate | time_window |
+-----------+---------------------+
| 6.633333 | 2025-01-01T00:00:00 |
| 1.6666666 | 2025-01-01T00:00:30 |
+-----------+---------------------+
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0

View File

@@ -0,0 +1,99 @@
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers_basic
SINK TO out_num_cnt_basic
AS
SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
select flush_flow('test_numbers_basic')<=1;
INSERT INTO numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
select flush_flow('test_numbers_basic')<=1;
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
select flush_flow('test_numbers_basic')<=1;
INSERT INTO numbers_input_basic
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
select flush_flow('test_numbers_basic')<=1;
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;
-- test interpreting interval
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10;
SHOW CREATE FLOW filter_numbers_basic;
drop flow filter_numbers_basic;
drop table out_num_cnt_basic;
drop table numbers_input_basic;
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
TIME INDEX(ts)
);
CREATE TABLE approx_rate (
rate FLOAT,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate
SINK TO approx_rate
AS
SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
INSERT INTO bytes_log VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');
SELECT flush_flow('find_approx_rate')<=1;
SELECT rate, time_window FROM approx_rate;
INSERT INTO bytes_log VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');
SELECT flush_flow('find_approx_rate')<=1;
SELECT rate, time_window FROM approx_rate;
DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;

View File

@@ -0,0 +1,370 @@
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
Affected Rows: 0
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- flush flow to make sure that table is created and data is inserted
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
Affected Rows: 2
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+
DROP FLOW test_numbers_df_func;
Affected Rows: 0
DROP TABLE numbers_input_df_func;
Affected Rows: 0
DROP TABLE out_num_cnt_df_func;
Affected Rows: 0
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
-- call `abs(sum(number))` to make sure that calling the `abs` function (impl by DataFusion) on the `sum` function (impl by flow) works
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
Affected Rows: 0
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- flush flow to make sure that table is created and data is inserted
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
Affected Rows: 2
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+
DROP FLOW test_numbers_df_func;
Affected Rows: 0
DROP TABLE numbers_input_df_func;
Affected Rows: 0
DROP TABLE out_num_cnt_df_func;
Affected Rows: 0
-- test date_bin
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
Affected Rows: 0
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt_df_func;
+-------+---------------------+
| col_0 | col_1 |
+-------+---------------------+
| 2 | 2021-07-01T00:00:00 |
+-------+---------------------+
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt_df_func;
+-------+---------------------+
| col_0 | col_1 |
+-------+---------------------+
| 2 | 2021-07-01T00:00:00 |
| 1 | 2021-07-01T00:00:01 |
+-------+---------------------+
DROP FLOW test_numbers_df_func;
Affected Rows: 0
DROP TABLE numbers_input_df_func;
Affected Rows: 0
DROP TABLE out_num_cnt_df_func;
Affected Rows: 0
-- test date_trunc
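-- date_trunc('second', ts) truncates each timestamp to whole seconds, so rows falling within the
-- same second are summed together by the flow.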
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
Affected Rows: 0
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
+---------------------+-------+
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
select flush_flow('test_numbers_df_func')<=1;
+------------------------------------------------------+
| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
+------------------------------------------------------+
| true |
+------------------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
| 2021-07-01T00:00:01 | 47 |
+---------------------+-------+
DROP FLOW test_numbers_df_func;
Affected Rows: 0
DROP TABLE numbers_input_df_func;
Affected Rows: 0
DROP TABLE out_num_cnt;
Affected Rows: 0

View File

@@ -0,0 +1,158 @@
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
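-- tumble(ts, '1 second', '2021-07-01 00:00:00') buckets rows into non-overlapping 1-second
-- windows; the sink table receives the aggregate together with window_start/window_end columns.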
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- flush the flow to make sure that the sink table is created and the data is inserted
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
-- call `abs(sum(number))` to make sure that calling the `abs` function (implemented by DataFusion) on the `sum` function (implemented by flow) works
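-- the flow computes sum(number) per window first, then the DataFusion `abs` is applied to the
-- aggregated value, e.g. abs(23 + (-24)) = 1 for the second window below.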
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- flush the flow to make sure that the sink table is created and the data is inserted
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;
-- test date_bin
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, col_1 FROM out_num_cnt_df_func;
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, col_1 FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;
-- test date_trunc
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, col_1 FROM out_num_cnt;
select flush_flow('test_numbers_df_func')<=1;
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
select flush_flow('test_numbers_df_func')<=1;
SELECT col_0, col_1 FROM out_num_cnt;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt;

View File

@@ -1,4 +1,4 @@
CREATE TABLE numbers_input (
CREATE TABLE numbers_input_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
@@ -7,75 +7,71 @@ CREATE TABLE numbers_input (
Affected Rows: 0
create table out_num_cnt (
create table out_num_cnt_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+-----------+---------------+-----------------+
| flow_name | table_catalog | flow_definition |
+-----------+---------------+-----------------+
+-----------+---------------+-----------------+
SHOW FLOWS;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
++
++
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
Affected Rows: 0
SHOW CREATE FLOW filter_numbers;
+----------------+-------------------------------------------------------+
| Flow | Create Flow |
+----------------+-------------------------------------------------------+
| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers |
| | SINK TO out_num_cnt |
| | AS SELECT number FROM numbers_input WHERE number > 10 |
+----------------+-------------------------------------------------------+
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+----------------+---------------+----------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+----------------+---------------+----------------------------------------------------+
| filter_numbers | greptime | SELECT number FROM numbers_input WHERE number > 10 |
+----------------+---------------+----------------------------------------------------+
SHOW FLOWS;
+----------------+
| Flows |
+----------------+
| filter_numbers |
+----------------+
drop flow filter_numbers;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+-----------+---------------+-----------------+
| flow_name | table_catalog | flow_definition |
+-----------+---------------+-----------------+
+-----------+---------------+-----------------+
SHOW FLOWS;
SHOW FLOWS LIKE 'filter_numbers_show';
++
++
drop table out_num_cnt;
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10;
Affected Rows: 0
drop table numbers_input;
SHOW CREATE FLOW filter_numbers_show;
+---------------------+------------------------------------------------------------+
| Flow | Create Flow |
+---------------------+------------------------------------------------------------+
| filter_numbers_show | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show |
| | SINK TO out_num_cnt_show |
| | AS SELECT number FROM numbers_input_show WHERE number > 10 |
+---------------------+------------------------------------------------------------+
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+---------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+---------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 |
+---------------------+---------------+---------------------------------------------------------+
SHOW FLOWS LIKE 'filter_numbers_show';
+---------------------+
| Flows |
+---------------------+
| filter_numbers_show |
+---------------------+
drop flow filter_numbers_show;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
++
++
SHOW FLOWS LIKE 'filter_numbers_show';
++
++
drop table out_num_cnt_show;
Affected Rows: 0
drop table numbers_input_show;
Affected Rows: 0

View File

@@ -1,32 +1,32 @@
CREATE TABLE numbers_input (
CREATE TABLE numbers_input_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt (
create table out_num_cnt_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
SHOW FLOWS;
SHOW FLOWS LIKE 'filter_numbers_show';
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10;
SHOW CREATE FLOW filter_numbers;
SHOW CREATE FLOW filter_numbers_show;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
SHOW FLOWS;
SHOW FLOWS LIKE 'filter_numbers_show';
drop flow filter_numbers;
drop flow filter_numbers_show;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
SHOW FLOWS;
SHOW FLOWS LIKE 'filter_numbers_show';
drop table out_num_cnt;
drop table out_num_cnt_show;
drop table numbers_input;
drop table numbers_input_show;