Compare commits

...

20 Commits

Author SHA1 Message Date
discord9
ed676d97c7 refactor: rename FlowWorkerManager to FlowStreamingEngine 2025-04-23 11:36:30 +08:00
discord9
14b2badded chore: better error variant 2025-04-22 19:52:19 +08:00
discord9
3626a50395 chore: use better error variant 2025-04-22 17:30:24 +08:00
discord9
0d0dad4ba2 chore: update docs 2025-04-22 17:09:42 +08:00
discord9
ae00e28b2a refactor: per review partially 2025-04-22 17:09:42 +08:00
discord9
92d2fafb33 chore: per review rename args 2025-04-22 17:09:42 +08:00
discord9
30b3600597 chore: per review 2025-04-22 17:09:42 +08:00
discord9
87f1a8c622 refactor: per review 2025-04-22 17:09:42 +08:00
discord9
8e815fc385 chore: add comments per review 2025-04-22 17:09:42 +08:00
discord9
ca46bd04ee chore: better logging 2025-04-22 17:09:42 +08:00
discord9
d32ade7399 fix: query without time window also clean dirty time window 2025-04-22 17:09:42 +08:00
discord9
b4aa0c8b8b refactor: per review 2025-04-22 17:09:42 +08:00
discord9
e647559d27 refactor: AddAutoColumnRewriter check for Projection 2025-04-22 17:09:42 +08:00
discord9
d2c4767d41 docs: explain nodeid use in check task 2025-04-22 17:09:42 +08:00
discord9
82cee11eea test: add align time window test 2025-04-22 17:09:42 +08:00
discord9
6d0470c3fb feat: flush_flow flush all ranges now 2025-04-22 17:09:42 +08:00
discord9
47a267e29c fix: add locks for create/drop flow&docs: update docs 2025-04-22 17:09:42 +08:00
discord9
fa13d06fc6 chore: update proto to main branch 2025-04-22 17:09:42 +08:00
discord9
26d9517c3e chore: update proto 2025-04-22 17:09:42 +08:00
discord9
a7da9af5de feat: use flow batching engine
broken: try using logical plan

fix: use dummy catalog for logical plan

fix: insert plan exec&sqlness grpc addr

feat: use frontend instance in flownode in standalone

feat: flow type in metasrv&fix: flush flow out of sync& column name alias

tests: sqlness update

tests: sqlness flow rebuild udpate

chore: per review

refactor: keep chnl mgr

refactor: use catalog mgr for get table

tests: use valid sql

fix: add more check

refactor: put flow type determine to frontend
2025-04-22 17:09:42 +08:00
54 changed files with 2091 additions and 504 deletions

3
Cargo.lock generated
View File

@@ -4505,6 +4505,7 @@ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"auth", "auth",
"bytes",
"cache", "cache",
"catalog", "catalog",
"client", "client",
@@ -4943,7 +4944,7 @@ dependencies = [
[[package]] [[package]]
name = "greptime-proto" name = "greptime-proto"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6d9cffd43c4e6358805a798f17e03e232994b82#b6d9cffd43c4e6358805a798f17e03e232994b82" source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e82b0158cd38d4021edb4e4c0ae77f999051e62f#e82b0158cd38d4021edb4e4c0ae77f999051e62f"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"serde", "serde",

View File

@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7" fst = "0.4.7"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" } greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
hex = "0.4" hex = "0.4"
http = "1" http = "1"
humantime = "2.1" humantime = "2.1"

View File

@@ -514,6 +514,7 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
Some(Query::Sql(_)) => "query.sql", Some(Query::Sql(_)) => "query.sql",
Some(Query::LogicalPlan(_)) => "query.logical_plan", Some(Query::LogicalPlan(_)) => "query.logical_plan",
Some(Query::PromRangeQuery(_)) => "query.prom_range", Some(Query::PromRangeQuery(_)) => "query.prom_range",
Some(Query::InsertIntoPlan(_)) => "query.insert_into_plan",
None => "query.empty", None => "query.empty",
} }
} }

View File

@@ -27,7 +27,7 @@ use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType; use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter; use table::table::adapter::DfTableProviderAdapter;
mod dummy_catalog; pub mod dummy_catalog;
use dummy_catalog::DummyCatalogList; use dummy_catalog::DummyCatalogList;
use table::TableRef; use table::TableRef;

View File

@@ -345,7 +345,7 @@ impl StartCommand {
let client = Arc::new(NodeClients::new(channel_config)); let client = Arc::new(NodeClients::new(channel_config));
let invoker = FrontendInvoker::build_from( let invoker = FrontendInvoker::build_from(
flownode.flow_worker_manager().clone(), flownode.flow_engine().streaming_engine(),
catalog_manager.clone(), catalog_manager.clone(),
cached_meta_backend.clone(), cached_meta_backend.clone(),
layered_cache_registry.clone(), layered_cache_registry.clone(),
@@ -355,7 +355,9 @@ impl StartCommand {
.await .await
.context(StartFlownodeSnafu)?; .context(StartFlownodeSnafu)?;
flownode flownode
.flow_worker_manager() .flow_engine()
.streaming_engine()
// TODO(discord9): refactor and avoid circular reference
.set_frontend_invoker(invoker) .set_frontend_invoker(invoker)
.await; .await;

View File

@@ -56,8 +56,8 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer; use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig; use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{ use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FlowConfig, FlowStreamingEngine, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
FrontendClient, FrontendInvoker, FrontendClient, FrontendInvoker, GrpcQueryHandlerWithBoxedError,
}; };
use frontend::frontend::{Frontend, FrontendOptions}; use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder; use frontend::instance::builder::FrontendBuilder;
@@ -524,17 +524,17 @@ impl StartCommand {
..Default::default() ..Default::default()
}; };
// TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without // for standalone not use grpc, but get a handler to frontend grpc client without
// actually make a connection // actually make a connection
let fe_server_addr = fe_opts.grpc.bind_addr.clone(); let (frontend_client, frontend_instance_handler) =
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr); FrontendClient::from_empty_grpc_handler();
let flow_builder = FlownodeBuilder::new( let flow_builder = FlownodeBuilder::new(
flownode_options, flownode_options,
plugins.clone(), plugins.clone(),
table_metadata_manager.clone(), table_metadata_manager.clone(),
catalog_manager.clone(), catalog_manager.clone(),
flow_metadata_manager.clone(), flow_metadata_manager.clone(),
Arc::new(frontend_client), Arc::new(frontend_client.clone()),
); );
let flownode = flow_builder let flownode = flow_builder
.build() .build()
@@ -544,15 +544,15 @@ impl StartCommand {
// set the ref to query for the local flow state // set the ref to query for the local flow state
{ {
let flow_worker_manager = flownode.flow_worker_manager(); let flow_worker_manager = flownode.flow_engine().streaming_engine();
information_extension information_extension
.set_flow_worker_manager(flow_worker_manager.clone()) .set_flow_worker_manager(flow_worker_manager)
.await; .await;
} }
let node_manager = Arc::new(StandaloneDatanodeManager { let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(), region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(), flow_server: flownode.flow_engine(),
}); });
let table_id_sequence = Arc::new( let table_id_sequence = Arc::new(
@@ -606,7 +606,16 @@ impl StartCommand {
.context(error::StartFrontendSnafu)?; .context(error::StartFrontendSnafu)?;
let fe_instance = Arc::new(fe_instance); let fe_instance = Arc::new(fe_instance);
let flow_worker_manager = flownode.flow_worker_manager(); // set the frontend client for flownode
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
// set the frontend invoker for flownode
let flow_worker_manager = flownode.flow_engine().streaming_engine();
// flow server need to be able to use frontend to write insert requests back // flow server need to be able to use frontend to write insert requests back
let invoker = FrontendInvoker::build_from( let invoker = FrontendInvoker::build_from(
flow_worker_manager.clone(), flow_worker_manager.clone(),
@@ -694,7 +703,7 @@ pub struct StandaloneInformationExtension {
region_server: RegionServer, region_server: RegionServer,
procedure_manager: ProcedureManagerRef, procedure_manager: ProcedureManagerRef,
start_time_ms: u64, start_time_ms: u64,
flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>, flow_worker_manager: RwLock<Option<Arc<FlowStreamingEngine>>>,
} }
impl StandaloneInformationExtension { impl StandaloneInformationExtension {
@@ -708,7 +717,7 @@ impl StandaloneInformationExtension {
} }
/// Set the flow worker manager for the standalone instance. /// Set the flow worker manager for the standalone instance.
pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) { pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowStreamingEngine>) {
let mut guard = self.flow_worker_manager.write().await; let mut guard = self.flow_worker_manager.write().await;
*guard = Some(flow_worker_manager); *guard = Some(flow_worker_manager);
} }

View File

@@ -38,7 +38,7 @@ use table::metadata::TableId;
use crate::cache_invalidator::Context; use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext; use crate::ddl::DdlContext;
use crate::error::{self, Result}; use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow}; use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue; use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue; use crate::key::flow::flow_route::FlowRouteValue;
@@ -171,7 +171,7 @@ impl CreateFlowProcedure {
} }
self.data.state = CreateFlowState::CreateFlows; self.data.state = CreateFlowState::CreateFlows;
// determine flow type // determine flow type
self.data.flow_type = Some(determine_flow_type(&self.data.task)); self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
Ok(Status::executing(true)) Ok(Status::executing(true))
} }
@@ -196,8 +196,8 @@ impl CreateFlowProcedure {
}); });
} }
info!( info!(
"Creating flow({:?}) on flownodes with peers={:?}", "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers self.data.flow_id, self.data.flow_type, self.data.peers
); );
join_all(create_flow) join_all(create_flow)
.await .await
@@ -306,8 +306,20 @@ impl Procedure for CreateFlowProcedure {
} }
} }
pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType { pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
FlowType::Batching let flow_type = flow_task
.flow_options
.get(FlowType::FLOW_TYPE_KEY)
.map(|s| s.as_str());
match flow_type {
Some(FlowType::BATCHING) => Ok(FlowType::Batching),
Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
Some(unknown) => UnexpectedSnafu {
err_msg: format!("Unknown flow type: {}", unknown),
}
.fail(),
None => Ok(FlowType::Batching),
}
} }
/// The state of [CreateFlowProcedure]. /// The state of [CreateFlowProcedure].

View File

@@ -46,7 +46,7 @@ pub(crate) fn test_create_flow_task(
create_if_not_exists, create_if_not_exists,
expire_after: Some(300), expire_after: Some(300),
comment: "".to_string(), comment: "".to_string(),
sql: "raw_sql".to_string(), sql: "select 1".to_string(),
flow_options: Default::default(), flow_options: Default::default(),
} }
} }

View File

@@ -401,6 +401,13 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Invalid flow request body: {:?}", body))]
InvalidFlowRequestBody {
body: Box<Option<api::v1::flow::flow_request::Body>>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to get kv cache, err: {}", err_msg))] #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
GetKvCache { err_msg: String }, GetKvCache { err_msg: String },
@@ -853,7 +860,8 @@ impl ErrorExt for Error {
| TlsConfig { .. } | TlsConfig { .. }
| InvalidSetDatabaseOption { .. } | InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. } | InvalidUnsetDatabaseOption { .. }
| InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments, | InvalidTopicNamePrefix { .. }
| InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound, FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected, FlowRouteNotFound { .. } => StatusCode::Unexpected,

View File

@@ -18,16 +18,19 @@ mod udaf;
use std::sync::Arc; use std::sync::Arc;
use api::v1::TableName;
use datafusion::catalog::CatalogProviderList; use datafusion::catalog::CatalogProviderList;
use datafusion::error::Result as DatafusionResult; use datafusion::error::Result as DatafusionResult;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder}; use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion_common::Column; use datafusion_common::{Column, TableReference};
use datafusion_expr::col; use datafusion_expr::dml::InsertOp;
use datafusion_expr::{col, DmlStatement, WriteOp};
pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter}; pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
use snafu::ResultExt;
pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef}; pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
pub use self::udaf::AggregateFunction; pub use self::udaf::AggregateFunction;
use crate::error::Result; use crate::error::{GeneralDataFusionSnafu, Result};
use crate::logical_plan::accumulator::*; use crate::logical_plan::accumulator::*;
use crate::signature::{Signature, Volatility}; use crate::signature::{Signature, Volatility};
@@ -79,6 +82,74 @@ pub fn rename_logical_plan_columns(
LogicalPlanBuilder::from(plan).project(projection)?.build() LogicalPlanBuilder::from(plan).project(projection)?.build()
} }
/// Convert a insert into logical plan to an (table_name, logical_plan)
/// where table_name is the name of the table to insert into.
/// logical_plan is the plan to be executed.
///
/// if input logical plan is not `insert into table_name <input>`, return None
///
/// Returned TableName will use provided catalog and schema if not specified in the logical plan,
/// if table scan in logical plan have full table name, will **NOT** override it.
pub fn breakup_insert_plan(
plan: &LogicalPlan,
default_catalog: &str,
default_schema: &str,
) -> Option<(TableName, Arc<LogicalPlan>)> {
if let LogicalPlan::Dml(dml) = plan {
if dml.op != WriteOp::Insert(InsertOp::Append) {
return None;
}
let table_name = &dml.table_name;
let table_name = match table_name {
TableReference::Bare { table } => TableName {
catalog_name: default_catalog.to_string(),
schema_name: default_schema.to_string(),
table_name: table.to_string(),
},
TableReference::Partial { schema, table } => TableName {
catalog_name: default_catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
},
TableReference::Full {
catalog,
schema,
table,
} => TableName {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
},
};
let logical_plan = dml.input.clone();
Some((table_name, logical_plan))
} else {
None
}
}
/// create a `insert into table_name <input>` logical plan
pub fn add_insert_to_logical_plan(
table_name: TableName,
table_schema: datafusion_common::DFSchemaRef,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let table_name = TableReference::Full {
catalog: table_name.catalog_name.into(),
schema: table_name.schema_name.into(),
table: table_name.table_name.into(),
};
let plan = LogicalPlan::Dml(DmlStatement::new(
table_name,
table_schema,
WriteOp::Insert(InsertOp::Append),
Arc::new(input),
));
let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?;
Ok(plan)
}
/// The datafusion `[LogicalPlan]` decoder. /// The datafusion `[LogicalPlan]` decoder.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait SubstraitPlanDecoder { pub trait SubstraitPlanDecoder {

View File

@@ -58,7 +58,7 @@ use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_R
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName}; use crate::{CreateFlowArgs, FlowId, TableName};
mod flownode_impl; pub(crate) mod flownode_impl;
mod parse_expr; mod parse_expr;
pub(crate) mod refill; pub(crate) mod refill;
mod stat; mod stat;
@@ -135,12 +135,14 @@ impl Configurable for FlownodeOptions {
} }
/// Arc-ed FlowNodeManager, cheaper to clone /// Arc-ed FlowNodeManager, cheaper to clone
pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>; pub type FlowWorkerManagerRef = Arc<FlowStreamingEngine>;
/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
/// ///
/// The choice of timestamp is just using current system timestamp for now /// The choice of timestamp is just using current system timestamp for now
pub struct FlowWorkerManager { ///
/// TODO(discord9): rename to FlowStreamingEngine
pub struct FlowStreamingEngine {
/// The handler to the worker that will run the dataflow /// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used /// which is `!Send` so a handle is used
pub worker_handles: Vec<WorkerHandle>, pub worker_handles: Vec<WorkerHandle>,
@@ -158,7 +160,8 @@ pub struct FlowWorkerManager {
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>, flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>, src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
tick_manager: FlowTickManager, tick_manager: FlowTickManager,
node_id: Option<u32>, /// This node id is only available in distributed mode, on standalone mode this is guaranteed to be `None`
pub node_id: Option<u32>,
/// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow` /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
/// ///
/// So that a series of event like `inserts -> flush` can be handled correctly /// So that a series of event like `inserts -> flush` can be handled correctly
@@ -168,7 +171,7 @@ pub struct FlowWorkerManager {
} }
/// Building FlownodeManager /// Building FlownodeManager
impl FlowWorkerManager { impl FlowStreamingEngine {
/// set frontend invoker /// set frontend invoker
pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) { pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
*self.frontend_invoker.write().await = Some(frontend); *self.frontend_invoker.write().await = Some(frontend);
@@ -187,7 +190,7 @@ impl FlowWorkerManager {
let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _); let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
let tick_manager = FlowTickManager::new(); let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new(); let worker_handles = Vec::new();
FlowWorkerManager { FlowStreamingEngine {
worker_handles, worker_handles,
worker_selector: Mutex::new(0), worker_selector: Mutex::new(0),
query_engine, query_engine,
@@ -263,7 +266,7 @@ pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Erro
} }
/// This impl block contains methods to send writeback requests to frontend /// This impl block contains methods to send writeback requests to frontend
impl FlowWorkerManager { impl FlowStreamingEngine {
/// Return the number of requests it made /// Return the number of requests it made
pub async fn send_writeback_requests(&self) -> Result<usize, Error> { pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
let all_reqs = self.generate_writeback_request().await?; let all_reqs = self.generate_writeback_request().await?;
@@ -534,7 +537,7 @@ impl FlowWorkerManager {
} }
/// Flow Runtime related methods /// Flow Runtime related methods
impl FlowWorkerManager { impl FlowStreamingEngine {
/// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back /// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back
/// ///
/// if heartbeat task is shutdown, this future will exit too /// if heartbeat task is shutdown, this future will exit too
@@ -728,7 +731,7 @@ impl FlowWorkerManager {
} }
/// Create&Remove flow /// Create&Remove flow
impl FlowWorkerManager { impl FlowStreamingEngine {
/// remove a flow by it's id /// remove a flow by it's id
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> { pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() { for handle in self.worker_handles.iter() {

View File

@@ -20,35 +20,379 @@ use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow, flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
}; };
use api::v1::region::InsertRequests; use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType; use common_meta::ddl::create_flow::FlowType;
use common_meta::error::{Result as MetaResult, UnexpectedSnafu}; use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle; use common_runtime::JoinHandle;
use common_telemetry::{trace, warn}; use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value; use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools; use itertools::Itertools;
use snafu::{IntoError, OptionExt, ResultExt}; use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::adapter::{CreateFlowArgs, FlowStreamingEngine};
use crate::batching_mode::engine::BatchingEngine; use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine; use crate::engine::FlowEngine;
use crate::error::{CreateFlowSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu}; use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow}; use crate::repr::{self, DiffRow};
use crate::{Error, FlowId}; use crate::{Error, FlowId};
/// Ref to [`FlowDualEngine`]
pub type FlowDualEngineRef = Arc<FlowDualEngine>;
/// Manage both streaming and batching mode engine /// Manage both streaming and batching mode engine
/// ///
/// including create/drop/flush flow /// including create/drop/flush flow
/// and redirect insert requests to the appropriate engine /// and redirect insert requests to the appropriate engine
pub struct FlowDualEngine { pub struct FlowDualEngine {
streaming_engine: Arc<FlowWorkerManager>, streaming_engine: Arc<FlowStreamingEngine>,
batching_engine: Arc<BatchingEngine>, batching_engine: Arc<BatchingEngine>,
/// helper struct for faster query flow by table id or vice versa /// helper struct for faster query flow by table id or vice versa
src_table2flow: std::sync::RwLock<SrcTableToFlow>, src_table2flow: RwLock<SrcTableToFlow>,
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
} }
impl FlowDualEngine {
pub fn new(
streaming_engine: Arc<FlowStreamingEngine>,
batching_engine: Arc<BatchingEngine>,
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
) -> Self {
Self {
streaming_engine,
batching_engine,
src_table2flow: RwLock::new(SrcTableToFlow::default()),
flow_metadata_manager,
catalog_manager,
check_task: Mutex::new(None),
}
}
pub fn streaming_engine(&self) -> Arc<FlowStreamingEngine> {
self.streaming_engine.clone()
}
pub fn batching_engine(&self) -> Arc<BatchingEngine> {
self.batching_engine.clone()
}
/// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
///
/// the need to sync is to make sure flush flow actually get called
async fn try_sync_with_check_task(
&self,
flow_id: FlowId,
allow_drop: bool,
) -> Result<(), Error> {
// this function rarely get called so adding some log is helpful
info!("Try to sync with check task for flow {}", flow_id);
let mut retry = 0;
let max_retry = 10;
// keep trying to trigger consistent check
while retry < max_retry {
if let Some(task) = self.check_task.lock().await.as_ref() {
task.trigger(false, allow_drop).await?;
break;
}
retry += 1;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
if retry == max_retry {
error!(
"Can't sync with check task for flow {} with allow_drop={}",
flow_id, allow_drop
);
return SyncCheckTaskSnafu {
flow_id,
allow_drop,
}
.fail();
}
info!("Successfully sync with check task for flow {}", flow_id);
Ok(())
}
/// Spawn a task to consistently check if all flow tasks in metasrv is created on flownode,
/// so on startup, this will create all missing flow tasks, and constantly check at a interval
async fn check_flow_consistent(
&self,
allow_create: bool,
allow_drop: bool,
) -> Result<(), Error> {
// use nodeid to determine if this is standalone/distributed mode, and retrieve all flows in this node(in distributed mode)/or all flows(in standalone mode)
let nodeid = self.streaming_engine.node_id;
let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
// nodeid is available, so we only need to check flows on this node
// which also means we are in distributed mode
let to_be_recover = self
.flow_metadata_manager
.flownode_flow_manager()
.flows(nodeid.into())
.try_collect::<Vec<_>>()
.await
.context(ListFlowsSnafu {
id: Some(nodeid.into()),
})?;
to_be_recover.into_iter().map(|(id, _)| id).collect()
} else {
// nodeid is not available, so we need to check all flows
// which also means we are in standalone mode
let all_catalogs = self
.catalog_manager
.catalog_names()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut all_flow_ids = vec![];
for catalog in all_catalogs {
let flows = self
.flow_metadata_manager
.flow_name_manager()
.flow_names(&catalog)
.await
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
}
all_flow_ids
};
let should_exists = should_exists
.into_iter()
.map(|i| i as FlowId)
.collect::<HashSet<_>>();
let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
let to_be_created = should_exists
.iter()
.filter(|id| !actual_exists.contains(id))
.collect::<Vec<_>>();
let to_be_dropped = actual_exists
.iter()
.filter(|id| !should_exists.contains(id))
.collect::<Vec<_>>();
if !to_be_created.is_empty() {
if allow_create {
info!(
"Recovering {} flows: {:?}",
to_be_created.len(),
to_be_created
);
let mut errors = vec![];
for flow_id in to_be_created {
let flow_id = *flow_id;
let info = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_id as u32)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.context(FlowNotFoundSnafu { id: flow_id })?;
let sink_table_name = [
info.sink_table_name().catalog_name.clone(),
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];
let args = CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
// but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
// (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
};
if let Err(err) = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
sql: info.raw_sql().clone(),
})
{
errors.push((flow_id, err));
}
}
for (flow_id, err) in errors {
warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
nodeid, to_be_created
);
}
}
if !to_be_dropped.is_empty() {
if allow_drop {
info!("Dropping flows: {:?}", to_be_dropped);
let mut errors = vec![];
for flow_id in to_be_dropped {
let flow_id = *flow_id;
if let Err(err) = self.remove_flow(flow_id).await {
errors.push((flow_id, err));
}
}
for (flow_id, err) in errors {
warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
nodeid, to_be_dropped
);
}
}
Ok(())
}
// TODO(discord9): consider sync this with heartbeat(might become necessary in the future)
pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
let mut check_task = self.check_task.lock().await;
ensure!(
check_task.is_none(),
IllegalCheckTaskStateSnafu {
reason: "Flow consistent check task already exists",
}
);
let task = ConsistentCheckTask::start_check_task(self).await?;
*check_task = Some(task);
Ok(())
}
pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
info!("Stopping flow consistent check task");
let mut check_task = self.check_task.lock().await;
ensure!(
check_task.is_some(),
IllegalCheckTaskStateSnafu {
reason: "Flow consistent check task does not exist",
}
);
check_task.take().expect("Already checked").stop().await?;
info!("Stopped flow consistent check task");
Ok(())
}
async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
self.flow_metadata_manager
.flow_info_manager()
.get(flow_id as u32)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
.map(|info| info.is_some())
}
}
struct ConsistentCheckTask {
handle: JoinHandle<()>,
shutdown_tx: tokio::sync::mpsc::Sender<()>,
trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}
impl ConsistentCheckTask {
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
// first do recover flows
engine.check_flow_consistent(true, false).await?;
let inner = engine.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
let mut args = (false, false);
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
loop {
if let Err(err) = inner.check_flow_consistent(args.0, args.1).await {
error!(err; "Failed to check flow consistent");
}
if let Some(done) = ret_signal.take() {
let _ = done.send(());
}
tokio::select! {
_ = rx.recv() => break,
incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
args = (incoming.0, incoming.1);
ret_signal = Some(incoming.2);
},
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => args=(false,false),
}
}
});
Ok(ConsistentCheckTask {
handle,
shutdown_tx: tx,
trigger_tx,
})
}
async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.trigger_tx
.send((allow_create, allow_drop, tx))
.await
.map_err(|_| {
IllegalCheckTaskStateSnafu {
reason: "Failed to send trigger signal",
}
.build()
})?;
rx.await.map_err(|_| {
IllegalCheckTaskStateSnafu {
reason: "Failed to receive trigger signal",
}
.build()
})?;
Ok(())
}
async fn stop(self) -> Result<(), Error> {
self.shutdown_tx.send(()).await.map_err(|_| {
IllegalCheckTaskStateSnafu {
reason: "Failed to send shutdown signal",
}
.build()
})?;
// abort so no need to wait
self.handle.abort();
Ok(())
}
}
#[derive(Default)]
struct SrcTableToFlow { struct SrcTableToFlow {
/// mapping of table ids to flow ids for streaming mode /// mapping of table ids to flow ids for streaming mode
stream: HashMap<TableId, HashSet<FlowId>>, stream: HashMap<TableId, HashSet<FlowId>>,
@@ -138,35 +482,49 @@ impl FlowEngine for FlowDualEngine {
self.src_table2flow self.src_table2flow
.write() .write()
.unwrap() .await
.add_flow(flow_id, flow_type, src_table_ids); .add_flow(flow_id, flow_type, src_table_ids);
Ok(res) Ok(res)
} }
async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id); let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
match flow_type { match flow_type {
Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await, Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await, Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
None => FlowNotFoundSnafu { id: flow_id }.fail(), None => {
// this can happen if flownode just restart, and is stilling creating the flow
// since now that this flow should dropped, we need to trigger the consistent check and allow drop
// this rely on drop flow ddl delete metadata first, see src/common/meta/src/ddl/drop_flow.rs
warn!(
"Flow {} is not exist in the underlying engine, but exist in metadata",
flow_id
);
self.try_sync_with_check_task(flow_id, true).await?;
Ok(())
}
}?; }?;
// remove mapping // remove mapping
self.src_table2flow.write().unwrap().remove_flow(flow_id); self.src_table2flow.write().await.remove_flow(flow_id);
Ok(()) Ok(())
} }
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> { async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id); // sync with check task
self.try_sync_with_check_task(flow_id, false).await?;
let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
match flow_type { match flow_type {
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await, Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await, Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
None => FlowNotFoundSnafu { id: flow_id }.fail(), None => Ok(0),
} }
} }
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> { async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id); let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
// not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine // not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine
match flow_type { match flow_type {
Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await, Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
@@ -175,6 +533,13 @@ impl FlowEngine for FlowDualEngine {
} }
} }
async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
let stream_flows = self.streaming_engine.list_flows().await?;
let batch_flows = self.batching_engine.list_flows().await?;
Ok(stream_flows.into_iter().chain(batch_flows))
}
async fn handle_flow_inserts( async fn handle_flow_inserts(
&self, &self,
request: api::v1::region::InsertRequests, request: api::v1::region::InsertRequests,
@@ -184,7 +549,7 @@ impl FlowEngine for FlowDualEngine {
let mut to_batch_engine = request.requests; let mut to_batch_engine = request.requests;
{ {
let src_table2flow = self.src_table2flow.read().unwrap(); let src_table2flow = self.src_table2flow.read().await;
to_batch_engine.retain(|req| { to_batch_engine.retain(|req| {
let region_id = RegionId::from(req.region_id); let region_id = RegionId::from(req.region_id);
let table_id = region_id.table_id(); let table_id = region_id.table_id();
@@ -221,12 +586,7 @@ impl FlowEngine for FlowDualEngine {
requests: to_batch_engine, requests: to_batch_engine,
}) })
.await?; .await?;
stream_handler.await.map_err(|e| { stream_handler.await.context(JoinTaskSnafu)??;
crate::error::UnexpectedSnafu {
reason: format!("JoinError when handle inserts for flow stream engine: {e:?}"),
}
.build()
})??;
Ok(()) Ok(())
} }
@@ -307,14 +667,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default() ..Default::default()
}) })
} }
None => UnexpectedSnafu { other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
err_msg: "Missing request body",
}
.fail(),
_ => UnexpectedSnafu {
err_msg: "Invalid request body.",
}
.fail(),
} }
} }
@@ -339,7 +692,7 @@ fn to_meta_err(
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowWorkerManager { impl common_meta::node_manager::Flownode for FlowStreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> { async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request let query_ctx = request
.header .header
@@ -413,14 +766,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
..Default::default() ..Default::default()
}) })
} }
None => UnexpectedSnafu { other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
err_msg: "Missing request body",
}
.fail(),
_ => UnexpectedSnafu {
err_msg: "Invalid request body.",
}
.fail(),
} }
} }
@@ -432,7 +778,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
} }
} }
impl FlowEngine for FlowWorkerManager { impl FlowEngine for FlowStreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> { async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await self.create_flow_inner(args).await
} }
@@ -449,6 +795,16 @@ impl FlowEngine for FlowWorkerManager {
self.flow_exist_inner(flow_id).await self.flow_exist_inner(flow_id).await
} }
async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
Ok(self
.flow_err_collectors
.read()
.await
.keys()
.cloned()
.collect::<Vec<_>>())
}
async fn handle_flow_inserts( async fn handle_flow_inserts(
&self, &self,
request: api::v1::region::InsertRequests, request: api::v1::region::InsertRequests,
@@ -474,7 +830,7 @@ impl FetchFromRow {
} }
} }
impl FlowWorkerManager { impl FlowStreamingEngine {
async fn handle_inserts_inner( async fn handle_inserts_inner(
&self, &self,
request: InsertRequests, request: InsertRequests,
@@ -552,7 +908,7 @@ impl FlowWorkerManager {
.copied() .copied()
.map(FetchFromRow::Idx) .map(FetchFromRow::Idx)
.or_else(|| col_default_val.clone().map(FetchFromRow::Default)) .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
.with_context(|| crate::error::UnexpectedSnafu { .with_context(|| UnexpectedSnafu {
reason: format!( reason: format!(
"Column not found: {}, default_value: {:?}", "Column not found: {}, default_value: {:?}",
col_name, col_default_val col_name, col_default_val

View File

@@ -31,7 +31,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId; use table::metadata::TableId;
use crate::adapter::table_source::ManagedTableSource; use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::{FlowId, FlowWorkerManager, FlowWorkerManagerRef}; use crate::adapter::{FlowId, FlowStreamingEngine, FlowWorkerManagerRef};
use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu}; use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
use crate::expr::error::ExternalSnafu; use crate::expr::error::ExternalSnafu;
use crate::expr::utils::find_plan_time_window_expr_lower_bound; use crate::expr::utils::find_plan_time_window_expr_lower_bound;
@@ -39,7 +39,7 @@ use crate::repr::RelationDesc;
use crate::server::get_all_flow_ids; use crate::server::get_all_flow_ids;
use crate::{Error, FrontendInvoker}; use crate::{Error, FrontendInvoker};
impl FlowWorkerManager { impl FlowStreamingEngine {
/// Create and start refill flow tasks in background /// Create and start refill flow tasks in background
pub async fn create_and_start_refill_flow_tasks( pub async fn create_and_start_refill_flow_tasks(
self: &FlowWorkerManagerRef, self: &FlowWorkerManagerRef,

View File

@@ -16,9 +16,9 @@ use std::collections::BTreeMap;
use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::flow_state::FlowStat;
use crate::FlowWorkerManager; use crate::FlowStreamingEngine;
impl FlowWorkerManager { impl FlowStreamingEngine {
pub async fn gen_state_report(&self) -> FlowStat { pub async fn gen_state_report(&self) -> FlowStat {
let mut full_report = BTreeMap::new(); let mut full_report = BTreeMap::new();
let mut last_exec_time_map = BTreeMap::new(); let mut last_exec_time_map = BTreeMap::new();

View File

@@ -33,8 +33,8 @@ use crate::adapter::table_source::TableDesc;
use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL}; use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu}; use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType}; use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::FlowWorkerManager; use crate::FlowStreamingEngine;
impl FlowWorkerManager { impl FlowStreamingEngine {
/// Get a worker handle for creating flow, using round robin to select a worker /// Get a worker handle for creating flow, using round robin to select a worker
pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle { pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle {
let use_idx = { let use_idx = {

View File

@@ -17,14 +17,16 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType; use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::TableInfoManager; use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::TableMetadataManagerRef; use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle; use common_runtime::JoinHandle;
use common_telemetry::info;
use common_telemetry::tracing::warn; use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef; use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId; use store_api::storage::RegionId;
@@ -36,7 +38,9 @@ use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr}; use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan; use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine; use crate::engine::FlowEngine;
use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu}; use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::{CreateFlowArgs, Error, FlowId, TableName}; use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -48,6 +52,7 @@ pub struct BatchingEngine {
frontend_client: Arc<FrontendClient>, frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef, flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef, table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef, query_engine: QueryEngineRef,
} }
@@ -57,6 +62,7 @@ impl BatchingEngine {
query_engine: QueryEngineRef, query_engine: QueryEngineRef,
flow_metadata_manager: FlowMetadataManagerRef, flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef, table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
) -> Self { ) -> Self {
Self { Self {
tasks: Default::default(), tasks: Default::default(),
@@ -64,6 +70,7 @@ impl BatchingEngine {
frontend_client, frontend_client,
flow_metadata_manager, flow_metadata_manager,
table_meta, table_meta,
catalog_manager,
query_engine, query_engine,
} }
} }
@@ -179,6 +186,16 @@ async fn get_table_name(
table_info: &TableInfoManager, table_info: &TableInfoManager,
table_id: &TableId, table_id: &TableId,
) -> Result<TableName, Error> { ) -> Result<TableName, Error> {
get_table_info(table_info, table_id).await.map(|info| {
let name = info.table_name();
[name.catalog_name, name.schema_name, name.table_name]
})
}
async fn get_table_info(
table_info: &TableInfoManager,
table_id: &TableId,
) -> Result<TableInfoValue, Error> {
table_info table_info
.get(*table_id) .get(*table_id)
.await .await
@@ -187,8 +204,7 @@ async fn get_table_name(
.with_context(|| UnexpectedSnafu { .with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", table_id), reason: format!("Table id = {:?}, couldn't found table name", table_id),
}) })
.map(|name| name.table_name()) .map(|info| info.into_inner())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
} }
impl BatchingEngine { impl BatchingEngine {
@@ -248,7 +264,19 @@ impl BatchingEngine {
let query_ctx = Arc::new(query_ctx); let query_ctx = Arc::new(query_ctx);
let mut source_table_names = Vec::with_capacity(2); let mut source_table_names = Vec::with_capacity(2);
for src_id in source_table_ids { for src_id in source_table_ids {
// also check table option to see if ttl!=instant
let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?; let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
if table_info.table_info.meta.options.ttl == Some(TimeToLive::Instant) {
UnsupportedSnafu {
reason: format!(
"Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
table_name.join("."),
src_id
),
}
.fail()?;
}
source_table_names.push(table_name); source_table_names.push(table_name);
} }
@@ -273,7 +301,14 @@ impl BatchingEngine {
}) })
.transpose()?; .transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr); info!(
"Flow id={}, found time window expr={}",
flow_id,
phy_expr
.as_ref()
.map(|phy_expr| phy_expr.to_string())
.unwrap_or("None".to_string())
);
let task = BatchingTask::new( let task = BatchingTask::new(
flow_id, flow_id,
@@ -284,7 +319,7 @@ impl BatchingEngine {
sink_table_name, sink_table_name,
source_table_names, source_table_names,
query_ctx, query_ctx,
self.table_meta.clone(), self.catalog_manager.clone(),
rx, rx,
); );
@@ -295,10 +330,11 @@ impl BatchingEngine {
// check execute once first to detect any error early // check execute once first to detect any error early
task.check_execute(&engine, &frontend).await?; task.check_execute(&engine, &frontend).await?;
// TODO(discord9): also save handle & use time wheel or what for better // TODO(discord9): use time wheel or what for better
let _handle = common_runtime::spawn_global(async move { let handle = common_runtime::spawn_global(async move {
task_inner.start_executing_loop(engine, frontend).await; task_inner.start_executing_loop(engine, frontend).await;
}); });
task.state.write().unwrap().task_handle = Some(handle);
// only replace here not earlier because we want the old one intact if something went wrong before this line // only replace here not earlier because we want the old one intact if something went wrong before this line
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task); let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
@@ -326,15 +362,23 @@ impl BatchingEngine {
} }
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> { pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
debug!("Try flush flow {flow_id}");
let task = self.tasks.read().await.get(&flow_id).cloned(); let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| UnexpectedSnafu { let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"), reason: format!("Can't found task for flow {flow_id}"),
})?; })?;
task.mark_all_windows_as_dirty()?;
let res = task let res = task
.gen_exec_once(&self.query_engine, &self.frontend_client) .gen_exec_once(&self.query_engine, &self.frontend_client)
.await?; .await?;
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize; let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
debug!(
"Successfully flush flow {flow_id}, affected rows={}",
affected_rows
);
Ok(affected_rows) Ok(affected_rows)
} }
@@ -357,6 +401,9 @@ impl FlowEngine for BatchingEngine {
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> { async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
Ok(self.flow_exist_inner(flow_id).await) Ok(self.flow_exist_inner(flow_id).await)
} }
async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
}
async fn handle_flow_inserts( async fn handle_flow_inserts(
&self, &self,
request: api::v1::region::InsertRequests, request: api::v1::region::InsertRequests,

View File

@@ -14,44 +14,105 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::Arc; use std::sync::{Arc, Weak};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use api::v1::greptime_request::Request;
use common_error::ext::BoxedError; use api::v1::CreateTableExpr;
use client::{Client, Database};
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role}; use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer; use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest; use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use meta_client::client::MetaClient; use meta_client::client::MetaClient;
use snafu::ResultExt; use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT; use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
use crate::error::{ExternalSnafu, UnexpectedSnafu}; use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
use crate::Error; use crate::Error;
fn default_channel_mgr() -> ChannelManager { /// Just like [`GrpcQueryHandler`] but use BoxedError
let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT); ///
ChannelManager::with_config(cfg) /// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
///
/// this is only useful for flownode to
/// invoke frontend Instance in standalone mode
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
async fn do_query(
&self,
query: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError>;
} }
fn client_from_urls(addrs: Vec<String>) -> Client { /// auto impl
Client::with_manager_and_urls(default_channel_mgr(), addrs) #[async_trait::async_trait]
impl<
E: ErrorExt + Send + Sync + 'static,
T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
> GrpcQueryHandlerWithBoxedError for T
{
async fn do_query(
&self,
query: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
self.do_query(query, ctx).await.map_err(BoxedError::new)
}
} }
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// A simple frontend client able to execute sql using grpc protocol /// A simple frontend client able to execute sql using grpc protocol
#[derive(Debug)] ///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
#[derive(Debug, Clone)]
pub enum FrontendClient { pub enum FrontendClient {
Distributed { Distributed {
meta_client: Arc<MetaClient>, meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
}, },
Standalone { Standalone {
/// for the sake of simplicity still use grpc even in standalone mode /// for the sake of simplicity still use grpc even in standalone mode
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
/// TODO(discord9): not use grpc under standalone mode /// TODO(discord9): not use grpc under standalone mode
database_client: DatabaseWithPeer, database_client: HandlerMutable,
}, },
} }
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
(
Self::Standalone {
database_client: handler.clone(),
},
handler,
)
}
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed {
meta_client,
chnl_mgr: {
let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
},
}
}
pub fn from_grpc_handler(grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) -> Self {
Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
}
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DatabaseWithPeer { pub struct DatabaseWithPeer {
pub database: Database, pub database: Database,
@@ -64,25 +125,6 @@ impl DatabaseWithPeer {
} }
} }
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed { meta_client }
}
pub fn from_static_grpc_addr(addr: String) -> Self {
let peer = Peer {
id: 0,
addr: addr.clone(),
};
let client = client_from_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
}
}
}
impl FrontendClient { impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> { async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else { let Self::Distributed { meta_client, .. } = self else {
@@ -115,10 +157,21 @@ impl FrontendClient {
} }
/// Get the database with max `last_activity_ts` /// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> { async fn get_last_active_frontend(
if let Self::Standalone { database_client } = self { &self,
return Ok(database_client.clone()); catalog: &str,
schema: &str,
) -> Result<DatabaseWithPeer, Error> {
let Self::Distributed {
meta_client: _,
chnl_mgr,
} = self
else {
return UnexpectedSnafu {
reason: "Expect distributed mode",
} }
.fail();
};
let frontends = self.scan_for_frontend().await?; let frontends = self.scan_for_frontend().await?;
let mut peer = None; let mut peer = None;
@@ -133,16 +186,119 @@ impl FrontendClient {
} }
.fail()? .fail()?
}; };
let client = client_from_urls(vec![peer.addr.clone()]); let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); let database = Database::new(catalog, schema, client);
Ok(DatabaseWithPeer::new(database, peer)) Ok(DatabaseWithPeer::new(database, peer))
} }
/// Get a database client, and possibly update it before returning. pub async fn create(
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> { &self,
create: CreateTableExpr,
catalog: &str,
schema: &str,
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
}),
catalog,
schema,
&mut None,
)
.await
}
/// Handle a request to frontend
pub(crate) async fn handle(
&self,
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
) -> Result<u32, Error> {
match self { match self {
Self::Standalone { database_client } => Ok(database_client.clone()), FrontendClient::Distributed { .. } => {
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await, let db = self.get_last_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
db.database
.handle(req.clone())
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request: {:?}", req),
})
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build();
let ctx = Arc::new(ctx);
{
let database_client = {
database_client
.lock()
.map_err(|e| {
UnexpectedSnafu {
reason: format!("Failed to lock database client: {e}"),
}
.build()
})?
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",
})?
.upgrade()
.context(UnexpectedSnafu {
reason: "Failed to upgrade database client",
})?
};
let resp: common_query::Output = database_client
.do_query(req.clone(), ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
match resp.data {
common_query::OutputData::AffectedRows(rows) => {
Ok(rows.try_into().map_err(|_| {
UnexpectedSnafu {
reason: format!("Failed to convert rows to u32: {}", rows),
}
.build()
})?)
}
_ => UnexpectedSnafu {
reason: "Unexpected output data",
}
.fail(),
}
}
}
}
}
}
/// Describe a peer of frontend
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
/// Distributed mode's frontend peer address
Dist {
/// frontend peer address
peer: Peer,
},
/// Standalone mode
#[default]
Standalone,
}
impl std::fmt::Display for PeerDesc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
PeerDesc::Standalone => write!(f, "standalone"),
} }
} }
} }

View File

@@ -22,13 +22,14 @@ use common_telemetry::tracing::warn;
use common_time::Timestamp; use common_time::Timestamp;
use datatypes::value::Value; use datatypes::value::Value;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::{OptionExt, ResultExt};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::time::Instant; use tokio::time::Instant;
use crate::batching_mode::task::BatchingTask; use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION; use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu}; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::{Error, FlowId}; use crate::{Error, FlowId};
/// The state of the [`BatchingTask`]. /// The state of the [`BatchingTask`].
@@ -46,6 +47,8 @@ pub struct TaskState {
exec_state: ExecState, exec_state: ExecState,
/// Shutdown receiver /// Shutdown receiver
pub(crate) shutdown_rx: oneshot::Receiver<()>, pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
} }
impl TaskState { impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self { pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -56,6 +59,7 @@ impl TaskState {
dirty_time_windows: Default::default(), dirty_time_windows: Default::default(),
exec_state: ExecState::Idle, exec_state: ExecState::Idle,
shutdown_rx, shutdown_rx,
task_handle: None,
} }
} }
@@ -70,7 +74,11 @@ impl TaskState {
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query /// wait for at least `last_query_duration`, at most `max_timeout` to start next query
/// ///
/// if have more dirty time window, exec next query immediately /// if have more dirty time window, exec next query immediately
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant { pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
max_timeout: Option<Duration>,
) -> Instant {
let next_duration = max_timeout let next_duration = max_timeout
.unwrap_or(self.last_query_duration) .unwrap_or(self.last_query_duration)
.min(self.last_query_duration); .min(self.last_query_duration);
@@ -80,6 +88,12 @@ impl TaskState {
if self.dirty_time_windows.windows.is_empty() { if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration self.last_update_time + next_duration
} else { } else {
debug!(
"Flow id = {}, still have {} dirty time window({:?}), execute immediately",
flow_id,
self.dirty_time_windows.windows.len(),
self.dirty_time_windows.windows
);
Instant::now() Instant::now()
} }
} }
@@ -115,6 +129,15 @@ impl DirtyTimeWindows {
} }
} }
pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
self.windows.insert(start, end);
}
/// Clean all dirty time windows, useful when can't found time window expr
pub fn clean(&mut self) {
self.windows.clear();
}
/// Generate all filter expressions consuming all time windows /// Generate all filter expressions consuming all time windows
pub fn gen_filter_exprs( pub fn gen_filter_exprs(
&mut self, &mut self,
@@ -177,6 +200,18 @@ impl DirtyTimeWindows {
let mut expr_lst = vec![]; let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() { for (start, end) in first_nth.into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else {
UnexpectedSnafu {
reason: "time_window_expr is not set",
}
.fail()?
};
self.align_time_window(start, end, time_window_expr)?
} else {
(start, end)
};
debug!( debug!(
"Time window start: {:?}, end: {:?}", "Time window start: {:?}, end: {:?}",
start.to_iso8601_string(), start.to_iso8601_string(),
@@ -199,6 +234,30 @@ impl DirtyTimeWindows {
Ok(expr) Ok(expr)
} }
fn align_time_window(
&self,
start: Timestamp,
end: Option<Timestamp>,
time_window_expr: &TimeWindowExpr,
) -> Result<(Timestamp, Option<Timestamp>), Error> {
let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
reason: format!(
"Failed to align start time {:?} with time window expr {:?}",
start, time_window_expr
),
})?;
let align_end = end
.and_then(|end| {
time_window_expr
.eval(end)
// if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
.map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
.transpose()
})
.transpose()?;
Ok((align_start, align_end))
}
/// Merge time windows that overlaps or get too close /// Merge time windows that overlaps or get too close
pub fn merge_dirty_time_windows( pub fn merge_dirty_time_windows(
&mut self, &mut self,
@@ -287,8 +346,12 @@ enum ExecState {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use session::context::QueryContext;
use super::*; use super::*;
use crate::batching_mode::time_window::find_time_window_expr;
use crate::batching_mode::utils::sql_to_df_plan;
use crate::test_utils::create_test_query_engine;
#[test] #[test]
fn test_merge_dirty_time_windows() { fn test_merge_dirty_time_windows() {
@@ -404,4 +467,59 @@ mod test {
assert_eq!(expected_filter_expr, to_sql.as_deref()); assert_eq!(expected_filter_expr, to_sql.as_deref());
} }
} }
#[tokio::test]
async fn test_align_time_window() {
type TimeWindow = (Timestamp, Option<Timestamp>);
struct TestCase {
sql: String,
aligns: Vec<(TimeWindow, TimeWindow)>,
}
let testcases: Vec<TestCase> = vec![TestCase{
sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
aligns: vec![
((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
],
}];
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
for TestCase { sql, aligns } in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
.await
.unwrap();
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
query_engine.engine_state().catalog_manager().clone(),
ctx.clone(),
)
.await
.unwrap();
let time_window_expr = time_window_expr
.map(|expr| {
TimeWindowExpr::from_expr(
&expr,
&column_name,
&df_schema,
&query_engine.engine_state().session_state(),
)
})
.transpose()
.unwrap()
.unwrap();
let dirty = DirtyTimeWindows::default();
for (before_align, expected_after_align) in aligns {
let after_align = dirty
.align_time_window(before_align.0, before_align.1, &time_window_expr)
.unwrap();
assert_eq!(expected_after_align, after_align);
}
}
}
} }

View File

@@ -12,33 +12,32 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashSet; use std::collections::{BTreeSet, HashSet};
use std::ops::Deref; use std::ops::Deref;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use api::v1::CreateTableExpr; use api::v1::CreateTableExpr;
use arrow_schema::Fields; use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::key::table_name::TableNameKey; use common_query::logical_plan::breakup_insert_plan;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::tracing::warn; use common_telemetry::tracing::warn;
use common_telemetry::{debug, info}; use common_telemetry::{debug, info};
use common_time::Timestamp; use common_time::Timestamp;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql; use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::TreeNode; use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp}; use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::NOW_FN; use datatypes::schema::{ColumnSchema, Schema};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::value::Value;
use operator::expr_helper::column_schemas_to_defs; use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer; use query::query_engine::DefaultSerializer;
use query::QueryEngineRef; use query::QueryEngineRef;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::RawTableMeta;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant; use tokio::time::Instant;
@@ -48,14 +47,15 @@ use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState; use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{ use crate::batching_mode::utils::{
sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, FindGroupByFinalName, get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
FindGroupByFinalName,
}; };
use crate::batching_mode::{ use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD, DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
}; };
use crate::error::{ use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, InvalidRequestSnafu, ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
SubstraitEncodeLogicalPlanSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
}; };
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
@@ -73,7 +73,7 @@ pub struct TaskConfig {
pub expire_after: Option<i64>, pub expire_after: Option<i64>,
sink_table_name: [String; 3], sink_table_name: [String; 3],
pub source_table_names: HashSet<[String; 3]>, pub source_table_names: HashSet<[String; 3]>,
table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef,
} }
#[derive(Clone)] #[derive(Clone)]
@@ -93,7 +93,7 @@ impl BatchingTask {
sink_table_name: [String; 3], sink_table_name: [String; 3],
source_table_names: Vec<[String; 3]>, source_table_names: Vec<[String; 3]>,
query_ctx: QueryContextRef, query_ctx: QueryContextRef,
table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef,
shutdown_rx: oneshot::Receiver<()>, shutdown_rx: oneshot::Receiver<()>,
) -> Self { ) -> Self {
Self { Self {
@@ -105,12 +105,42 @@ impl BatchingTask {
expire_after, expire_after,
sink_table_name, sink_table_name,
source_table_names: source_table_names.into_iter().collect(), source_table_names: source_table_names.into_iter().collect(),
table_meta, catalog_manager,
}), }),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))), state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
} }
} }
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
///
/// useful for flush_flow to flush dirty time windows range
pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
let now = SystemTime::now();
let now = Timestamp::new_second(
now.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as _,
);
let lower_bound = self
.config
.expire_after
.map(|e| now.sub_duration(Duration::from_secs(e as _)))
.transpose()
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.unwrap_or(Timestamp::new_second(0));
debug!(
"Flow {} mark range ({:?}, {:?}) as dirty",
self.config.flow_id, lower_bound, now
);
self.state
.write()
.unwrap()
.dirty_time_windows
.add_window(lower_bound, Some(now));
Ok(())
}
/// Test execute, for check syntax or such /// Test execute, for check syntax or such
pub async fn check_execute( pub async fn check_execute(
&self, &self,
@@ -148,13 +178,8 @@ impl BatchingTask {
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> { async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
self.config self.config
.table_meta .catalog_manager
.table_name_manager() .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
.exists(TableNameKey {
catalog: &table_name[0],
schema: &table_name[1],
table: &table_name[2],
})
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(ExternalSnafu) .context(ExternalSnafu)
@@ -166,8 +191,10 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>, frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> { ) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? { if let Some(new_query) = self.gen_insert_plan(engine).await? {
debug!("Generate new query: {:#?}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await self.execute_logical_plan(frontend_client, &new_query).await
} else { } else {
debug!("Generate no query");
Ok(None) Ok(None)
} }
} }
@@ -176,67 +203,35 @@ impl BatchingTask {
&self, &self,
engine: &QueryEngineRef, engine: &QueryEngineRef,
) -> Result<Option<LogicalPlan>, Error> { ) -> Result<Option<LogicalPlan>, Error> {
let full_table_name = self.config.sink_table_name.clone().join("."); let (table, df_schema) = get_table_info_df_schema(
self.config.catalog_manager.clone(),
let table_id = self self.config.sink_table_name.clone(),
.config )
.table_meta .await?;
.table_name_manager()
.get(common_meta::key::table_name::TableNameKey::new(
&self.config.sink_table_name[0],
&self.config.sink_table_name[1],
&self.config.sink_table_name[2],
))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: full_table_name.clone(),
})?
.map(|t| t.table_id())
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?;
let table = self
.config
.table_meta
.table_info_manager()
.get(table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: full_table_name.clone(),
})?
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?
.into_inner();
let schema: datatypes::schema::Schema = table
.table_info
.meta
.schema
.clone()
.try_into()
.with_context(|_| DatatypesSnafu {
extra: format!(
"Failed to convert schema from raw schema, raw_schema={:?}",
table.table_info.meta.schema
),
})?;
let df_schema = Arc::new(schema.arrow_schema().clone().try_into().with_context(|_| {
DatafusionSnafu {
context: format!(
"Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
schema.arrow_schema()
),
}
})?);
let new_query = self let new_query = self
.gen_query_with_time_window(engine.clone(), &table.table_info.meta) .gen_query_with_time_window(engine.clone(), &table.meta.schema)
.await?; .await?;
let insert_into = if let Some((new_query, _column_cnt)) = new_query { let insert_into = if let Some((new_query, _column_cnt)) = new_query {
// first check if all columns in input query exists in sink table
// since insert into ref to names in record batch generate by given query
let table_columns = df_schema
.columns()
.into_iter()
.map(|c| c.name)
.collect::<BTreeSet<_>>();
for column in new_query.schema().columns() {
if !table_columns.contains(column.name()) {
return InvalidQuerySnafu {
reason: format!(
"Column {} not found in sink table with columns {:?}",
column, table_columns
),
}
.fail();
}
}
// update_at& time index placeholder (if exists) should have default value // update_at& time index placeholder (if exists) should have default value
LogicalPlan::Dml(DmlStatement::new( LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::Full { datafusion_common::TableReference::Full {
@@ -251,6 +246,9 @@ impl BatchingTask {
} else { } else {
return Ok(None); return Ok(None);
}; };
let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
context: "Failed to recompute schema",
})?;
Ok(Some(insert_into)) Ok(Some(insert_into))
} }
@@ -259,14 +257,11 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>, frontend_client: &Arc<FrontendClient>,
expr: CreateTableExpr, expr: CreateTableExpr,
) -> Result<(), Error> { ) -> Result<(), Error> {
let db_client = frontend_client.get_database_client().await?; let catalog = &self.config.sink_table_name[0];
db_client let schema = &self.config.sink_table_name[1];
.database frontend_client
.create(expr.clone()) .create(expr.clone(), catalog, schema)
.await .await?;
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to create table with expr: {:?}", expr),
})?;
Ok(()) Ok(())
} }
@@ -277,27 +272,78 @@ impl BatchingTask {
) -> Result<Option<(u32, Duration)>, Error> { ) -> Result<Option<(u32, Duration)>, Error> {
let instant = Instant::now(); let instant = Instant::now();
let flow_id = self.config.flow_id; let flow_id = self.config.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!( debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}", "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
self.config.expire_after, peer_addr, &plan self.config.expire_after, &plan
); );
let timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
// fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
let fixed_plan = plan
.clone()
.transform_down_with_subqueries(|p| {
if let LogicalPlan::TableScan(mut table_scan) = p {
let resolved = table_scan.table_name.resolve(catalog, schema);
table_scan.table_name = resolved.into();
Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
} else {
Ok(Transformed::no(p))
}
})
.with_context(|_| DatafusionSnafu {
context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
})?
.data;
let expanded_plan = CountWildcardRule::new()
.analyze(fixed_plan.clone(), &Default::default())
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to expand wildcard in logical plan, plan={:?}",
fixed_plan
),
})?;
let plan = expanded_plan;
let mut peer_desc = None;
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[flow_id.to_string().as_str()])
.start_timer(); .start_timer();
// hack and special handling the insert logical plan
let req = if let Some((insert_to, insert_plan)) =
breakup_insert_plan(&plan, catalog, schema)
{
let message = DFLogicalSubstraitConvertor {} let message = DFLogicalSubstraitConvertor {}
.encode(plan, DefaultSerializer) .encode(&insert_plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::InsertIntoPlan(
api::v1::InsertIntoPlan {
table_name: Some(insert_to),
logical_plan: message.to_vec(),
},
)),
})
} else {
let message = DFLogicalSubstraitConvertor {}
.encode(&plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?; .context(SubstraitEncodeLogicalPlanSnafu)?;
let req = api::v1::greptime_request::Request::Query(api::v1::QueryRequest { api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())), query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
}); })
};
let res = db_client.database.handle(req).await; frontend_client
drop(timer); .handle(req, catalog, schema, &mut peer_desc)
.await
};
let elapsed = instant.elapsed(); let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res { if let Ok(affected_rows) = &res {
@@ -307,19 +353,23 @@ impl BatchingTask {
); );
} else if let Err(err) = &res { } else if let Err(err) = &res {
warn!( warn!(
"Failed to execute Flow {flow_id} on frontend {}, result: {err:?}, elapsed: {:?} with query: {}", "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &plan peer_desc, elapsed, &plan
); );
} }
// record slow query // record slow query
if elapsed >= SLOW_QUERY_THRESHOLD { if elapsed >= SLOW_QUERY_THRESHOLD {
warn!( warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}", "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &plan peer_desc, elapsed, &plan
); );
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &plan.to_string(), &peer_addr]) .with_label_values(&[
flow_id.to_string().as_str(),
&plan.to_string(),
&peer_desc.unwrap_or_default().to_string(),
])
.observe(elapsed.as_secs_f64()); .observe(elapsed.as_secs_f64());
} }
@@ -328,12 +378,7 @@ impl BatchingTask {
.unwrap() .unwrap()
.after_query_exec(elapsed, res.is_ok()); .after_query_exec(elapsed, res.is_ok());
let res = res.context(InvalidRequestSnafu { let res = res?;
context: format!(
"Failed to execute query for flow={}: \'{}\'",
self.config.flow_id, &plan
),
})?;
Ok(Some((res, elapsed))) Ok(Some((res, elapsed)))
} }
@@ -372,7 +417,10 @@ impl BatchingTask {
} }
Err(TryRecvError::Empty) => (), Err(TryRecvError::Empty) => (),
} }
state.get_next_start_query_time(Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT)) state.get_next_start_query_time(
self.config.flow_id,
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
)
}; };
tokio::time::sleep_until(sleep_until).await; tokio::time::sleep_until(sleep_until).await;
} }
@@ -386,14 +434,18 @@ impl BatchingTask {
continue; continue;
} }
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => match new_query { Err(err) => {
match new_query {
Some(query) => { Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
} }
None => { None => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id) common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
} }
}, }
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
} }
} }
} }
@@ -418,7 +470,7 @@ impl BatchingTask {
async fn gen_query_with_time_window( async fn gen_query_with_time_window(
&self, &self,
engine: QueryEngineRef, engine: QueryEngineRef,
sink_table_meta: &RawTableMeta, sink_table_schema: &Arc<Schema>,
) -> Result<Option<(LogicalPlan, usize)>, Error> { ) -> Result<Option<(LogicalPlan, usize)>, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone(); let query_ctx = self.state.read().unwrap().query_ctx.clone();
let start = SystemTime::now(); let start = SystemTime::now();
@@ -477,9 +529,11 @@ impl BatchingTask {
debug!( debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
); );
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();
let mut add_auto_column = let mut add_auto_column =
AddAutoColumnRewriter::new(sink_table_meta.schema.clone()); AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan = self let plan = self
.config .config
.plan .plan
@@ -515,8 +569,10 @@ impl BatchingTask {
return Ok(None); return Ok(None);
}; };
// TODO(discord9): add auto column or not? This might break compatibility for auto created sink table before this, but that's ok right?
let mut add_filter = AddFilterRewriter::new(expr); let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_meta.schema.clone()); let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
// make a not optimized plan for clearer unparse // make a not optimized plan for clearer unparse
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false) let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?; .await?;
@@ -534,7 +590,7 @@ impl BatchingTask {
} }
// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified // auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified
// TODO(discord9): unit test // TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
fn create_table_with_expr( fn create_table_with_expr(
plan: &LogicalPlan, plan: &LogicalPlan,
sink_table_name: &[String; 3], sink_table_name: &[String; 3],
@@ -558,11 +614,7 @@ fn create_table_with_expr(
AUTO_CREATED_UPDATE_AT_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_millisecond_datatype(),
true, true,
) );
.with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
.context(DatatypesSnafu {
extra: "Failed to build column `update_at TimestampMillisecond default now()`",
})?;
column_schemas.push(update_at_schema); column_schemas.push(update_at_schema);
let time_index = if let Some(time_index) = first_time_stamp { let time_index = if let Some(time_index) = first_time_stamp {
@@ -574,16 +626,7 @@ fn create_table_with_expr(
ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_millisecond_datatype(),
false, false,
) )
.with_time_index(true) .with_time_index(true),
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.context(DatatypesSnafu {
extra: format!(
"Failed to build column `{} TimestampMillisecond TIME INDEX default 0`",
AUTO_CREATED_PLACEHOLDER_TS_COL
),
})?,
); );
AUTO_CREATED_PLACEHOLDER_TS_COL.to_string() AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
}; };
@@ -675,20 +718,14 @@ mod test {
AUTO_CREATED_UPDATE_AT_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_millisecond_datatype(),
true, true,
) );
.with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
.unwrap();
let ts_placeholder_schema = ColumnSchema::new( let ts_placeholder_schema = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_millisecond_datatype(),
false, false,
) )
.with_time_index(true) .with_time_index(true);
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.unwrap();
let testcases = vec![ let testcases = vec![
TestCase { TestCase {

View File

@@ -72,6 +72,17 @@ pub struct TimeWindowExpr {
df_schema: DFSchema, df_schema: DFSchema,
} }
impl std::fmt::Display for TimeWindowExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimeWindowExpr")
.field("phy_expr", &self.phy_expr.to_string())
.field("column_name", &self.column_name)
.field("logical_expr", &self.logical_expr.to_string())
.field("df_schema", &self.df_schema)
.finish()
}
}
impl TimeWindowExpr { impl TimeWindowExpr {
pub fn from_expr( pub fn from_expr(
expr: &Expr, expr: &Expr,
@@ -256,7 +267,7 @@ fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestam
Ok(val) Ok(val)
} }
/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window) /// Return (`the column name of time index column`, `the time window expr`, `the expected time unit of time index column`, `the expr's schema for evaluating the time window`)
/// ///
/// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected /// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected
/// to be monotonic increasing and appears in the innermost GROUP BY clause /// to be monotonic increasing and appears in the innermost GROUP BY clause

View File

@@ -14,29 +14,63 @@
//! some utils for helping with batching mode //! some utils for helping with batching mode
use std::collections::HashSet; use std::collections::{BTreeSet, HashSet};
use std::sync::Arc; use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_telemetry::{debug, info}; use common_telemetry::debug;
use datafusion::error::Result as DfResult; use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr; use datafusion::logical_expr::Expr;
use datafusion::sql::unparser::Unparser; use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{ use datafusion_common::tree_node::{
Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
}; };
use datafusion_common::DataFusionError; use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
use datafusion_expr::{Distinct, LogicalPlan}; use datafusion_expr::{Distinct, LogicalPlan, Projection};
use datatypes::schema::RawSchema; use datatypes::schema::SchemaRef;
use query::parser::QueryLanguageParser; use query::parser::QueryLanguageParser;
use query::QueryEngineRef; use query::QueryEngineRef;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::{OptionExt, ResultExt};
use table::metadata::TableInfo;
use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL; use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
use crate::df_optimizer::apply_df_optimizer; use crate::df_optimizer::apply_df_optimizer;
use crate::error::{DatafusionSnafu, ExternalSnafu}; use crate::error::{DatafusionSnafu, ExternalSnafu, TableNotFoundSnafu};
use crate::Error; use crate::{Error, TableName};
pub async fn get_table_info_df_schema(
catalog_mr: CatalogManagerRef,
table_name: TableName,
) -> Result<(Arc<TableInfo>, Arc<DFSchema>), Error> {
let full_table_name = table_name.clone().join(".");
let table = catalog_mr
.table(&table_name[0], &table_name[1], &table_name[2], None)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?;
let table_info = table.table_info().clone();
let schema = table_info.meta.schema.clone();
let df_schema: Arc<DFSchema> = Arc::new(
schema
.arrow_schema()
.clone()
.try_into()
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
schema.arrow_schema()
),
})?,
);
Ok((table_info, df_schema))
}
/// Convert sql to datafusion logical plan /// Convert sql to datafusion logical plan
pub async fn sql_to_df_plan( pub async fn sql_to_df_plan(
@@ -164,14 +198,16 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
/// (which doesn't necessary need to have exact name just need to be a extra timestamp column) /// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
/// and `__ts_placeholder`(this column need to have exact this name and be a timestamp) /// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
/// with values like `now()` and `0` /// with values like `now()` and `0`
///
/// it also give existing columns alias to column in sink table if needed
#[derive(Debug)] #[derive(Debug)]
pub struct AddAutoColumnRewriter { pub struct AddAutoColumnRewriter {
pub schema: RawSchema, pub schema: SchemaRef,
pub is_rewritten: bool, pub is_rewritten: bool,
} }
impl AddAutoColumnRewriter { impl AddAutoColumnRewriter {
pub fn new(schema: RawSchema) -> Self { pub fn new(schema: SchemaRef) -> Self {
Self { Self {
schema, schema,
is_rewritten: false, is_rewritten: false,
@@ -181,37 +217,97 @@ impl AddAutoColumnRewriter {
impl TreeNodeRewriter for AddAutoColumnRewriter { impl TreeNodeRewriter for AddAutoColumnRewriter {
type Node = LogicalPlan; type Node = LogicalPlan;
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> { fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten { if self.is_rewritten {
return Ok(Transformed::no(node)); return Ok(Transformed::no(node));
} }
// if is distinct all, go one level down // if is distinct all, wrap it in a projection
if let LogicalPlan::Distinct(Distinct::All(_)) = node { if let LogicalPlan::Distinct(Distinct::All(_)) = &node {
return Ok(Transformed::no(node)); let mut exprs = vec![];
for field in node.schema().fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new_unqualified(
field.name(),
)));
} }
// FIXME(discord9): just read plan.expr and do stuffs let projection =
let mut exprs = node.expressions(); LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);
node = projection;
}
// handle table_scan by wrap it in a projection
else if let LogicalPlan::TableScan(table_scan) = node {
let mut exprs = vec![];
for field in table_scan.projected_schema.fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new(
Some(table_scan.table_name.clone()),
field.name(),
)));
}
let projection = LogicalPlan::Projection(Projection::try_new(
exprs,
Arc::new(LogicalPlan::TableScan(table_scan)),
)?);
node = projection;
}
// only do rewrite if found the outermost projection
let mut exprs = if let LogicalPlan::Projection(project) = &node {
project.expr.clone()
} else {
return Ok(Transformed::no(node));
};
let all_names = self
.schema
.column_schemas()
.iter()
.map(|c| c.name.clone())
.collect::<BTreeSet<_>>();
// first match by position
for (idx, expr) in exprs.iter_mut().enumerate() {
if !all_names.contains(&expr.qualified_name().1) {
if let Some(col_name) = self
.schema
.column_schemas()
.get(idx)
.map(|c| c.name.clone())
{
// if the data type mismatched, later check_execute will error out
// hence no need to check it here, beside, optimize pass might be able to cast it
// so checking here is not necessary
*expr = expr.clone().alias(col_name);
}
}
}
// add columns if have different column count // add columns if have different column count
let query_col_cnt = exprs.len(); let query_col_cnt = exprs.len();
let table_col_cnt = self.schema.column_schemas.len(); let table_col_cnt = self.schema.column_schemas().len();
info!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}"); debug!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}");
let placeholder_ts_expr =
datafusion::logical_expr::lit(ScalarValue::TimestampMillisecond(Some(0), None))
.alias(AUTO_CREATED_PLACEHOLDER_TS_COL);
if query_col_cnt == table_col_cnt { if query_col_cnt == table_col_cnt {
self.is_rewritten = true; // still need to add alias, see below
return Ok(Transformed::no(node));
} else if query_col_cnt + 1 == table_col_cnt { } else if query_col_cnt + 1 == table_col_cnt {
let last_col_schema = self.schema.column_schemas.last().unwrap(); let last_col_schema = self.schema.column_schemas().last().unwrap();
// if time index column is auto created add it // if time index column is auto created add it
if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL
&& self.schema.timestamp_index == Some(table_col_cnt - 1) && self.schema.timestamp_index() == Some(table_col_cnt - 1)
{ {
exprs.push(datafusion::logical_expr::lit(0)); exprs.push(placeholder_ts_expr);
} else if last_col_schema.data_type.is_timestamp() { } else if last_col_schema.data_type.is_timestamp() {
// is the update at column // is the update at column
exprs.push(datafusion::prelude::now()); exprs.push(datafusion::prelude::now().alias(&last_col_schema.name));
} else { } else {
// helpful error message // helpful error message
return Err(DataFusionError::Plan(format!( return Err(DataFusionError::Plan(format!(
@@ -221,11 +317,11 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
))); )));
} }
} else if query_col_cnt + 2 == table_col_cnt { } else if query_col_cnt + 2 == table_col_cnt {
let mut col_iter = self.schema.column_schemas.iter().rev(); let mut col_iter = self.schema.column_schemas().iter().rev();
let last_col_schema = col_iter.next().unwrap(); let last_col_schema = col_iter.next().unwrap();
let second_last_col_schema = col_iter.next().unwrap(); let second_last_col_schema = col_iter.next().unwrap();
if second_last_col_schema.data_type.is_timestamp() { if second_last_col_schema.data_type.is_timestamp() {
exprs.push(datafusion::prelude::now()); exprs.push(datafusion::prelude::now().alias(&second_last_col_schema.name));
} else { } else {
return Err(DataFusionError::Plan(format!( return Err(DataFusionError::Plan(format!(
"Expect the second last column in the table to be timestamp column, found column {} with type {:?}", "Expect the second last column in the table to be timestamp column, found column {} with type {:?}",
@@ -235,9 +331,9 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
} }
if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL
&& self.schema.timestamp_index == Some(table_col_cnt - 1) && self.schema.timestamp_index() == Some(table_col_cnt - 1)
{ {
exprs.push(datafusion::logical_expr::lit(0)); exprs.push(placeholder_ts_expr);
} else { } else {
return Err(DataFusionError::Plan(format!( return Err(DataFusionError::Plan(format!(
"Expect timestamp column {}, found {:?}", "Expect timestamp column {}, found {:?}",
@@ -246,8 +342,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
} }
} else { } else {
return Err(DataFusionError::Plan(format!( return Err(DataFusionError::Plan(format!(
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}", "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
query_col_cnt, node.expressions(), table_col_cnt, self.schema.column_schemas query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node
))); )));
} }
@@ -255,6 +351,11 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
let new_plan = node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?; let new_plan = node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?;
Ok(Transformed::yes(new_plan)) Ok(Transformed::yes(new_plan))
} }
/// We might add new columns, so we need to recompute the schema
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
node.recompute_schema().map(Transformed::yes)
}
} }
// TODO(discord9): a method to found out the precise time window // TODO(discord9): a method to found out the precise time window
@@ -301,9 +402,11 @@ impl TreeNodeRewriter for AddFilterRewriter {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::sync::Arc;
use datafusion_common::tree_node::TreeNode as _; use datafusion_common::tree_node::TreeNode as _;
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema; use datatypes::schema::{ColumnSchema, Schema};
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use session::context::QueryContext; use session::context::QueryContext;
@@ -386,7 +489,7 @@ mod test {
// add update_at // add update_at
( (
"SELECT number FROM numbers_with_ts", "SELECT number FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, now() FROM numbers_with_ts"), Ok("SELECT numbers_with_ts.number, now() AS ts FROM numbers_with_ts"),
vec![ vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new( ColumnSchema::new(
@@ -400,7 +503,7 @@ mod test {
// add ts placeholder // add ts placeholder
( (
"SELECT number FROM numbers_with_ts", "SELECT number FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, 0 FROM numbers_with_ts"), Ok("SELECT numbers_with_ts.number, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
vec![ vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new( ColumnSchema::new(
@@ -428,7 +531,7 @@ mod test {
// add update_at and ts placeholder // add update_at and ts placeholder
( (
"SELECT number FROM numbers_with_ts", "SELECT number FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, now(), 0 FROM numbers_with_ts"), Ok("SELECT numbers_with_ts.number, now() AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
vec![ vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new( ColumnSchema::new(
@@ -447,7 +550,7 @@ mod test {
// add ts placeholder // add ts placeholder
( (
"SELECT number, ts FROM numbers_with_ts", "SELECT number, ts FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, 0 FROM numbers_with_ts"), Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
vec![ vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new( ColumnSchema::new(
@@ -466,7 +569,7 @@ mod test {
// add update_at after time index column // add update_at after time index column
( (
"SELECT number, ts FROM numbers_with_ts", "SELECT number, ts FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, now() FROM numbers_with_ts"), Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, now() AS update_atat FROM numbers_with_ts"),
vec![ vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new( ColumnSchema::new(
@@ -528,8 +631,8 @@ mod test {
let query_engine = create_test_query_engine(); let query_engine = create_test_query_engine();
let ctx = QueryContext::arc(); let ctx = QueryContext::arc();
for (before, after, column_schemas) in testcases { for (before, after, column_schemas) in testcases {
let raw_schema = RawSchema::new(column_schemas); let schema = Arc::new(Schema::new(column_schemas));
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(raw_schema); let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false) let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
.await .await

View File

@@ -49,6 +49,8 @@ pub trait FlowEngine {
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error>; async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error>;
/// Check if the flow exists /// Check if the flow exists
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>; async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>;
/// List all flows
async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error>;
/// Handle the insert requests for the flow /// Handle the insert requests for the flow
async fn handle_flow_inserts( async fn handle_flow_inserts(
&self, &self,

View File

@@ -149,6 +149,13 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Unsupported: {reason}"))]
Unsupported {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported temporal filter: {reason}"))] #[snafu(display("Unsupported temporal filter: {reason}"))]
UnsupportedTemporalFilter { UnsupportedTemporalFilter {
reason: String, reason: String,
@@ -189,6 +196,25 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Illegal check task state: {reason}"))]
IllegalCheckTaskState {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to sync with check task for flow {} with allow_drop={}",
flow_id,
allow_drop
))]
SyncCheckTask {
flow_id: FlowId,
allow_drop: bool,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to start server"))] #[snafu(display("Failed to start server"))]
StartServer { StartServer {
#[snafu(implicit)] #[snafu(implicit)]
@@ -280,10 +306,12 @@ impl ErrorExt for Error {
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => { Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery StatusCode::EngineExecuteQuery
} }
Self::Unexpected { .. } => StatusCode::Unexpected, Self::Unexpected { .. }
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { | Self::SyncCheckTask { .. }
StatusCode::Unsupported | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
} Self::NotImplemented { .. }
| Self::UnsupportedTemporalFilter { .. }
| Self::Unsupported { .. } => StatusCode::Unsupported,
Self::External { source, .. } => source.status_code(), Self::External { source, .. } => source.status_code(),
Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {

View File

@@ -43,8 +43,8 @@ mod utils;
#[cfg(test)] #[cfg(test)]
mod test_utils; mod test_utils;
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use adapter::{FlowConfig, FlowStreamingEngine, FlowWorkerManagerRef, FlownodeOptions};
pub use batching_mode::frontend_client::FrontendClient; pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName}; pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
pub use error::{Error, Result}; pub use error::{Error, Result};
pub use server::{ pub use server::{

View File

@@ -29,6 +29,7 @@ use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output; use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info; use common_telemetry::tracing::info;
use futures::{FutureExt, TryStreamExt}; use futures::{FutureExt, TryStreamExt};
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
@@ -50,7 +51,10 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming; use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{create_worker, FlowWorkerManagerRef}; use crate::adapter::{create_worker, FlowWorkerManagerRef};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{ use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -59,19 +63,21 @@ use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine; use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler}; use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{CreateFlowArgs, Error, FlowWorkerManager, FlownodeOptions, FrontendClient}; use crate::{CreateFlowArgs, Error, FlowStreamingEngine, FlownodeOptions, FrontendClient};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...> /// wrapping flow node manager to avoid orphan rule with Arc<...>
#[derive(Clone)] #[derive(Clone)]
pub struct FlowService { pub struct FlowService {
/// TODO(discord9): replace with dual engine /// TODO(discord9): replace with dual engine
pub manager: FlowWorkerManagerRef, pub dual_engine: FlowDualEngineRef,
} }
impl FlowService { impl FlowService {
pub fn new(manager: FlowWorkerManagerRef) -> Self { pub fn new(manager: FlowDualEngineRef) -> Self {
Self { manager } Self {
dual_engine: manager,
}
} }
} }
@@ -86,7 +92,7 @@ impl flow_server::Flow for FlowService {
.start_timer(); .start_timer();
let request = request.into_inner(); let request = request.into_inner();
self.manager self.dual_engine
.handle(request) .handle(request)
.await .await
.map_err(|err| { .map_err(|err| {
@@ -126,7 +132,7 @@ impl flow_server::Flow for FlowService {
.with_label_values(&["in"]) .with_label_values(&["in"])
.inc_by(row_count as u64); .inc_by(row_count as u64);
self.manager self.dual_engine
.handle_inserts(request) .handle_inserts(request)
.await .await
.map(Response::new) .map(Response::new)
@@ -139,11 +145,16 @@ pub struct FlownodeServer {
inner: Arc<FlownodeServerInner>, inner: Arc<FlownodeServerInner>,
} }
/// FlownodeServerInner is the inner state of FlownodeServer,
/// this struct mostly useful for construct/start and stop the
/// flow node server
struct FlownodeServerInner { struct FlownodeServerInner {
/// worker shutdown signal, not to be confused with server_shutdown_tx /// worker shutdown signal, not to be confused with server_shutdown_tx
worker_shutdown_tx: Mutex<broadcast::Sender<()>>, worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// server shutdown signal for shutdown grpc server /// server shutdown signal for shutdown grpc server
server_shutdown_tx: Mutex<broadcast::Sender<()>>, server_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// streaming task handler
streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
flow_service: FlowService, flow_service: FlowService,
} }
@@ -156,16 +167,28 @@ impl FlownodeServer {
flow_service, flow_service,
worker_shutdown_tx: Mutex::new(tx), worker_shutdown_tx: Mutex::new(tx),
server_shutdown_tx: Mutex::new(server_tx), server_shutdown_tx: Mutex::new(server_tx),
streaming_task_handler: Mutex::new(None),
}), }),
} }
} }
/// Start the background task for streaming computation. /// Start the background task for streaming computation.
async fn start_workers(&self) -> Result<(), Error> { async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.manager.clone(); let manager_ref = self.inner.flow_service.dual_engine.clone();
let _handle = manager_ref let handle = manager_ref
.clone() .streaming_engine()
.run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe())); .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
self.inner
.streaming_task_handler
.lock()
.await
.replace(handle);
self.inner
.flow_service
.dual_engine
.start_flow_consistent_check_task()
.await?;
Ok(()) Ok(())
} }
@@ -176,6 +199,11 @@ impl FlownodeServer {
if tx.send(()).is_err() { if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown"); info!("Receiver dropped, the flow node server has already shutdown");
} }
self.inner
.flow_service
.dual_engine
.stop_flow_consistent_check_task()
.await?;
Ok(()) Ok(())
} }
} }
@@ -272,8 +300,8 @@ impl FlownodeInstance {
&self.flownode_server &self.flownode_server
} }
pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef { pub fn flow_engine(&self) -> FlowDualEngineRef {
self.flownode_server.inner.flow_service.manager.clone() self.flownode_server.inner.flow_service.dual_engine.clone()
} }
pub fn setup_services(&mut self, services: ServerHandlers) { pub fn setup_services(&mut self, services: ServerHandlers) {
@@ -342,12 +370,21 @@ impl FlownodeBuilder {
self.build_manager(query_engine_factory.query_engine()) self.build_manager(query_engine_factory.query_engine())
.await?, .await?,
); );
let batching = Arc::new(BatchingEngine::new(
self.frontend_client.clone(),
query_engine_factory.query_engine(),
self.flow_metadata_manager.clone(),
self.table_meta.clone(),
self.catalog_manager.clone(),
));
let dual = FlowDualEngine::new(
manager.clone(),
batching,
self.flow_metadata_manager.clone(),
self.catalog_manager.clone(),
);
if let Err(err) = self.recover_flows(&manager).await { let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
common_telemetry::error!(err; "Failed to recover flows");
}
let server = FlownodeServer::new(FlowService::new(manager.clone()));
let heartbeat_task = self.heartbeat_task; let heartbeat_task = self.heartbeat_task;
@@ -364,7 +401,7 @@ impl FlownodeBuilder {
/// or recover all existing flow tasks if in standalone mode(nodeid is None) /// or recover all existing flow tasks if in standalone mode(nodeid is None)
/// ///
/// TODO(discord9): persistent flow tasks with internal state /// TODO(discord9): persistent flow tasks with internal state
async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result<usize, Error> { async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
let nodeid = self.opts.node_id; let nodeid = self.opts.node_id;
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid { let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
let to_be_recover = self let to_be_recover = self
@@ -436,7 +473,7 @@ impl FlownodeBuilder {
), ),
}; };
manager manager
.create_flow_inner(args) .create_flow(args)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { .with_context(|_| CreateFlowSnafu {
@@ -452,7 +489,7 @@ impl FlownodeBuilder {
async fn build_manager( async fn build_manager(
&mut self, &mut self,
query_engine: Arc<dyn QueryEngine>, query_engine: Arc<dyn QueryEngine>,
) -> Result<FlowWorkerManager, Error> { ) -> Result<FlowStreamingEngine, Error> {
let table_meta = self.table_meta.clone(); let table_meta = self.table_meta.clone();
register_function_to_query_engine(&query_engine); register_function_to_query_engine(&query_engine);
@@ -461,7 +498,7 @@ impl FlownodeBuilder {
let node_id = self.opts.node_id.map(|id| id as u32); let node_id = self.opts.node_id.map(|id| id as u32);
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta); let mut man = FlowStreamingEngine::new(node_id, query_engine, table_meta);
for worker_id in 0..num_workers { for worker_id in 0..num_workers {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@@ -543,6 +580,10 @@ impl<'a> FlownodeServiceBuilder<'a> {
} }
} }
/// Basically a tiny frontend that communicates with datanode, different from [`FrontendClient`] which
/// connect to a real frontend instead, this is used for flow's streaming engine. And is for simple query.
///
/// For heavy query use [`FrontendClient`] which offload computation to frontend, lifting the load from flownode
#[derive(Clone)] #[derive(Clone)]
pub struct FrontendInvoker { pub struct FrontendInvoker {
inserter: Arc<Inserter>, inserter: Arc<Inserter>,

View File

@@ -15,6 +15,7 @@ api.workspace = true
arc-swap = "1.0" arc-swap = "1.0"
async-trait.workspace = true async-trait.workspace = true
auth.workspace = true auth.workspace = true
bytes.workspace = true
cache.workspace = true cache.workspace = true
catalog.workspace = true catalog.workspace = true
client.workspace = true client.workspace = true

View File

@@ -19,6 +19,8 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt}; use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode; use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug; use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
use datafusion::error::DataFusionError;
use session::ReadPreference; use session::ReadPreference;
use snafu::{Location, Snafu}; use snafu::{Location, Snafu};
use store_api::storage::RegionId; use store_api::storage::RegionId;
@@ -345,7 +347,15 @@ pub enum Error {
SubstraitDecodeLogicalPlan { SubstraitDecodeLogicalPlan {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
source: substrait::error::Error, source: common_query::error::Error,
},
#[snafu(display("DataFusionError"))]
DataFusion {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
}, },
} }
@@ -423,6 +433,8 @@ impl ErrorExt for Error {
Error::TableOperation { source, .. } => source.status_code(), Error::TableOperation { source, .. } => source.status_code(),
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited, Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
} }
} }

View File

@@ -278,7 +278,7 @@ impl SqlQueryHandler for Instance {
// plan should be prepared before exec // plan should be prepared before exec
// we'll do check there // we'll do check there
self.query_engine self.query_engine
.execute(plan, query_ctx) .execute(plan.clone(), query_ctx)
.await .await
.context(ExecLogicalPlanSnafu) .context(ExecLogicalPlanSnafu)
} }

View File

@@ -12,29 +12,33 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request; use api::v1::greptime_request::Request;
use api::v1::query_request::Query; use api::v1::query_request::Query;
use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, RowInsertRequests}; use api::v1::{
DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
RowInsertRequests,
};
use async_trait::async_trait; use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows; use common_base::AffectedRows;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output; use common_query::Output;
use common_telemetry::tracing::{self}; use common_telemetry::tracing::{self};
use datafusion::execution::SessionStateBuilder;
use query::parser::PromQuery; use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch}; use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table_name::TableName; use table::table_name::TableName;
use crate::error::{ use crate::error::{
CatalogSnafu, Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
NotSupportedSnafu, PermissionSnafu, Result, SubstraitDecodeLogicalPlanSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
TableNotFoundSnafu, TableOperationSnafu, SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
}; };
use crate::instance::{attach_timer, Instance}; use crate::instance::{attach_timer, Instance};
use crate::metrics::{ use crate::metrics::{
@@ -91,14 +95,31 @@ impl GrpcQueryHandler for Instance {
Query::LogicalPlan(plan) => { Query::LogicalPlan(plan) => {
// this path is useful internally when flownode needs to execute a logical plan through gRPC interface // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer(); let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
let plan = DFLogicalSubstraitConvertor {}
.decode(&*plan, SessionStateBuilder::default().build()) // use dummy catalog to provide table
let plan_decoder = self
.query_engine()
.engine_context(ctx.clone())
.new_plan_decoder()
.context(PlanStatementSnafu)?;
let dummy_catalog_list =
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
self.catalog_manager().clone(),
));
let logical_plan = plan_decoder
.decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
.await .await
.context(SubstraitDecodeLogicalPlanSnafu)?; .context(SubstraitDecodeLogicalPlanSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, plan, ctx.clone()).await?; let output =
SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;
attach_timer(output, timer) attach_timer(output, timer)
} }
Query::InsertIntoPlan(insert) => {
self.handle_insert_plan(insert, ctx.clone()).await?
}
Query::PromRangeQuery(promql) => { Query::PromRangeQuery(promql) => {
let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer(); let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
let prom_query = PromQuery { let prom_query = PromQuery {
@@ -284,6 +305,91 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
} }
impl Instance { impl Instance {
async fn handle_insert_plan(
&self,
insert: InsertIntoPlan,
ctx: QueryContextRef,
) -> Result<Output> {
let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
err_msg: "'table_name' is absent in InsertIntoPlan",
})?;
// use dummy catalog to provide table
let plan_decoder = self
.query_engine()
.engine_context(ctx.clone())
.new_plan_decoder()
.context(PlanStatementSnafu)?;
let dummy_catalog_list =
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
self.catalog_manager().clone(),
));
// no optimize yet since we still need to add stuff
let logical_plan = plan_decoder
.decode(
bytes::Bytes::from(insert.logical_plan),
dummy_catalog_list,
false,
)
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let table = self
.catalog_manager()
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: [
table_name.catalog_name.clone(),
table_name.schema_name.clone(),
table_name.table_name.clone(),
]
.join("."),
})?;
let table_info = table.table_info();
let df_schema = Arc::new(
table_info
.meta
.schema
.arrow_schema()
.clone()
.try_into()
.context(DataFusionSnafu)?,
);
let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
.context(SubstraitDecodeLogicalPlanSnafu)?;
let engine_ctx = self.query_engine().engine_context(ctx.clone());
let state = engine_ctx.state();
// Analyze the plan
let analyzed_plan = state
.analyzer()
.execute_and_check(insert_into, state.config_options(), |_, _| {})
.context(common_query::error::GeneralDataFusionSnafu)
.context(SubstraitDecodeLogicalPlanSnafu)?;
// Optimize the plan
let optimized_plan = state
.optimize(&analyzed_plan)
.context(common_query::error::GeneralDataFusionSnafu)
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;
Ok(attach_timer(output, timer))
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn handle_inserts( pub async fn handle_inserts(
&self, &self,

View File

@@ -26,6 +26,7 @@ use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_S
use common_catalog::{format_full_flow_name, format_full_table_name}; use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context; use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext; use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent; use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
@@ -38,6 +39,8 @@ use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output; use common_query::Output;
use common_telemetry::{debug, info, tracing}; use common_telemetry::{debug, info, tracing};
use common_time::Timezone; use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema}; use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value; use datatypes::value::Value;
@@ -45,7 +48,7 @@ use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule; use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef}; use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement; use query::parser::{QueryLanguageParser, QueryStatement};
use query::plan::extract_and_rewrite_full_table_names; use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer; use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt; use query::sql::create_table_stmt;
@@ -69,13 +72,14 @@ use table::table_name::TableName;
use table::TableRef; use table::TableRef;
use crate::error::{ use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu, ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
}; };
use crate::expr_helper; use crate::expr_helper;
use crate::statement::show::create_partitions_stmt; use crate::statement::show::create_partitions_stmt;
@@ -364,6 +368,18 @@ impl StatementExecutor {
expr: CreateFlowExpr, expr: CreateFlowExpr,
query_context: QueryContextRef, query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> { ) -> Result<SubmitDdlTaskResponse> {
let flow_type = self
.determine_flow_type(&expr.sql, query_context.clone())
.await?;
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
let expr = {
let mut expr = expr;
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
expr
};
let task = CreateFlowTask::try_from(PbCreateFlowTask { let task = CreateFlowTask::try_from(PbCreateFlowTask {
create_flow: Some(expr), create_flow: Some(expr),
}) })
@@ -379,6 +395,55 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu) .context(error::ExecuteDdlSnafu)
} }
/// Determine the flow type based on the SQL query
///
/// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result<FlowType> {
let engine = &self.query_engine;
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
/// Visitor to find aggregation or distinct
struct FindAggr {
is_aggr: bool,
}
impl TreeNodeVisitor<'_> for FindAggr {
type Node = LogicalPlan;
fn f_down(
&mut self,
node: &Self::Node,
) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
{
match node {
LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
self.is_aggr = true;
return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
}
_ => (),
}
Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
}
}
let mut find_aggr = FindAggr { is_aggr: false };
plan.visit_with_subqueries(&mut find_aggr)
.context(BuildDfLogicalPlanSnafu)?;
if find_aggr.is_aggr {
Ok(FlowType::Batching)
} else {
Ok(FlowType::Streaming)
}
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn create_view( pub async fn create_view(
&self, &self,

View File

@@ -132,7 +132,7 @@ impl GrpcQueryHandler for DummyInstance {
); );
result.remove(0)? result.remove(0)?
} }
Query::LogicalPlan(_) => unimplemented!(), Query::LogicalPlan(_) | Query::InsertIntoPlan(_) => unimplemented!(),
Query::PromRangeQuery(promql) => { Query::PromRangeQuery(promql) => {
let prom_query = PromQuery { let prom_query = PromQuery {
query: promql.query, query: promql.query,

View File

@@ -41,7 +41,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef; use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder; use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient}; use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
use frontend::frontend::Frontend; use frontend::frontend::Frontend;
use frontend::instance::builder::FrontendBuilder; use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance, StandaloneDatanodeManager}; use frontend::instance::{Instance, StandaloneDatanodeManager};
@@ -174,8 +174,8 @@ impl GreptimeDbStandaloneBuilder {
Some(procedure_manager.clone()), Some(procedure_manager.clone()),
); );
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone(); let (frontend_client, frontend_instance_handler) =
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr); FrontendClient::from_empty_grpc_handler();
let flow_builder = FlownodeBuilder::new( let flow_builder = FlownodeBuilder::new(
Default::default(), Default::default(),
plugins.clone(), plugins.clone(),
@@ -188,7 +188,7 @@ impl GreptimeDbStandaloneBuilder {
let node_manager = Arc::new(StandaloneDatanodeManager { let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(), region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(), flow_server: flownode.flow_engine(),
}); });
let table_id_sequence = Arc::new( let table_id_sequence = Arc::new(
@@ -250,7 +250,15 @@ impl GreptimeDbStandaloneBuilder {
.unwrap(); .unwrap();
let instance = Arc::new(instance); let instance = Arc::new(instance);
let flow_worker_manager = flownode.flow_worker_manager(); // set the frontend client for flownode
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
let flow_worker_manager = flownode.flow_engine().streaming_engine();
let invoker = flow::FrontendInvoker::build_from( let invoker = flow::FrontendInvoker::build_from(
flow_worker_manager.clone(), flow_worker_manager.clone(),
catalog_manager.clone(), catalog_manager.clone(),

View File

@@ -8,6 +8,20 @@ CREATE TABLE distinct_basic (
Affected Rows: 0 Affected Rows: 0
-- should fail
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval
ALTER TABLE distinct_basic SET 'ttl' = '5s';
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT SELECT
DISTINCT number as dis DISTINCT number as dis
@@ -24,7 +38,7 @@ VALUES
(20, "2021-07-01 00:00:00.200"), (20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600"); (22, "2021-07-01 00:00:00.600");
Affected Rows: 0 Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic'); ADMIN FLUSH_FLOW('test_distinct_basic');
@@ -49,7 +63,7 @@ SHOW CREATE TABLE distinct_basic;
| | | | | |
| | ENGINE=mito | | | ENGINE=mito |
| | WITH( | | | WITH( |
| | ttl = 'instant' | | | ttl = '5s' |
| | ) | | | ) |
+----------------+-----------------------------------------------------------+ +----------------+-----------------------------------------------------------+
@@ -84,8 +98,93 @@ FROM
SELECT number FROM distinct_basic; SELECT number FROM distinct_basic;
++ +--------+
++ | number |
+--------+
| 20 |
| 22 |
+--------+
-- SQLNESS SLEEP 6s
ADMIN FLUSH_TABLE('distinct_basic');
+-------------------------------------+
| ADMIN FLUSH_TABLE('distinct_basic') |
+-------------------------------------+
| 0 |
+-------------------------------------+
INSERT INTO
distinct_basic
VALUES
(23, "2021-07-01 00:00:01.600");
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE distinct_basic;
+----------------+-----------------------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------------------+
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
| | "number" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("number") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '5s' |
| | ) |
+----------------+-----------------------------------------------------------+
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20 |
| 22 |
| 23 |
+-----+
SELECT number FROM distinct_basic;
+--------+
| number |
+--------+
| 23 |
+--------+
DROP FLOW test_distinct_basic; DROP FLOW test_distinct_basic;

View File

@@ -6,6 +6,16 @@ CREATE TABLE distinct_basic (
TIME INDEX(ts) TIME INDEX(ts)
)WITH ('ttl' = 'instant'); )WITH ('ttl' = 'instant');
-- should fail
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
ALTER TABLE distinct_basic SET 'ttl' = '5s';
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT SELECT
DISTINCT number as dis DISTINCT number as dis
@@ -34,6 +44,28 @@ FROM
SELECT number FROM distinct_basic; SELECT number FROM distinct_basic;
-- SQLNESS SLEEP 6s
ADMIN FLUSH_TABLE('distinct_basic');
INSERT INTO
distinct_basic
VALUES
(23, "2021-07-01 00:00:01.600");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
SHOW CREATE TABLE distinct_basic;
SHOW CREATE TABLE out_distinct_basic;
SELECT
dis
FROM
out_distinct_basic;
SELECT number FROM distinct_basic;
DROP FLOW test_distinct_basic; DROP FLOW test_distinct_basic;
DROP TABLE distinct_basic; DROP TABLE distinct_basic;
DROP TABLE out_distinct_basic; DROP TABLE out_distinct_basic;

View File

@@ -9,11 +9,12 @@ Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT SELECT
sum(number) sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM FROM
numbers_input_basic numbers_input_basic
GROUP BY GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00'); time_window;
Affected Rows: 0 Affected Rows: 0
@@ -24,11 +25,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+ +-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, | | | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, | | | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), | | | TIME INDEX ("time_window") |
| | PRIMARY KEY ("window_end") |
| | ) | | | ) |
| | | | | |
| | ENGINE=mito | | | ENGINE=mito |
@@ -52,11 +51,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+ +-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, | | | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, | | | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), | | | TIME INDEX ("time_window") |
| | PRIMARY KEY ("window_end") |
| | ) | | | ) |
| | | | | |
| | ENGINE=mito | | | ENGINE=mito |
@@ -65,13 +62,13 @@ SHOW CREATE TABLE out_num_cnt_basic;
SHOW CREATE FLOW test_numbers_basic; SHOW CREATE FLOW test_numbers_basic;
+--------------------+-------------------------------------------------------------------------------------------------------+ +--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow | | Flow | Create Flow |
+--------------------+-------------------------------------------------------------------------------------------------------+ +--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | | test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic |
| | SINK TO out_num_cnt_basic | | | SINK TO out_num_cnt_basic |
| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') | | | AS SELECT sum(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') AS time_window FROM numbers_input_basic GROUP BY time_window |
+--------------------+-------------------------------------------------------------------------------------------------------+ +--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
DROP FLOW test_numbers_basic; DROP FLOW test_numbers_basic;

View File

@@ -7,11 +7,12 @@ CREATE TABLE numbers_input_basic (
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT SELECT
sum(number) sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM FROM
numbers_input_basic numbers_input_basic
GROUP BY GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00'); time_window;
SHOW CREATE TABLE out_num_cnt_basic; SHOW CREATE TABLE out_num_cnt_basic;

View File

@@ -9,11 +9,12 @@ Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT SELECT
sum(number) sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM FROM
numbers_input_basic numbers_input_basic
GROUP BY GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00'); time_window;
Affected Rows: 0 Affected Rows: 0
@@ -24,11 +25,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+ +-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, | | | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, | | | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), | | | TIME INDEX ("time_window") |
| | PRIMARY KEY ("window_end") |
| | ) | | | ) |
| | | | | |
| | ENGINE=mito | | | ENGINE=mito |
@@ -53,11 +52,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+ +-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, | | | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, | | | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), | | | TIME INDEX ("time_window") |
| | PRIMARY KEY ("window_end") |
| | ) | | | ) |
| | | | | |
| | ENGINE=mito | | | ENGINE=mito |
@@ -84,16 +81,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT SELECT
"sum(numbers_input_basic.number)", "sum(numbers_input_basic.number)",
window_start, time_window
window_end
FROM FROM
out_num_cnt_basic; out_num_cnt_basic;
+---------------------------------+---------------------+---------------------+ +---------------------------------+---------------------+
| sum(numbers_input_basic.number) | window_start | window_end | | sum(numbers_input_basic.number) | time_window |
+---------------------------------+---------------------+---------------------+ +---------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | | 42 | 2021-07-01T00:00:00 |
+---------------------------------+---------------------+---------------------+ +---------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic'); ADMIN FLUSH_FLOW('test_numbers_basic');
@@ -124,17 +120,16 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT SELECT
"sum(numbers_input_basic.number)", "sum(numbers_input_basic.number)",
window_start, time_window
window_end
FROM FROM
out_num_cnt_basic; out_num_cnt_basic;
+---------------------------------+---------------------+---------------------+ +---------------------------------+---------------------+
| sum(numbers_input_basic.number) | window_start | window_end | | sum(numbers_input_basic.number) | time_window |
+---------------------------------+---------------------+---------------------+ +---------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | | 42 | 2021-07-01T00:00:00 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | | 47 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+---------------------+ +---------------------------------+---------------------+
DROP FLOW test_numbers_basic; DROP FLOW test_numbers_basic;
@@ -896,6 +891,8 @@ CREATE TABLE temp_sensor_data (
loc STRING, loc STRING,
temperature DOUBLE, temperature DOUBLE,
ts TIMESTAMP TIME INDEX ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0
@@ -904,7 +901,8 @@ CREATE TABLE temp_alerts (
sensor_id INT, sensor_id INT,
loc STRING, loc STRING,
max_temp DOUBLE, max_temp DOUBLE,
ts TIMESTAMP TIME INDEX event_ts TIMESTAMP TIME INDEX,
update_at TIMESTAMP
); );
Affected Rows: 0 Affected Rows: 0
@@ -914,6 +912,7 @@ SELECT
sensor_id, sensor_id,
loc, loc,
max(temperature) as max_temp, max(temperature) as max_temp,
max(ts) as event_ts
FROM FROM
temp_sensor_data temp_sensor_data
GROUP BY GROUP BY
@@ -933,8 +932,9 @@ SHOW CREATE TABLE temp_alerts;
| | "sensor_id" INT NULL, | | | "sensor_id" INT NULL, |
| | "loc" STRING NULL, | | | "loc" STRING NULL, |
| | "max_temp" DOUBLE NULL, | | | "max_temp" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, | | | "event_ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") | | | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("event_ts") |
| | ) | | | ) |
| | | | | |
| | ENGINE=mito | | | ENGINE=mito |
@@ -993,15 +993,16 @@ SHOW TABLES LIKE 'temp_alerts';
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
+-----------+-------+----------+ +-----------+-------+----------+-------------------------+
| sensor_id | loc | max_temp | | sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+ +-----------+-------+----------+-------------------------+
| 1 | room1 | 150.0 | | 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
+-----------+-------+----------+ +-----------+-------+----------+-------------------------+
INSERT INTO INSERT INTO
temp_sensor_data temp_sensor_data
@@ -1022,15 +1023,16 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
+-----------+-------+----------+ +-----------+-------+----------+-------------------------+
| sensor_id | loc | max_temp | | sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+ +-----------+-------+----------+-------------------------+
| 1 | room1 | 150.0 | | 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
+-----------+-------+----------+ +-----------+-------+----------+-------------------------+
DROP FLOW temp_monitoring; DROP FLOW temp_monitoring;
@@ -1049,6 +1051,8 @@ CREATE TABLE ngx_access_log (
stat INT, stat INT,
size INT, size INT,
access_time TIMESTAMP TIME INDEX access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0
@@ -1183,6 +1187,8 @@ CREATE TABLE requests (
service_ip STRING, service_ip STRING,
val INT, val INT,
ts TIMESTAMP TIME INDEX ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0
@@ -1392,6 +1398,8 @@ CREATE TABLE android_log (
`log` STRING, `log` STRING,
ts TIMESTAMP(9), ts TIMESTAMP(9),
TIME INDEX(ts) TIME INDEX(ts)
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0
@@ -1503,6 +1511,8 @@ CREATE TABLE android_log (
`log` STRING, `log` STRING,
ts TIMESTAMP(9), ts TIMESTAMP(9),
TIME INDEX(ts) TIME INDEX(ts)
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0

View File

@@ -7,11 +7,12 @@ CREATE TABLE numbers_input_basic (
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT SELECT
sum(number) sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM FROM
numbers_input_basic numbers_input_basic
GROUP BY GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00'); time_window;
SHOW CREATE TABLE out_num_cnt_basic; SHOW CREATE TABLE out_num_cnt_basic;
@@ -34,8 +35,7 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT SELECT
"sum(numbers_input_basic.number)", "sum(numbers_input_basic.number)",
window_start, time_window
window_end
FROM FROM
out_num_cnt_basic; out_num_cnt_basic;
@@ -54,8 +54,7 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT SELECT
"sum(numbers_input_basic.number)", "sum(numbers_input_basic.number)",
window_start, time_window
window_end
FROM FROM
out_num_cnt_basic; out_num_cnt_basic;
@@ -403,13 +402,16 @@ CREATE TABLE temp_sensor_data (
loc STRING, loc STRING,
temperature DOUBLE, temperature DOUBLE,
ts TIMESTAMP TIME INDEX ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
CREATE TABLE temp_alerts ( CREATE TABLE temp_alerts (
sensor_id INT, sensor_id INT,
loc STRING, loc STRING,
max_temp DOUBLE, max_temp DOUBLE,
ts TIMESTAMP TIME INDEX event_ts TIMESTAMP TIME INDEX,
update_at TIMESTAMP
); );
CREATE FLOW temp_monitoring SINK TO temp_alerts AS CREATE FLOW temp_monitoring SINK TO temp_alerts AS
@@ -417,6 +419,7 @@ SELECT
sensor_id, sensor_id,
loc, loc,
max(temperature) as max_temp, max(temperature) as max_temp,
max(ts) as event_ts
FROM FROM
temp_sensor_data temp_sensor_data
GROUP BY GROUP BY
@@ -451,7 +454,8 @@ SHOW TABLES LIKE 'temp_alerts';
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
@@ -466,7 +470,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
@@ -481,6 +486,8 @@ CREATE TABLE ngx_access_log (
stat INT, stat INT,
size INT, size INT,
access_time TIMESTAMP TIME INDEX access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
CREATE TABLE ngx_distribution ( CREATE TABLE ngx_distribution (
@@ -555,6 +562,8 @@ CREATE TABLE requests (
service_ip STRING, service_ip STRING,
val INT, val INT,
ts TIMESTAMP TIME INDEX ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
CREATE TABLE requests_without_ip ( CREATE TABLE requests_without_ip (
@@ -650,6 +659,8 @@ CREATE TABLE android_log (
`log` STRING, `log` STRING,
ts TIMESTAMP(9), ts TIMESTAMP(9),
TIME INDEX(ts) TIME INDEX(ts)
)WITH(
append_mode = 'true'
); );
CREATE TABLE android_log_abnormal ( CREATE TABLE android_log_abnormal (
@@ -704,6 +715,8 @@ CREATE TABLE android_log (
`log` STRING, `log` STRING,
ts TIMESTAMP(9), ts TIMESTAMP(9),
TIME INDEX(ts) TIME INDEX(ts)
)WITH(
append_mode = 'true'
); );
CREATE TABLE android_log_abnormal ( CREATE TABLE android_log_abnormal (

View File

@@ -19,7 +19,9 @@ Affected Rows: 0
CREATE FLOW calc_avg_speed SINK TO avg_speed AS CREATE FLOW calc_avg_speed SINK TO avg_speed AS
SELECT SELECT
avg((left_wheel + right_wheel) / 2) avg((left_wheel + right_wheel) / 2) as avg_speed,
date_bin(INTERVAL '5 second', ts) as start_window,
date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
FROM FROM
velocity velocity
WHERE WHERE
@@ -28,7 +30,7 @@ WHERE
AND left_wheel < 60 AND left_wheel < 60
AND right_wheel < 60 AND right_wheel < 60
GROUP BY GROUP BY
tumble(ts, '5 second'); start_window;
Affected Rows: 0 Affected Rows: 0

View File

@@ -15,7 +15,9 @@ CREATE TABLE avg_speed (
CREATE FLOW calc_avg_speed SINK TO avg_speed AS CREATE FLOW calc_avg_speed SINK TO avg_speed AS
SELECT SELECT
avg((left_wheel + right_wheel) / 2) avg((left_wheel + right_wheel) / 2) as avg_speed,
date_bin(INTERVAL '5 second', ts) as start_window,
date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
FROM FROM
velocity velocity
WHERE WHERE
@@ -24,7 +26,7 @@ WHERE
AND left_wheel < 60 AND left_wheel < 60
AND right_wheel < 60 AND right_wheel < 60
GROUP BY GROUP BY
tumble(ts, '5 second'); start_window;
INSERT INTO INSERT INTO
velocity velocity

View File

@@ -11,7 +11,7 @@ Affected Rows: 0
CREATE FLOW test_numbers_df_func CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func SINK TO out_num_cnt_df_func
AS AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); SELECT sum(abs(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
Affected Rows: 0 Affected Rows: 0
@@ -42,13 +42,13 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+ +------------------------------------------+
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| sum(abs(numbers_input_df_func.number)) | window_start | window_end | | sum(abs(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | | 42 | 2021-07-01T00:00:00 |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -76,14 +76,14 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+ +------------------------------------------+
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| sum(abs(numbers_input_df_func.number)) | window_start | window_end | | sum(abs(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | | 42 | 2021-07-01T00:00:00 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | | 47 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
DROP FLOW test_numbers_df_func; DROP FLOW test_numbers_df_func;
@@ -110,7 +110,7 @@ Affected Rows: 0
CREATE FLOW test_numbers_df_func CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func SINK TO out_num_cnt_df_func
AS AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); SELECT abs(sum(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
Affected Rows: 0 Affected Rows: 0
@@ -140,13 +140,13 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
| FLOW_FLUSHED | | FLOW_FLUSHED |
+------------------------------------------+ +------------------------------------------+
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| abs(sum(numbers_input_df_func.number)) | window_start | window_end | | abs(sum(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | | 2 | 2021-07-01T00:00:00 |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -173,14 +173,14 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
| FLOW_FLUSHED | | FLOW_FLUSHED |
+------------------------------------------+ +------------------------------------------+
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| abs(sum(numbers_input_df_func.number)) | window_start | window_end | | abs(sum(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | | 2 | 2021-07-01T00:00:00 |
| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | | 1 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+---------------------+ +----------------------------------------+---------------------+
DROP FLOW test_numbers_df_func; DROP FLOW test_numbers_df_func;

View File

@@ -9,7 +9,7 @@ CREATE TABLE numbers_input_df_func (
CREATE FLOW test_numbers_df_func CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func SINK TO out_num_cnt_df_func
AS AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); SELECT sum(abs(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -24,7 +24,7 @@ VALUES
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -38,7 +38,7 @@ VALUES
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func; DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func; DROP TABLE numbers_input_df_func;
@@ -55,7 +55,7 @@ CREATE TABLE numbers_input_df_func (
CREATE FLOW test_numbers_df_func CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func SINK TO out_num_cnt_df_func
AS AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); SELECT abs(sum(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -69,7 +69,7 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -82,7 +82,7 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func'); ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func; DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func; DROP TABLE numbers_input_df_func;

View File

@@ -0,0 +1,62 @@
-- test if flush_flow works and flush old data to flow for compute
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window
FROM
numbers_input_basic
GROUP BY
time_window;
Affected Rows: 0
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
"sum(numbers_input_basic.number)",
time_window
FROM
out_num_cnt_basic;
+---------------------------------+-------------------------+
| sum(numbers_input_basic.number) | time_window |
+---------------------------------+-------------------------+
| 42 | 2021-07-01T00:00:00.100 |
+---------------------------------+-------------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0

View File

@@ -0,0 +1,37 @@
-- test if flush_flow works and flush old data to flow for compute
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window
FROM
numbers_input_basic
GROUP BY
time_window;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT
"sum(numbers_input_basic.number)",
time_window
FROM
out_num_cnt_basic;
DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;

View File

@@ -5,6 +5,8 @@ CREATE TABLE requests (
service_ip STRING, service_ip STRING,
val INT, val INT,
ts TIMESTAMP TIME INDEX ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0
@@ -93,6 +95,8 @@ CREATE TABLE ngx_access_log (
client STRING, client STRING,
country STRING, country STRING,
access_time TIMESTAMP TIME INDEX access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0

View File

@@ -6,6 +6,8 @@ CREATE TABLE requests (
service_ip STRING, service_ip STRING,
val INT, val INT,
ts TIMESTAMP TIME INDEX ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
CREATE TABLE sum_val_in_reqs ( CREATE TABLE sum_val_in_reqs (
@@ -59,6 +61,8 @@ CREATE TABLE ngx_access_log (
client STRING, client STRING,
country STRING, country STRING,
access_time TIMESTAMP TIME INDEX access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
); );
CREATE FLOW calc_ngx_country SINK TO ngx_country AS CREATE FLOW calc_ngx_country SINK TO ngx_country AS

View File

@@ -3,6 +3,8 @@ CREATE TABLE input_basic (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number), PRIMARY KEY(number),
TIME INDEX(ts) TIME INDEX(ts)
)WITH(
append_mode = 'true'
); );
Affected Rows: 0 Affected Rows: 0
@@ -166,7 +168,7 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
| FLOW_FLUSHED | | FLOW_FLUSHED |
+-----------------------------------------+ +-----------------------------------------+
-- 3 is also expected, since flow don't have persisent state -- flow batching mode
SELECT wildcard FROM out_basic; SELECT wildcard FROM out_basic;
+----------+ +----------+
@@ -175,6 +177,14 @@ SELECT wildcard FROM out_basic;
| 3 | | 3 |
+----------+ +----------+
SELECT count(*) FROM input_basic;
+----------+
| count(*) |
+----------+
| 3 |
+----------+
DROP TABLE input_basic; DROP TABLE input_basic;
Affected Rows: 0 Affected Rows: 0
@@ -302,6 +312,15 @@ FROM
Affected Rows: 0 Affected Rows: 0
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -310,6 +329,8 @@ VALUES
Affected Rows: 2 Affected Rows: 2
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -358,6 +379,15 @@ FROM
Affected Rows: 0 Affected Rows: 0
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -366,6 +396,8 @@ VALUES
Affected Rows: 2 Affected Rows: 2
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -397,6 +429,15 @@ CREATE TABLE input_basic (
Affected Rows: 0 Affected Rows: 0
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -406,6 +447,8 @@ VALUES
Affected Rows: 3 Affected Rows: 3
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -438,7 +481,17 @@ FROM
Affected Rows: 0 Affected Rows: 0
-- give flownode a second to rebuild flow
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -457,13 +510,21 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
| FLOW_FLUSHED | | FLOW_FLUSHED |
+-----------------------------------------+ +-----------------------------------------+
-- 3 is also expected, since flow don't have persisent state -- 4 is also expected, since flow batching mode
SELECT wildcard FROM out_basic; SELECT wildcard FROM out_basic;
+----------+ +----------+
| wildcard | | wildcard |
+----------+ +----------+
| 3 | | 4 |
+----------+
SELECT count(*) FROM input_basic;
+----------+
| count(*) |
+----------+
| 4 |
+----------+ +----------+
DROP TABLE input_basic; DROP TABLE input_basic;
@@ -496,6 +557,15 @@ FROM
Affected Rows: 0 Affected Rows: 0
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -504,6 +574,8 @@ VALUES
Affected Rows: 2 Affected Rows: 2
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -538,6 +610,15 @@ FROM
Affected Rows: 0 Affected Rows: 0
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -547,6 +628,7 @@ VALUES
Affected Rows: 3 Affected Rows: 3
-- give flownode a second to rebuild flow
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');

View File

@@ -3,6 +3,8 @@ CREATE TABLE input_basic (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number), PRIMARY KEY(number),
TIME INDEX(ts) TIME INDEX(ts)
)WITH(
append_mode = 'true'
); );
CREATE FLOW test_wildcard_basic sink TO out_basic AS CREATE FLOW test_wildcard_basic sink TO out_basic AS
@@ -95,9 +97,11 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
-- 3 is also expected, since flow don't have persisent state -- flow batching mode
SELECT wildcard FROM out_basic; SELECT wildcard FROM out_basic;
SELECT count(*) FROM input_basic;
DROP TABLE input_basic; DROP TABLE input_basic;
DROP FLOW test_wildcard_basic; DROP FLOW test_wildcard_basic;
DROP TABLE out_basic; DROP TABLE out_basic;
@@ -168,12 +172,17 @@ FROM
input_basic; input_basic;
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
(23, "2021-07-01 00:00:01.000"), (23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"); (24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -201,12 +210,17 @@ FROM
input_basic; input_basic;
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
(23, "2021-07-01 00:00:01.000"), (23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"); (24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -222,6 +236,9 @@ CREATE TABLE input_basic (
); );
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -229,6 +246,8 @@ VALUES
(24, "2021-07-01 00:00:01.500"), (24, "2021-07-01 00:00:01.500"),
(26, "2021-07-01 00:00:02.000"); (26, "2021-07-01 00:00:02.000");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -245,7 +264,11 @@ SELECT
FROM FROM
input_basic; input_basic;
-- give flownode a second to rebuild flow
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -256,9 +279,11 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
-- 3 is also expected, since flow don't have persisent state -- 4 is also expected, since flow batching mode
SELECT wildcard FROM out_basic; SELECT wildcard FROM out_basic;
SELECT count(*) FROM input_basic;
DROP TABLE input_basic; DROP TABLE input_basic;
DROP FLOW test_wildcard_basic; DROP FLOW test_wildcard_basic;
DROP TABLE out_basic; DROP TABLE out_basic;
@@ -277,13 +302,17 @@ FROM
input_basic; input_basic;
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
(23, "2021-07-01 00:00:01.000"), (23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"); (24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -300,6 +329,9 @@ FROM
input_basic; input_basic;
-- SQLNESS ARG restart=true -- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO INSERT INTO
input_basic input_basic
VALUES VALUES
@@ -307,6 +339,7 @@ VALUES
(24, "2021-07-01 00:00:01.500"), (24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700"); (25, "2021-07-01 00:00:01.700");
-- give flownode a second to rebuild flow
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic'); ADMIN FLUSH_FLOW('test_wildcard_basic');

View File

@@ -397,7 +397,7 @@ CREATE TABLE temp_alerts (
sensor_id INT, sensor_id INT,
loc STRING, loc STRING,
max_temp DOUBLE, max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX, event_ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc) PRIMARY KEY(sensor_id, loc)
); );
@@ -408,6 +408,7 @@ SELECT
sensor_id, sensor_id,
loc, loc,
max(temperature) as max_temp, max(temperature) as max_temp,
max(ts) as event_ts,
FROM FROM
temp_sensor_data temp_sensor_data
GROUP BY GROUP BY
@@ -438,7 +439,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
@@ -466,16 +468,17 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
+-----------+-------+----------+ +-----------+-------+----------+---------------------+
| sensor_id | loc | max_temp | | sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+ +-----------+-------+----------+---------------------+
| 1 | room1 | 101.5 | | 1 | room1 | 101.5 | 2022-01-01T00:00:02 |
| 2 | room2 | 102.5 | | 2 | room2 | 102.5 | 2022-01-01T00:00:03 |
+-----------+-------+----------+ +-----------+-------+----------+---------------------+
DROP FLOW temp_monitoring; DROP FLOW temp_monitoring;

View File

@@ -291,7 +291,7 @@ CREATE TABLE temp_alerts (
sensor_id INT, sensor_id INT,
loc STRING, loc STRING,
max_temp DOUBLE, max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX, event_ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc) PRIMARY KEY(sensor_id, loc)
); );
@@ -300,6 +300,7 @@ SELECT
sensor_id, sensor_id,
loc, loc,
max(temperature) as max_temp, max(temperature) as max_temp,
max(ts) as event_ts,
FROM FROM
temp_sensor_data temp_sensor_data
GROUP BY GROUP BY
@@ -320,7 +321,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;
@@ -337,7 +339,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT SELECT
sensor_id, sensor_id,
loc, loc,
max_temp max_temp,
event_ts
FROM FROM
temp_alerts; temp_alerts;

View File

@@ -72,12 +72,13 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
Affected Rows: 4 Affected Rows: 4
-- TODO(discord9): fix flow stat update for batching mode flow
SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
| information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names | | information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names |
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
| true | true | true | greptime.public.ngx_access_log | | | true | false | greptime.public.ngx_access_log |
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
DROP TABLE ngx_access_log; DROP TABLE ngx_access_log;

View File

@@ -32,6 +32,7 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
-- SQLNESS SLEEP 10s -- SQLNESS SLEEP 10s
INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z');
-- TODO(discord9): fix flow stat update for batching mode flow
SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
DROP TABLE ngx_access_log; DROP TABLE ngx_access_log;

View File

@@ -1,3 +1,3 @@
[grpc] [grpc]
bind_addr = "127.0.0.1:29401" bind_addr = "{grpc_addr}"
server_addr = "127.0.0.1:29401" server_addr = "{grpc_addr}"

View File

@@ -389,6 +389,9 @@ impl ServerMode {
format!("--metasrv-addrs={metasrv_addr}"), format!("--metasrv-addrs={metasrv_addr}"),
format!("--http-addr={http_addr}"), format!("--http-addr={http_addr}"),
format!("--rpc-addr={rpc_bind_addr}"), format!("--rpc-addr={rpc_bind_addr}"),
// since sqlness run on local, bind addr is the same as server addr
// this is needed so that `cluster_info`'s server addr column can be correct
format!("--rpc-server-addr={rpc_bind_addr}"),
format!("--mysql-addr={mysql_addr}"), format!("--mysql-addr={mysql_addr}"),
format!("--postgres-addr={postgres_addr}"), format!("--postgres-addr={postgres_addr}"),
format!( format!(