Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 14:40:01 +00:00)

Compare commits: `docs/vecto...rr_part_6` (20 commits)
| SHA1 |
|---|
| ed676d97c7 |
| 14b2badded |
| 3626a50395 |
| 0d0dad4ba2 |
| ae00e28b2a |
| 92d2fafb33 |
| 30b3600597 |
| 87f1a8c622 |
| 8e815fc385 |
| ca46bd04ee |
| d32ade7399 |
| b4aa0c8b8b |
| e647559d27 |
| d2c4767d41 |
| 82cee11eea |
| 6d0470c3fb |
| 47a267e29c |
| fa13d06fc6 |
| 26d9517c3e |
| a7da9af5de |
Cargo.lock (generated): 3 changes
```diff
@@ -4505,6 +4505,7 @@ dependencies = [
  "arc-swap",
  "async-trait",
  "auth",
+ "bytes",
  "cache",
  "catalog",
  "client",
```
```diff
@@ -4943,7 +4944,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6d9cffd43c4e6358805a798f17e03e232994b82#b6d9cffd43c4e6358805a798f17e03e232994b82"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e82b0158cd38d4021edb4e4c0ae77f999051e62f#e82b0158cd38d4021edb4e4c0ae77f999051e62f"
 dependencies = [
  "prost 0.13.5",
  "serde",
```
```diff
@@ -129,7 +129,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
```
```diff
@@ -514,6 +514,7 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
         Some(Query::Sql(_)) => "query.sql",
         Some(Query::LogicalPlan(_)) => "query.logical_plan",
         Some(Query::PromRangeQuery(_)) => "query.prom_range",
+        Some(Query::InsertIntoPlan(_)) => "query.insert_into_plan",
         None => "query.empty",
     }
 }
```
```diff
@@ -27,7 +27,7 @@ use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use table::metadata::TableType;
 use table::table::adapter::DfTableProviderAdapter;
-mod dummy_catalog;
+pub mod dummy_catalog;
 use dummy_catalog::DummyCatalogList;
 use table::TableRef;
 
```
```diff
@@ -345,7 +345,7 @@ impl StartCommand {
         let client = Arc::new(NodeClients::new(channel_config));
 
         let invoker = FrontendInvoker::build_from(
-            flownode.flow_worker_manager().clone(),
+            flownode.flow_engine().streaming_engine(),
             catalog_manager.clone(),
             cached_meta_backend.clone(),
             layered_cache_registry.clone(),
```
```diff
@@ -355,7 +355,9 @@ impl StartCommand {
         .await
         .context(StartFlownodeSnafu)?;
         flownode
-            .flow_worker_manager()
+            .flow_engine()
+            .streaming_engine()
+            // TODO(discord9): refactor and avoid circular reference
             .set_frontend_invoker(invoker)
             .await;
 
```
```diff
@@ -56,8 +56,8 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
 use datanode::region_server::RegionServer;
 use file_engine::config::EngineConfig as FileEngineConfig;
 use flow::{
-    FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
-    FrontendClient, FrontendInvoker,
+    FlowConfig, FlowStreamingEngine, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
+    FrontendClient, FrontendInvoker, GrpcQueryHandlerWithBoxedError,
 };
 use frontend::frontend::{Frontend, FrontendOptions};
 use frontend::instance::builder::FrontendBuilder;
```
```diff
@@ -524,17 +524,17 @@ impl StartCommand {
             ..Default::default()
         };
 
-        // TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without
+        // for standalone not use grpc, but get a handler to frontend grpc client without
         // actually make a connection
-        let fe_server_addr = fe_opts.grpc.bind_addr.clone();
-        let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
+        let (frontend_client, frontend_instance_handler) =
+            FrontendClient::from_empty_grpc_handler();
         let flow_builder = FlownodeBuilder::new(
             flownode_options,
             plugins.clone(),
             table_metadata_manager.clone(),
             catalog_manager.clone(),
             flow_metadata_manager.clone(),
-            Arc::new(frontend_client),
+            Arc::new(frontend_client.clone()),
         );
         let flownode = flow_builder
             .build()
```
```diff
@@ -544,15 +544,15 @@ impl StartCommand {
 
         // set the ref to query for the local flow state
         {
-            let flow_worker_manager = flownode.flow_worker_manager();
+            let flow_worker_manager = flownode.flow_engine().streaming_engine();
             information_extension
-                .set_flow_worker_manager(flow_worker_manager.clone())
+                .set_flow_worker_manager(flow_worker_manager)
                 .await;
         }
 
         let node_manager = Arc::new(StandaloneDatanodeManager {
             region_server: datanode.region_server(),
-            flow_server: flownode.flow_worker_manager(),
+            flow_server: flownode.flow_engine(),
         });
 
         let table_id_sequence = Arc::new(
```
```diff
@@ -606,7 +606,16 @@ impl StartCommand {
         .context(error::StartFrontendSnafu)?;
         let fe_instance = Arc::new(fe_instance);
 
-        let flow_worker_manager = flownode.flow_worker_manager();
+        // set the frontend client for flownode
+        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
+        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
+        frontend_instance_handler
+            .lock()
+            .unwrap()
+            .replace(weak_grpc_handler);
+
+        // set the frontend invoker for flownode
+        let flow_worker_manager = flownode.flow_engine().streaming_engine();
         // flow server need to be able to use frontend to write insert requests back
         let invoker = FrontendInvoker::build_from(
             flow_worker_manager.clone(),
```
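The new standalone wiring replaces a loopback gRPC connection (`FrontendClient::from_static_grpc_addr`) with direct handler injection: the flownode's client starts with an empty slot, and once the frontend instance exists it is dropped in as a `Weak` reference, avoiding a strong `Arc` cycle between frontend and flownode. Below is a minimal, self-contained sketch of that idea; the `GrpcQueryHandler` trait and `HandlerSlot` alias are hypothetical stand-ins for GreptimeDB's `GrpcQueryHandlerWithBoxedError` and the handler slot returned by `from_empty_grpc_handler`, not the actual API:

```rust
use std::sync::{Arc, Mutex, Weak};

// Hypothetical stand-in for GrpcQueryHandlerWithBoxedError.
trait GrpcQueryHandler: Send + Sync {
    fn name(&self) -> &'static str;
}

// Hypothetical stand-in for the slot FrontendClient::from_empty_grpc_handler returns.
type HandlerSlot = Arc<Mutex<Option<Weak<dyn GrpcQueryHandler>>>>;

struct Frontend;

impl GrpcQueryHandler for Frontend {
    fn name(&self) -> &'static str {
        "frontend"
    }
}

fn main() {
    // The flownode side is built first, with an empty slot.
    let slot: HandlerSlot = Arc::new(Mutex::new(None));

    // Later, once the frontend instance exists, inject it as a Weak reference
    // so the flownode never keeps the frontend alive on its own.
    let fe: Arc<dyn GrpcQueryHandler> = Arc::new(Frontend);
    let _prev = slot.lock().unwrap().replace(Arc::downgrade(&fe));

    // On use, the flownode upgrades the Weak; None means the frontend is gone.
    if let Some(handler) = slot.lock().unwrap().as_ref().and_then(|w| w.upgrade()) {
        println!("handler available: {}", handler.name());
    }
}
```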
```diff
@@ -694,7 +703,7 @@ pub struct StandaloneInformationExtension {
     region_server: RegionServer,
     procedure_manager: ProcedureManagerRef,
     start_time_ms: u64,
-    flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>,
+    flow_worker_manager: RwLock<Option<Arc<FlowStreamingEngine>>>,
 }
 
 impl StandaloneInformationExtension {
```
```diff
@@ -708,7 +717,7 @@ impl StandaloneInformationExtension {
     }
 
     /// Set the flow worker manager for the standalone instance.
-    pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) {
+    pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowStreamingEngine>) {
         let mut guard = self.flow_worker_manager.write().await;
         *guard = Some(flow_worker_manager);
     }
```
```diff
@@ -38,7 +38,7 @@ use table::metadata::TableId;
 use crate::cache_invalidator::Context;
 use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
 use crate::ddl::DdlContext;
-use crate::error::{self, Result};
+use crate::error::{self, Result, UnexpectedSnafu};
 use crate::instruction::{CacheIdent, CreateFlow};
 use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::flow::flow_route::FlowRouteValue;
```
```diff
@@ -171,7 +171,7 @@ impl CreateFlowProcedure {
         }
         self.data.state = CreateFlowState::CreateFlows;
         // determine flow type
-        self.data.flow_type = Some(determine_flow_type(&self.data.task));
+        self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
 
         Ok(Status::executing(true))
     }
```
```diff
@@ -196,8 +196,8 @@ impl CreateFlowProcedure {
             });
         }
         info!(
-            "Creating flow({:?}) on flownodes with peers={:?}",
-            self.data.flow_id, self.data.peers
+            "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
+            self.data.flow_id, self.data.flow_type, self.data.peers
         );
         join_all(create_flow)
             .await
```
```diff
@@ -306,8 +306,20 @@ impl Procedure for CreateFlowProcedure {
     }
 }
 
-pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType {
-    FlowType::Batching
+pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
+    let flow_type = flow_task
+        .flow_options
+        .get(FlowType::FLOW_TYPE_KEY)
+        .map(|s| s.as_str());
+    match flow_type {
+        Some(FlowType::BATCHING) => Ok(FlowType::Batching),
+        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
+        Some(unknown) => UnexpectedSnafu {
+            err_msg: format!("Unknown flow type: {}", unknown),
+        }
+        .fail(),
+        None => Ok(FlowType::Batching),
+    }
 }
 
 /// The state of [CreateFlowProcedure].
```
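The dispatch rule that `get_flow_type_from_options` encodes can be illustrated standalone. The sketch below uses a plain `HashMap` in place of `CreateFlowTask` and assumes the constants `FlowType::FLOW_TYPE_KEY`, `FlowType::BATCHING`, and `FlowType::STREAMING` expand to `"flow_type"`, `"batching"`, and `"streaming"`; the diff references the constants but never shows their values:

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum FlowType {
    Batching,
    Streaming,
}

// Assumed value of FlowType::FLOW_TYPE_KEY; not confirmed by the diff.
const FLOW_TYPE_KEY: &str = "flow_type";

fn flow_type_from_options(options: &HashMap<String, String>) -> Result<FlowType, String> {
    match options.get(FLOW_TYPE_KEY).map(|s| s.as_str()) {
        Some("batching") => Ok(FlowType::Batching),
        Some("streaming") => Ok(FlowType::Streaming),
        Some(unknown) => Err(format!("Unknown flow type: {unknown}")),
        // No option set: batching is the default, matching the new code path.
        None => Ok(FlowType::Batching),
    }
}

fn main() {
    let mut opts = HashMap::new();
    assert_eq!(flow_type_from_options(&opts), Ok(FlowType::Batching));
    opts.insert("flow_type".to_string(), "streaming".to_string());
    assert_eq!(flow_type_from_options(&opts), Ok(FlowType::Streaming));
}
```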
```diff
@@ -46,7 +46,7 @@ pub(crate) fn test_create_flow_task(
         create_if_not_exists,
         expire_after: Some(300),
         comment: "".to_string(),
-        sql: "raw_sql".to_string(),
+        sql: "select 1".to_string(),
         flow_options: Default::default(),
     }
 }
```
```diff
@@ -401,6 +401,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Invalid flow request body: {:?}", body))]
+    InvalidFlowRequestBody {
+        body: Box<Option<api::v1::flow::flow_request::Body>>,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
     GetKvCache { err_msg: String },
 
```
```diff
@@ -853,7 +860,8 @@ impl ErrorExt for Error {
             | TlsConfig { .. }
             | InvalidSetDatabaseOption { .. }
             | InvalidUnsetDatabaseOption { .. }
-            | InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
+            | InvalidTopicNamePrefix { .. }
+            | InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
 
             FlowNotFound { .. } => StatusCode::FlowNotFound,
             FlowRouteNotFound { .. } => StatusCode::Unexpected,
```
```diff
@@ -18,16 +18,19 @@ mod udaf;
 
 use std::sync::Arc;
 
+use api::v1::TableName;
 use datafusion::catalog::CatalogProviderList;
 use datafusion::error::Result as DatafusionResult;
 use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
-use datafusion_common::Column;
-use datafusion_expr::col;
+use datafusion_common::{Column, TableReference};
+use datafusion_expr::dml::InsertOp;
+use datafusion_expr::{col, DmlStatement, WriteOp};
 pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
+use snafu::ResultExt;
 
 pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
 pub use self::udaf::AggregateFunction;
-use crate::error::Result;
+use crate::error::{GeneralDataFusionSnafu, Result};
 use crate::logical_plan::accumulator::*;
 use crate::signature::{Signature, Volatility};
```
```diff
@@ -79,6 +82,74 @@ pub fn rename_logical_plan_columns(
     LogicalPlanBuilder::from(plan).project(projection)?.build()
 }
 
+/// Convert a insert into logical plan to an (table_name, logical_plan)
+/// where table_name is the name of the table to insert into.
+/// logical_plan is the plan to be executed.
+///
+/// if input logical plan is not `insert into table_name <input>`, return None
+///
+/// Returned TableName will use provided catalog and schema if not specified in the logical plan,
+/// if table scan in logical plan have full table name, will **NOT** override it.
+pub fn breakup_insert_plan(
+    plan: &LogicalPlan,
+    default_catalog: &str,
+    default_schema: &str,
+) -> Option<(TableName, Arc<LogicalPlan>)> {
+    if let LogicalPlan::Dml(dml) = plan {
+        if dml.op != WriteOp::Insert(InsertOp::Append) {
+            return None;
+        }
+        let table_name = &dml.table_name;
+        let table_name = match table_name {
+            TableReference::Bare { table } => TableName {
+                catalog_name: default_catalog.to_string(),
+                schema_name: default_schema.to_string(),
+                table_name: table.to_string(),
+            },
+            TableReference::Partial { schema, table } => TableName {
+                catalog_name: default_catalog.to_string(),
+                schema_name: schema.to_string(),
+                table_name: table.to_string(),
+            },
+            TableReference::Full {
+                catalog,
+                schema,
+                table,
+            } => TableName {
+                catalog_name: catalog.to_string(),
+                schema_name: schema.to_string(),
+                table_name: table.to_string(),
+            },
+        };
+        let logical_plan = dml.input.clone();
+        Some((table_name, logical_plan))
+    } else {
+        None
+    }
+}
+
+/// create a `insert into table_name <input>` logical plan
+pub fn add_insert_to_logical_plan(
+    table_name: TableName,
+    table_schema: datafusion_common::DFSchemaRef,
+    input: LogicalPlan,
+) -> Result<LogicalPlan> {
+    let table_name = TableReference::Full {
+        catalog: table_name.catalog_name.into(),
+        schema: table_name.schema_name.into(),
+        table: table_name.table_name.into(),
+    };
+
+    let plan = LogicalPlan::Dml(DmlStatement::new(
+        table_name,
+        table_schema,
+        WriteOp::Insert(InsertOp::Append),
+        Arc::new(input),
+    ));
+    let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?;
+    Ok(plan)
+}
+
 /// The datafusion `[LogicalPlan]` decoder.
 #[async_trait::async_trait]
 pub trait SubstraitPlanDecoder {
```
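As a usage-level illustration (not part of the diff), the interesting rule inside `breakup_insert_plan` is name resolution: a bare or partial DataFusion `TableReference` is completed from the defaults, while a fully qualified one is kept untouched. The sketch below reproduces just that rule; `TableName` here is a simplified local stand-in for `api::v1::TableName`, and the only external dependency is the `datafusion-common` crate:

```rust
use datafusion_common::TableReference;

/// Simplified local stand-in for `api::v1::TableName`, for illustration only.
#[derive(Debug, PartialEq)]
struct TableName {
    catalog_name: String,
    schema_name: String,
    table_name: String,
}

/// Complete bare/partial references from defaults; never override a full name.
fn resolve(table: &TableReference, default_catalog: &str, default_schema: &str) -> TableName {
    match table {
        TableReference::Bare { table } => TableName {
            catalog_name: default_catalog.to_string(),
            schema_name: default_schema.to_string(),
            table_name: table.to_string(),
        },
        TableReference::Partial { schema, table } => TableName {
            catalog_name: default_catalog.to_string(),
            schema_name: schema.to_string(),
            table_name: table.to_string(),
        },
        TableReference::Full { catalog, schema, table } => TableName {
            catalog_name: catalog.to_string(),
            schema_name: schema.to_string(),
            table_name: table.to_string(),
        },
    }
}

fn main() {
    let partial = TableReference::partial("my_schema", "my_table");
    let name = resolve(&partial, "greptime", "public");
    assert_eq!(name.catalog_name, "greptime"); // catalog falls back to the default
    assert_eq!(name.schema_name, "my_schema"); // schema from the plan is kept
}
```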
```diff
@@ -58,7 +58,7 @@ use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_R
 use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
 use crate::{CreateFlowArgs, FlowId, TableName};
 
-mod flownode_impl;
+pub(crate) mod flownode_impl;
 mod parse_expr;
 pub(crate) mod refill;
 mod stat;
```
```diff
@@ -135,12 +135,14 @@ impl Configurable for FlownodeOptions {
 }
 
 /// Arc-ed FlowNodeManager, cheaper to clone
-pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>;
+pub type FlowWorkerManagerRef = Arc<FlowStreamingEngine>;
 
 /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
 ///
 /// The choice of timestamp is just using current system timestamp for now
-pub struct FlowWorkerManager {
+///
+/// TODO(discord9): rename to FlowStreamingEngine
+pub struct FlowStreamingEngine {
     /// The handler to the worker that will run the dataflow
     /// which is `!Send` so a handle is used
     pub worker_handles: Vec<WorkerHandle>,
```
```diff
@@ -158,7 +160,8 @@ pub struct FlowWorkerManager {
     flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
     src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
     tick_manager: FlowTickManager,
-    node_id: Option<u32>,
+    /// This node id is only available in distributed mode, on standalone mode this is guaranteed to be `None`
+    pub node_id: Option<u32>,
     /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
     ///
     /// So that a series of event like `inserts -> flush` can be handled correctly
```
```diff
@@ -168,7 +171,7 @@ pub struct FlowWorkerManager {
 }
 
 /// Building FlownodeManager
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     /// set frontend invoker
     pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
         *self.frontend_invoker.write().await = Some(frontend);
```
```diff
@@ -187,7 +190,7 @@ impl FlowWorkerManager {
         let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
         let tick_manager = FlowTickManager::new();
         let worker_handles = Vec::new();
-        FlowWorkerManager {
+        FlowStreamingEngine {
             worker_handles,
             worker_selector: Mutex::new(0),
             query_engine,
```
```diff
@@ -263,7 +266,7 @@ pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Erro
 }
 
 /// This impl block contains methods to send writeback requests to frontend
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     /// Return the number of requests it made
     pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
         let all_reqs = self.generate_writeback_request().await?;
```
```diff
@@ -534,7 +537,7 @@ impl FlowWorkerManager {
 }
 
 /// Flow Runtime related methods
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     /// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back
     ///
     /// if heartbeat task is shutdown, this future will exit too
```
```diff
@@ -728,7 +731,7 @@ impl FlowWorkerManager {
 }
 
 /// Create&Remove flow
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     /// remove a flow by it's id
     pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
         for handle in self.worker_handles.iter() {
```
```diff
@@ -20,35 +20,379 @@ use api::v1::flow::{
     flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
 };
 use api::v1::region::InsertRequests;
+use catalog::CatalogManager;
 use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
-use common_meta::error::{Result as MetaResult, UnexpectedSnafu};
+use common_meta::error::Result as MetaResult;
+use common_meta::key::flow::FlowMetadataManager;
 use common_runtime::JoinHandle;
-use common_telemetry::{trace, warn};
+use common_telemetry::{error, info, trace, warn};
 use datatypes::value::Value;
+use futures::TryStreamExt;
 use itertools::Itertools;
-use snafu::{IntoError, OptionExt, ResultExt};
+use session::context::QueryContextBuilder;
+use snafu::{ensure, IntoError, OptionExt, ResultExt};
 use store_api::storage::{RegionId, TableId};
+use tokio::sync::{Mutex, RwLock};
 
-use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
+use crate::adapter::{CreateFlowArgs, FlowStreamingEngine};
 use crate::batching_mode::engine::BatchingEngine;
 use crate::engine::FlowEngine;
-use crate::error::{CreateFlowSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu};
+use crate::error::{
+    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
+    InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
+    UnexpectedSnafu,
+};
 use crate::metrics::METRIC_FLOW_TASK_COUNT;
 use crate::repr::{self, DiffRow};
 use crate::{Error, FlowId};
 
+/// Ref to [`FlowDualEngine`]
+pub type FlowDualEngineRef = Arc<FlowDualEngine>;
+
 /// Manage both streaming and batching mode engine
 ///
 /// including create/drop/flush flow
 /// and redirect insert requests to the appropriate engine
 pub struct FlowDualEngine {
-    streaming_engine: Arc<FlowWorkerManager>,
+    streaming_engine: Arc<FlowStreamingEngine>,
     batching_engine: Arc<BatchingEngine>,
     /// helper struct for faster query flow by table id or vice versa
-    src_table2flow: std::sync::RwLock<SrcTableToFlow>,
+    src_table2flow: RwLock<SrcTableToFlow>,
+    flow_metadata_manager: Arc<FlowMetadataManager>,
+    catalog_manager: Arc<dyn CatalogManager>,
+    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
 }
 
+impl FlowDualEngine {
+    pub fn new(
+        streaming_engine: Arc<FlowStreamingEngine>,
+        batching_engine: Arc<BatchingEngine>,
+        flow_metadata_manager: Arc<FlowMetadataManager>,
+        catalog_manager: Arc<dyn CatalogManager>,
+    ) -> Self {
+        Self {
+            streaming_engine,
+            batching_engine,
+            src_table2flow: RwLock::new(SrcTableToFlow::default()),
+            flow_metadata_manager,
+            catalog_manager,
+            check_task: Mutex::new(None),
+        }
+    }
+
+    pub fn streaming_engine(&self) -> Arc<FlowStreamingEngine> {
+        self.streaming_engine.clone()
+    }
+
+    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
+        self.batching_engine.clone()
+    }
+
+    /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
+    ///
+    /// the need to sync is to make sure flush flow actually get called
+    async fn try_sync_with_check_task(
+        &self,
+        flow_id: FlowId,
+        allow_drop: bool,
+    ) -> Result<(), Error> {
+        // this function rarely get called so adding some log is helpful
+        info!("Try to sync with check task for flow {}", flow_id);
+        let mut retry = 0;
+        let max_retry = 10;
+        // keep trying to trigger consistent check
+        while retry < max_retry {
+            if let Some(task) = self.check_task.lock().await.as_ref() {
+                task.trigger(false, allow_drop).await?;
+                break;
+            }
+            retry += 1;
+            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+        }
+
+        if retry == max_retry {
+            error!(
+                "Can't sync with check task for flow {} with allow_drop={}",
+                flow_id, allow_drop
+            );
+            return SyncCheckTaskSnafu {
+                flow_id,
+                allow_drop,
+            }
+            .fail();
+        }
+        info!("Successfully sync with check task for flow {}", flow_id);
+
+        Ok(())
+    }
+
+    /// Spawn a task to consistently check if all flow tasks in metasrv is created on flownode,
+    /// so on startup, this will create all missing flow tasks, and constantly check at a interval
+    async fn check_flow_consistent(
+        &self,
+        allow_create: bool,
+        allow_drop: bool,
+    ) -> Result<(), Error> {
+        // use nodeid to determine if this is standalone/distributed mode, and retrieve all flows in this node(in distributed mode)/or all flows(in standalone mode)
+        let nodeid = self.streaming_engine.node_id;
+        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
+            // nodeid is available, so we only need to check flows on this node
+            // which also means we are in distributed mode
+            let to_be_recover = self
+                .flow_metadata_manager
+                .flownode_flow_manager()
+                .flows(nodeid.into())
+                .try_collect::<Vec<_>>()
+                .await
+                .context(ListFlowsSnafu {
+                    id: Some(nodeid.into()),
+                })?;
+            to_be_recover.into_iter().map(|(id, _)| id).collect()
+        } else {
+            // nodeid is not available, so we need to check all flows
+            // which also means we are in standalone mode
+            let all_catalogs = self
+                .catalog_manager
+                .catalog_names()
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?;
+            let mut all_flow_ids = vec![];
+            for catalog in all_catalogs {
+                let flows = self
+                    .flow_metadata_manager
+                    .flow_name_manager()
+                    .flow_names(&catalog)
+                    .await
+                    .try_collect::<Vec<_>>()
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu)?;
+
+                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
+            }
+            all_flow_ids
+        };
+        let should_exists = should_exists
+            .into_iter()
+            .map(|i| i as FlowId)
+            .collect::<HashSet<_>>();
+        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
+        let to_be_created = should_exists
+            .iter()
+            .filter(|id| !actual_exists.contains(id))
+            .collect::<Vec<_>>();
+        let to_be_dropped = actual_exists
+            .iter()
+            .filter(|id| !should_exists.contains(id))
+            .collect::<Vec<_>>();
+
+        if !to_be_created.is_empty() {
+            if allow_create {
+                info!(
+                    "Recovering {} flows: {:?}",
+                    to_be_created.len(),
+                    to_be_created
+                );
+                let mut errors = vec![];
+                for flow_id in to_be_created {
+                    let flow_id = *flow_id;
+                    let info = self
+                        .flow_metadata_manager
+                        .flow_info_manager()
+                        .get(flow_id as u32)
+                        .await
+                        .map_err(BoxedError::new)
+                        .context(ExternalSnafu)?
+                        .context(FlowNotFoundSnafu { id: flow_id })?;
+
+                    let sink_table_name = [
+                        info.sink_table_name().catalog_name.clone(),
+                        info.sink_table_name().schema_name.clone(),
+                        info.sink_table_name().table_name.clone(),
+                    ];
+                    let args = CreateFlowArgs {
+                        flow_id,
+                        sink_table_name,
+                        source_table_ids: info.source_table_ids().to_vec(),
+                        // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
+                        // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
+                        // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
+                        create_if_not_exists: true,
+                        or_replace: true,
+                        expire_after: info.expire_after(),
+                        comment: Some(info.comment().clone()),
+                        sql: info.raw_sql().clone(),
+                        flow_options: info.options().clone(),
+                        query_ctx: Some(
+                            QueryContextBuilder::default()
+                                .current_catalog(info.catalog_name().clone())
+                                .build(),
+                        ),
+                    };
+                    if let Err(err) = self
+                        .create_flow(args)
+                        .await
+                        .map_err(BoxedError::new)
+                        .with_context(|_| CreateFlowSnafu {
+                            sql: info.raw_sql().clone(),
+                        })
+                    {
+                        errors.push((flow_id, err));
+                    }
+                }
+                for (flow_id, err) in errors {
+                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
+                }
+            } else {
+                warn!(
+                    "Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
+                    nodeid, to_be_created
+                );
+            }
+        }
+        if !to_be_dropped.is_empty() {
+            if allow_drop {
+                info!("Dropping flows: {:?}", to_be_dropped);
+                let mut errors = vec![];
+                for flow_id in to_be_dropped {
+                    let flow_id = *flow_id;
+                    if let Err(err) = self.remove_flow(flow_id).await {
+                        errors.push((flow_id, err));
+                    }
+                }
+                for (flow_id, err) in errors {
+                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
+                }
+            } else {
+                warn!(
+                    "Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
+                    nodeid, to_be_dropped
+                );
+            }
+        }
+        Ok(())
+    }
+
+    // TODO(discord9): consider sync this with heartbeat(might become necessary in the future)
+    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
+        let mut check_task = self.check_task.lock().await;
+        ensure!(
+            check_task.is_none(),
+            IllegalCheckTaskStateSnafu {
+                reason: "Flow consistent check task already exists",
+            }
+        );
+        let task = ConsistentCheckTask::start_check_task(self).await?;
+        *check_task = Some(task);
+        Ok(())
+    }
+
+    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
+        info!("Stopping flow consistent check task");
+        let mut check_task = self.check_task.lock().await;
+
+        ensure!(
+            check_task.is_some(),
+            IllegalCheckTaskStateSnafu {
+                reason: "Flow consistent check task does not exist",
+            }
+        );
+
+        check_task.take().expect("Already checked").stop().await?;
+        info!("Stopped flow consistent check task");
+        Ok(())
+    }
+
+    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
+        self.flow_metadata_manager
+            .flow_info_manager()
+            .get(flow_id as u32)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)
+            .map(|info| info.is_some())
+    }
+}
+
+struct ConsistentCheckTask {
+    handle: JoinHandle<()>,
+    shutdown_tx: tokio::sync::mpsc::Sender<()>,
+    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
+}
+
+impl ConsistentCheckTask {
+    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
+        // first do recover flows
+        engine.check_flow_consistent(true, false).await?;
+
+        let inner = engine.clone();
+        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+        let (trigger_tx, mut trigger_rx) =
+            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
+        let handle = common_runtime::spawn_global(async move {
+            let mut args = (false, false);
+            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
+            loop {
+                if let Err(err) = inner.check_flow_consistent(args.0, args.1).await {
+                    error!(err; "Failed to check flow consistent");
+                }
+                if let Some(done) = ret_signal.take() {
+                    let _ = done.send(());
+                }
+
+                tokio::select! {
+                    _ = rx.recv() => break,
+                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
+                        args = (incoming.0, incoming.1);
+                        ret_signal = Some(incoming.2);
+                    },
+                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => args=(false,false),
+                }
+            }
+        });
+        Ok(ConsistentCheckTask {
+            handle,
+            shutdown_tx: tx,
+            trigger_tx,
+        })
+    }
+
+    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.trigger_tx
+            .send((allow_create, allow_drop, tx))
+            .await
+            .map_err(|_| {
+                IllegalCheckTaskStateSnafu {
+                    reason: "Failed to send trigger signal",
+                }
+                .build()
+            })?;
+        rx.await.map_err(|_| {
+            IllegalCheckTaskStateSnafu {
+                reason: "Failed to receive trigger signal",
+            }
+            .build()
+        })?;
+        Ok(())
+    }
+
+    async fn stop(self) -> Result<(), Error> {
+        self.shutdown_tx.send(()).await.map_err(|_| {
+            IllegalCheckTaskStateSnafu {
+                reason: "Failed to send shutdown signal",
+            }
+            .build()
+        })?;
+        // abort so no need to wait
+        self.handle.abort();
+        Ok(())
+    }
+}
+
+#[derive(Default)]
 struct SrcTableToFlow {
     /// mapping of table ids to flow ids for streaming mode
     stream: HashMap<TableId, HashSet<FlowId>>,
```
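The `ConsistentCheckTask` above couples a trigger channel with a per-request `oneshot` acknowledgement, which is what gives `try_sync_with_check_task` its "wait until the check actually ran" semantics. Below is a minimal sketch of that protocol, simplified from the diff (no shutdown channel and no 10-second periodic tick); all names are illustrative:

```rust
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Trigger channel carrying (allow_create, allow_drop, ack), as in the diff.
    let (trigger_tx, mut trigger_rx) =
        mpsc::channel::<(bool, bool, oneshot::Sender<()>)>(10);

    tokio::spawn(async move {
        while let Some((allow_create, allow_drop, done)) = trigger_rx.recv().await {
            // ... run the consistency check with (allow_create, allow_drop) ...
            println!("checked with allow_create={allow_create}, allow_drop={allow_drop}");
            // Ack only after the check ran; the caller may have gone away,
            // so a failed send is deliberately ignored.
            let _ = done.send(());
        }
    });

    // A "synchronous" trigger: send the request, then wait for the ack.
    let (tx, rx) = oneshot::channel();
    trigger_tx.send((false, true, tx)).await.expect("task alive");
    rx.await.expect("check completed");
}
```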
```diff
@@ -138,35 +482,49 @@ impl FlowEngine for FlowDualEngine {
 
         self.src_table2flow
             .write()
-            .unwrap()
+            .await
             .add_flow(flow_id, flow_type, src_table_ids);
 
         Ok(res)
     }
 
     async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
-        let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
+        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
+
         match flow_type {
             Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
             Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
-            None => FlowNotFoundSnafu { id: flow_id }.fail(),
+            None => {
+                // this can happen if flownode just restart, and is stilling creating the flow
+                // since now that this flow should dropped, we need to trigger the consistent check and allow drop
+                // this rely on drop flow ddl delete metadata first, see src/common/meta/src/ddl/drop_flow.rs
+                warn!(
+                    "Flow {} is not exist in the underlying engine, but exist in metadata",
+                    flow_id
+                );
+                self.try_sync_with_check_task(flow_id, true).await?;
+
+                Ok(())
+            }
         }?;
         // remove mapping
-        self.src_table2flow.write().unwrap().remove_flow(flow_id);
+        self.src_table2flow.write().await.remove_flow(flow_id);
         Ok(())
     }
 
     async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
-        let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
+        // sync with check task
+        self.try_sync_with_check_task(flow_id, false).await?;
+        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
         match flow_type {
             Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
             Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
-            None => FlowNotFoundSnafu { id: flow_id }.fail(),
+            None => Ok(0),
         }
     }
 
     async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
-        let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
+        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
         // not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine
         match flow_type {
             Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
```
```diff
@@ -175,6 +533,13 @@ impl FlowEngine for FlowDualEngine {
         }
     }
 
+    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
+        let stream_flows = self.streaming_engine.list_flows().await?;
+        let batch_flows = self.batching_engine.list_flows().await?;
+
+        Ok(stream_flows.into_iter().chain(batch_flows))
+    }
+
     async fn handle_flow_inserts(
         &self,
         request: api::v1::region::InsertRequests,
```
```diff
@@ -184,7 +549,7 @@ impl FlowEngine for FlowDualEngine {
         let mut to_batch_engine = request.requests;
 
         {
-            let src_table2flow = self.src_table2flow.read().unwrap();
+            let src_table2flow = self.src_table2flow.read().await;
             to_batch_engine.retain(|req| {
                 let region_id = RegionId::from(req.region_id);
                 let table_id = region_id.table_id();
@@ -221,12 +586,7 @@ impl FlowEngine for FlowDualEngine {
             requests: to_batch_engine,
         })
         .await?;
-        stream_handler.await.map_err(|e| {
-            crate::error::UnexpectedSnafu {
-                reason: format!("JoinError when handle inserts for flow stream engine: {e:?}"),
-            }
-            .build()
-        })??;
+        stream_handler.await.context(JoinTaskSnafu)??;
 
         Ok(())
     }
```
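For intuition, the routing contract of `handle_flow_inserts` can be reduced to a partition over a table-to-flow-type mapping. The sketch below is illustrative only: `Req` is a stand-in for `api::v1::region::InsertRequest` reduced to the table id derived from its region id, and unlike the real code (which uses `retain` and dispatches the streaming half on a separately spawned handler, possibly feeding both engines), it routes each request to exactly one side:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum FlowType {
    Batching,
    Streaming,
}

#[derive(Debug)]
struct Req {
    table_id: u32,
}

/// Split requests: true side goes to the batching engine,
/// false side to the streaming engine.
fn split_requests(
    requests: Vec<Req>,
    table_flow_type: &HashMap<u32, FlowType>,
) -> (Vec<Req>, Vec<Req>) {
    requests.into_iter().partition(|req| {
        !matches!(table_flow_type.get(&req.table_id), Some(FlowType::Streaming))
    })
}

fn main() {
    let mut types = HashMap::new();
    types.insert(1, FlowType::Batching);
    types.insert(2, FlowType::Streaming);
    let (to_batch, to_stream) =
        split_requests(vec![Req { table_id: 1 }, Req { table_id: 2 }], &types);
    assert_eq!(to_batch.len(), 1);
    assert_eq!(to_stream.len(), 1);
}
```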
```diff
@@ -307,14 +667,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
                     ..Default::default()
                 })
             }
-            None => UnexpectedSnafu {
-                err_msg: "Missing request body",
-            }
-            .fail(),
-            _ => UnexpectedSnafu {
-                err_msg: "Invalid request body.",
-            }
-            .fail(),
+            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
         }
     }
 
```
```diff
@@ -339,7 +692,7 @@ fn to_meta_err(
 }
 
 #[async_trait::async_trait]
-impl common_meta::node_manager::Flownode for FlowWorkerManager {
+impl common_meta::node_manager::Flownode for FlowStreamingEngine {
     async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
         let query_ctx = request
             .header
```
```diff
@@ -413,14 +766,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
                     ..Default::default()
                 })
             }
-            None => UnexpectedSnafu {
-                err_msg: "Missing request body",
-            }
-            .fail(),
-            _ => UnexpectedSnafu {
-                err_msg: "Invalid request body.",
-            }
-            .fail(),
+            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
         }
     }
 
```
```diff
@@ -432,7 +778,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
     }
 }
 
-impl FlowEngine for FlowWorkerManager {
+impl FlowEngine for FlowStreamingEngine {
     async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
         self.create_flow_inner(args).await
     }
```
```diff
@@ -449,6 +795,16 @@ impl FlowEngine for FlowWorkerManager {
         self.flow_exist_inner(flow_id).await
     }
 
+    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
+        Ok(self
+            .flow_err_collectors
+            .read()
+            .await
+            .keys()
+            .cloned()
+            .collect::<Vec<_>>())
+    }
+
     async fn handle_flow_inserts(
         &self,
         request: api::v1::region::InsertRequests,
```
```diff
@@ -474,7 +830,7 @@ impl FetchFromRow {
     }
 }
 
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     async fn handle_inserts_inner(
         &self,
         request: InsertRequests,
@@ -552,7 +908,7 @@ impl FlowWorkerManager {
                     .copied()
                     .map(FetchFromRow::Idx)
                     .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
-                    .with_context(|| crate::error::UnexpectedSnafu {
+                    .with_context(|| UnexpectedSnafu {
                         reason: format!(
                             "Column not found: {}, default_value: {:?}",
                             col_name, col_default_val
```
```diff
@@ -31,7 +31,7 @@ use snafu::{ensure, OptionExt, ResultExt};
 use table::metadata::TableId;
 
 use crate::adapter::table_source::ManagedTableSource;
-use crate::adapter::{FlowId, FlowWorkerManager, FlowWorkerManagerRef};
+use crate::adapter::{FlowId, FlowStreamingEngine, FlowWorkerManagerRef};
 use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
 use crate::expr::error::ExternalSnafu;
 use crate::expr::utils::find_plan_time_window_expr_lower_bound;
@@ -39,7 +39,7 @@ use crate::repr::RelationDesc;
 use crate::server::get_all_flow_ids;
 use crate::{Error, FrontendInvoker};
 
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     /// Create and start refill flow tasks in background
     pub async fn create_and_start_refill_flow_tasks(
         self: &FlowWorkerManagerRef,
```
```diff
@@ -16,9 +16,9 @@ use std::collections::BTreeMap;
 
 use common_meta::key::flow::flow_state::FlowStat;
 
-use crate::FlowWorkerManager;
+use crate::FlowStreamingEngine;
 
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     pub async fn gen_state_report(&self) -> FlowStat {
         let mut full_report = BTreeMap::new();
         let mut last_exec_time_map = BTreeMap::new();
```
```diff
@@ -33,8 +33,8 @@ use crate::adapter::table_source::TableDesc;
 use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL};
 use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
 use crate::repr::{ColumnType, RelationDesc, RelationType};
-use crate::FlowWorkerManager;
+use crate::FlowStreamingEngine;
-impl FlowWorkerManager {
+impl FlowStreamingEngine {
     /// Get a worker handle for creating flow, using round robin to select a worker
     pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle {
         let use_idx = {
```
```diff
@@ -17,14 +17,16 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
+use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
 use common_meta::key::flow::FlowMetadataManagerRef;
-use common_meta::key::table_info::TableInfoManager;
+use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
 use common_meta::key::TableMetadataManagerRef;
 use common_runtime::JoinHandle;
-use common_telemetry::info;
 use common_telemetry::tracing::warn;
+use common_telemetry::{debug, info};
+use common_time::TimeToLive;
 use query::QueryEngineRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionId;
```
```diff
@@ -36,7 +38,9 @@ use crate::batching_mode::task::BatchingTask;
 use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
 use crate::batching_mode::utils::sql_to_df_plan;
 use crate::engine::FlowEngine;
-use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu};
+use crate::error::{
+    ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
+};
 use crate::{CreateFlowArgs, Error, FlowId, TableName};
 
 /// Batching mode Engine, responsible for driving all the batching mode tasks
```
```diff
@@ -48,6 +52,7 @@ pub struct BatchingEngine {
     frontend_client: Arc<FrontendClient>,
     flow_metadata_manager: FlowMetadataManagerRef,
     table_meta: TableMetadataManagerRef,
+    catalog_manager: CatalogManagerRef,
     query_engine: QueryEngineRef,
 }
 
```
```diff
@@ -57,6 +62,7 @@ impl BatchingEngine {
         query_engine: QueryEngineRef,
         flow_metadata_manager: FlowMetadataManagerRef,
         table_meta: TableMetadataManagerRef,
+        catalog_manager: CatalogManagerRef,
     ) -> Self {
         Self {
             tasks: Default::default(),
```
```diff
@@ -64,6 +70,7 @@ impl BatchingEngine {
             frontend_client,
             flow_metadata_manager,
             table_meta,
+            catalog_manager,
             query_engine,
         }
     }
```
```diff
@@ -179,6 +186,16 @@ async fn get_table_name(
     table_info: &TableInfoManager,
     table_id: &TableId,
 ) -> Result<TableName, Error> {
+    get_table_info(table_info, table_id).await.map(|info| {
+        let name = info.table_name();
+        [name.catalog_name, name.schema_name, name.table_name]
+    })
+}
+
+async fn get_table_info(
+    table_info: &TableInfoManager,
+    table_id: &TableId,
+) -> Result<TableInfoValue, Error> {
     table_info
         .get(*table_id)
         .await
```
```diff
@@ -187,8 +204,7 @@ async fn get_table_name(
         .with_context(|| UnexpectedSnafu {
             reason: format!("Table id = {:?}, couldn't found table name", table_id),
         })
-        .map(|name| name.table_name())
-        .map(|name| [name.catalog_name, name.schema_name, name.table_name])
+        .map(|info| info.into_inner())
 }
 
 impl BatchingEngine {
```
@@ -248,7 +264,19 @@ impl BatchingEngine {
|
|||||||
let query_ctx = Arc::new(query_ctx);
|
let query_ctx = Arc::new(query_ctx);
|
||||||
let mut source_table_names = Vec::with_capacity(2);
|
let mut source_table_names = Vec::with_capacity(2);
|
||||||
for src_id in source_table_ids {
|
for src_id in source_table_ids {
|
||||||
|
// also check table option to see if ttl!=instant
|
||||||
let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
|
let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
|
||||||
|
let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
|
||||||
|
if table_info.table_info.meta.options.ttl == Some(TimeToLive::Instant) {
|
||||||
|
UnsupportedSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
|
||||||
|
table_name.join("."),
|
||||||
|
src_id
|
||||||
|
),
|
||||||
|
}
|
||||||
|
.fail()?;
|
||||||
|
}
|
||||||
source_table_names.push(table_name);
|
source_table_names.push(table_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -273,7 +301,14 @@ impl BatchingEngine {
|
|||||||
})
|
})
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
|
|
||||||
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
|
info!(
|
||||||
|
"Flow id={}, found time window expr={}",
|
||||||
|
flow_id,
|
||||||
|
phy_expr
|
||||||
|
.as_ref()
|
||||||
|
.map(|phy_expr| phy_expr.to_string())
|
||||||
|
.unwrap_or("None".to_string())
|
||||||
|
);
|
||||||
|
|
||||||
let task = BatchingTask::new(
|
let task = BatchingTask::new(
|
||||||
flow_id,
|
flow_id,
|
||||||
@@ -284,7 +319,7 @@ impl BatchingEngine {
|
|||||||
sink_table_name,
|
sink_table_name,
|
||||||
source_table_names,
|
source_table_names,
|
||||||
query_ctx,
|
query_ctx,
|
||||||
self.table_meta.clone(),
|
self.catalog_manager.clone(),
|
||||||
rx,
|
rx,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -295,10 +330,11 @@ impl BatchingEngine {
|
|||||||
// check execute once first to detect any error early
|
// check execute once first to detect any error early
|
||||||
task.check_execute(&engine, &frontend).await?;
|
task.check_execute(&engine, &frontend).await?;
|
||||||
|
|
||||||
// TODO(discord9): also save handle & use time wheel or what for better
|
// TODO(discord9): use time wheel or what for better
|
||||||
let _handle = common_runtime::spawn_global(async move {
|
let handle = common_runtime::spawn_global(async move {
|
||||||
task_inner.start_executing_loop(engine, frontend).await;
|
task_inner.start_executing_loop(engine, frontend).await;
|
||||||
});
|
});
|
||||||
|
task.state.write().unwrap().task_handle = Some(handle);
|
||||||
|
|
||||||
// only replace here not earlier because we want the old one intact if something went wrong before this line
|
// only replace here not earlier because we want the old one intact if something went wrong before this line
|
||||||
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
|
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
|
||||||
@@ -326,15 +362,23 @@ impl BatchingEngine {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
|
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||||
|
debug!("Try flush flow {flow_id}");
|
||||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||||
let task = task.with_context(|| UnexpectedSnafu {
|
let task = task.with_context(|| UnexpectedSnafu {
|
||||||
reason: format!("Can't found task for flow {flow_id}"),
|
reason: format!("Can't found task for flow {flow_id}"),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
task.mark_all_windows_as_dirty()?;
|
||||||
|
|
||||||
let res = task
|
let res = task
|
||||||
.gen_exec_once(&self.query_engine, &self.frontend_client)
|
.gen_exec_once(&self.query_engine, &self.frontend_client)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
|
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
|
||||||
|
debug!(
|
||||||
|
"Successfully flush flow {flow_id}, affected rows={}",
|
||||||
|
affected_rows
|
||||||
|
);
|
||||||
Ok(affected_rows)
|
Ok(affected_rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -357,6 +401,9 @@ impl FlowEngine for BatchingEngine {
|
|||||||
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
|
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
|
||||||
Ok(self.flow_exist_inner(flow_id).await)
|
Ok(self.flow_exist_inner(flow_id).await)
|
||||||
}
|
}
|
||||||
|
async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
|
||||||
|
Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
|
||||||
|
}
|
||||||
async fn handle_flow_inserts(
|
async fn handle_flow_inserts(
|
||||||
&self,
|
&self,
|
||||||
request: api::v1::region::InsertRequests,
|
request: api::v1::region::InsertRequests,
|
||||||
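The guard added above rejects source tables whose TTL is instant at flow-creation time: such a table drops rows as soon as they are ingested, so every scheduled batch query would observe an empty source. A minimal std-only sketch of the check's shape, with a stand-in Ttl enum in place of common_time::TimeToLive and a plain String error instead of the snafu builder:

// Stand-in types; the real code uses common_time::TimeToLive and UnsupportedSnafu.
#[derive(Debug, PartialEq)]
enum Ttl {
    Instant,
    Forever,
}

fn check_source_ttl(table: &str, ttl: Option<&Ttl>) -> Result<(), String> {
    // An instant-TTL table drops rows right after ingestion, so a periodic
    // batch query would always see an empty source; reject it up front.
    if ttl == Some(&Ttl::Instant) {
        return Err(format!(
            "source table `{table}` has instant TTL, not supported in batching mode"
        ));
    }
    Ok(())
}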
@@ -14,44 +14,105 @@
 
 //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
 
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 
-use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_error::ext::BoxedError;
+use api::v1::greptime_request::Request;
+use api::v1::CreateTableExpr;
+use client::{Client, Database};
+use common_error::ext::{BoxedError, ErrorExt};
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
 use common_meta::peer::Peer;
 use common_meta::rpc::store::RangeRequest;
+use common_query::Output;
 use meta_client::client::MetaClient;
-use snafu::ResultExt;
+use servers::query_handler::grpc::GrpcQueryHandler;
+use session::context::{QueryContextBuilder, QueryContextRef};
+use snafu::{OptionExt, ResultExt};
 
 use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
-use crate::error::{ExternalSnafu, UnexpectedSnafu};
+use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
 use crate::Error;
 
-fn default_channel_mgr() -> ChannelManager {
-    let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
-    ChannelManager::with_config(cfg)
+/// Just like [`GrpcQueryHandler`] but use BoxedError
+///
+/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
+///
+/// this is only useful for flownode to
+/// invoke frontend Instance in standalone mode
+#[async_trait::async_trait]
+pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
+    async fn do_query(
+        &self,
+        query: Request,
+        ctx: QueryContextRef,
+    ) -> std::result::Result<Output, BoxedError>;
 }
 
-fn client_from_urls(addrs: Vec<String>) -> Client {
-    Client::with_manager_and_urls(default_channel_mgr(), addrs)
+/// auto impl
+#[async_trait::async_trait]
+impl<
+        E: ErrorExt + Send + Sync + 'static,
+        T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
+    > GrpcQueryHandlerWithBoxedError for T
+{
+    async fn do_query(
+        &self,
+        query: Request,
+        ctx: QueryContextRef,
+    ) -> std::result::Result<Output, BoxedError> {
+        self.do_query(query, ctx).await.map_err(BoxedError::new)
     }
+}
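The new trait above erases the handler's concrete error type behind BoxedError so that any frontend GrpcQueryHandler can sit behind one object-safe trait object. A synchronous, std-only sketch of the same erase-via-blanket-impl pattern (the real trait is async via async_trait, and all names below are illustrative, not the real API):

use std::error::Error;

// Object-safe facade: concrete error types are erased into Box<dyn Error>.
trait ErasedHandler {
    fn handle(&self, req: &str) -> Result<u64, Box<dyn Error>>;
}

trait Handler {
    type Err: Error + 'static;
    fn handle(&self, req: &str) -> Result<u64, Self::Err>;
}

// Blanket impl: every concrete Handler automatically becomes an ErasedHandler.
impl<T: Handler> ErasedHandler for T {
    fn handle(&self, req: &str) -> Result<u64, Box<dyn Error>> {
        Handler::handle(self, req).map_err(|e| Box::new(e) as Box<dyn Error>)
    }
}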
 
+type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
+
 /// A simple frontend client able to execute sql using grpc protocol
-#[derive(Debug)]
+///
+/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
+#[derive(Debug, Clone)]
 pub enum FrontendClient {
     Distributed {
         meta_client: Arc<MetaClient>,
+        chnl_mgr: ChannelManager,
     },
     Standalone {
         /// for the sake of simplicity still use grpc even in standalone mode
         /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
         /// TODO(discord9): not use grpc under standalone mode
-        database_client: DatabaseWithPeer,
+        database_client: HandlerMutable,
     },
 }
 
+impl FrontendClient {
+    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
+    pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
+        let handler = Arc::new(std::sync::Mutex::new(None));
+        (
+            Self::Standalone {
+                database_client: handler.clone(),
+            },
+            handler,
+        )
+    }
+
+    pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
+        Self::Distributed {
+            meta_client,
+            chnl_mgr: {
+                let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
+                ChannelManager::with_config(cfg)
+            },
+        }
+    }
+
+    pub fn from_grpc_handler(grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) -> Self {
+        Self::Standalone {
+            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct DatabaseWithPeer {
     pub database: Database,
@@ -64,25 +125,6 @@ impl DatabaseWithPeer {
     }
 }
 
-impl FrontendClient {
-    pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
-        Self::Distributed { meta_client }
-    }
-
-    pub fn from_static_grpc_addr(addr: String) -> Self {
-        let peer = Peer {
-            id: 0,
-            addr: addr.clone(),
-        };
-
-        let client = client_from_urls(vec![addr]);
-        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
-        Self::Standalone {
-            database_client: DatabaseWithPeer::new(database, peer),
-        }
-    }
-}
-
 impl FrontendClient {
     async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
         let Self::Distributed { meta_client, .. } = self else {
@@ -115,10 +157,21 @@ impl FrontendClient {
     }
 
     /// Get the database with max `last_activity_ts`
-    async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
-        if let Self::Standalone { database_client } = self {
-            return Ok(database_client.clone());
+    async fn get_last_active_frontend(
+        &self,
+        catalog: &str,
+        schema: &str,
+    ) -> Result<DatabaseWithPeer, Error> {
+        let Self::Distributed {
+            meta_client: _,
+            chnl_mgr,
+        } = self
+        else {
+            return UnexpectedSnafu {
+                reason: "Expect distributed mode",
            }
+            .fail();
+        };
 
         let frontends = self.scan_for_frontend().await?;
         let mut peer = None;
@@ -133,16 +186,119 @@ impl FrontendClient {
             }
             .fail()?
         };
-        let client = client_from_urls(vec![peer.addr.clone()]);
-        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+        let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
+        let database = Database::new(catalog, schema, client);
         Ok(DatabaseWithPeer::new(database, peer))
     }
 
-    /// Get a database client, and possibly update it before returning.
-    pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
+    pub async fn create(
+        &self,
+        create: CreateTableExpr,
+        catalog: &str,
+        schema: &str,
+    ) -> Result<u32, Error> {
+        self.handle(
+            Request::Ddl(api::v1::DdlRequest {
+                expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
+            }),
+            catalog,
+            schema,
+            &mut None,
+        )
+        .await
+    }
+
+    /// Handle a request to frontend
+    pub(crate) async fn handle(
+        &self,
+        req: api::v1::greptime_request::Request,
+        catalog: &str,
+        schema: &str,
+        peer_desc: &mut Option<PeerDesc>,
+    ) -> Result<u32, Error> {
         match self {
-            Self::Standalone { database_client } => Ok(database_client.clone()),
-            Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
+            FrontendClient::Distributed { .. } => {
+                let db = self.get_last_active_frontend(catalog, schema).await?;
+
+                *peer_desc = Some(PeerDesc::Dist {
+                    peer: db.peer.clone(),
+                });
+
+                db.database
+                    .handle(req.clone())
+                    .await
+                    .with_context(|_| InvalidRequestSnafu {
+                        context: format!("Failed to handle request: {:?}", req),
+                    })
+            }
+            FrontendClient::Standalone { database_client } => {
+                let ctx = QueryContextBuilder::default()
+                    .current_catalog(catalog.to_string())
+                    .current_schema(schema.to_string())
+                    .build();
+                let ctx = Arc::new(ctx);
+                {
+                    let database_client = {
+                        database_client
+                            .lock()
+                            .map_err(|e| {
+                                UnexpectedSnafu {
+                                    reason: format!("Failed to lock database client: {e}"),
+                                }
+                                .build()
+                            })?
+                            .as_ref()
+                            .context(UnexpectedSnafu {
+                                reason: "Standalone's frontend instance is not set",
+                            })?
+                            .upgrade()
+                            .context(UnexpectedSnafu {
+                                reason: "Failed to upgrade database client",
+                            })?
+                    };
+                    let resp: common_query::Output = database_client
+                        .do_query(req.clone(), ctx)
+                        .await
+                        .map_err(BoxedError::new)
+                        .context(ExternalSnafu)?;
+                    match resp.data {
+                        common_query::OutputData::AffectedRows(rows) => {
+                            Ok(rows.try_into().map_err(|_| {
+                                UnexpectedSnafu {
+                                    reason: format!("Failed to convert rows to u32: {}", rows),
+                                }
+                                .build()
+                            })?)
+                        }
+                        _ => UnexpectedSnafu {
+                            reason: "Unexpected output data",
+                        }
+                        .fail(),
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Describe a peer of frontend
+#[derive(Debug, Default)]
+pub(crate) enum PeerDesc {
+    /// Distributed mode's frontend peer address
+    Dist {
+        /// frontend peer address
+        peer: Peer,
+    },
+    /// Standalone mode
+    #[default]
+    Standalone,
+}
+
+impl std::fmt::Display for PeerDesc {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
+            PeerDesc::Standalone => write!(f, "standalone"),
         }
     }
 }
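In the standalone arm above, the client holds only a Weak reference to the frontend instance and upgrades it per request: the slot starts empty so it can be filled after the frontend boots, and the Weak avoids an Arc cycle between the frontend (which owns the flownode) and the flownode (which calls back into it). A std-only sketch of that access pattern, with a hypothetical Handler trait:

use std::sync::{Arc, Mutex, Weak};

trait Handler: Send + Sync {
    fn handle(&self, req: &str) -> String;
}

// The slot starts empty and is filled once the frontend has booted; holding a
// Weak avoids a reference cycle between frontend and flownode.
type HandlerSlot = Arc<Mutex<Option<Weak<dyn Handler>>>>;

fn call(slot: &HandlerSlot, req: &str) -> Result<String, String> {
    let guard = slot.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    let weak = guard.as_ref().ok_or("frontend instance not set yet")?;
    // upgrade() fails if the frontend was already dropped, e.g. during shutdown.
    let strong = weak.upgrade().ok_or("frontend already dropped")?;
    Ok(strong.handle(req))
}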
@@ -22,13 +22,14 @@ use common_telemetry::tracing::warn;
 use common_time::Timestamp;
 use datatypes::value::Value;
 use session::context::QueryContextRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use tokio::sync::oneshot;
 use tokio::time::Instant;
 
 use crate::batching_mode::task::BatchingTask;
+use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::MIN_REFRESH_DURATION;
-use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu};
+use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
 use crate::{Error, FlowId};
 
 /// The state of the [`BatchingTask`].
@@ -46,6 +47,8 @@ pub struct TaskState {
     exec_state: ExecState,
     /// Shutdown receiver
     pub(crate) shutdown_rx: oneshot::Receiver<()>,
+    /// Task handle
+    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
 }
 impl TaskState {
     pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -56,6 +59,7 @@ impl TaskState {
             dirty_time_windows: Default::default(),
             exec_state: ExecState::Idle,
             shutdown_rx,
+            task_handle: None,
         }
     }
 
@@ -70,7 +74,11 @@ impl TaskState {
     /// wait for at least `last_query_duration`, at most `max_timeout` to start next query
     ///
     /// if have more dirty time window, exec next query immediately
-    pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
+    pub fn get_next_start_query_time(
+        &self,
+        flow_id: FlowId,
+        max_timeout: Option<Duration>,
+    ) -> Instant {
         let next_duration = max_timeout
             .unwrap_or(self.last_query_duration)
             .min(self.last_query_duration);
@@ -80,6 +88,12 @@ impl TaskState {
         if self.dirty_time_windows.windows.is_empty() {
             self.last_update_time + next_duration
         } else {
+            debug!(
+                "Flow id = {}, still have {} dirty time window({:?}), execute immediately",
+                flow_id,
+                self.dirty_time_windows.windows.len(),
+                self.dirty_time_windows.windows
+            );
             Instant::now()
         }
     }
@@ -115,6 +129,15 @@ impl DirtyTimeWindows {
         }
     }
 
+    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
+        self.windows.insert(start, end);
+    }
+
+    /// Clean all dirty time windows, useful when can't found time window expr
+    pub fn clean(&mut self) {
+        self.windows.clear();
+    }
+
     /// Generate all filter expressions consuming all time windows
     pub fn gen_filter_exprs(
         &mut self,
@@ -177,6 +200,18 @@ impl DirtyTimeWindows {
 
         let mut expr_lst = vec![];
         for (start, end) in first_nth.into_iter() {
+            // align using time window exprs
+            let (start, end) = if let Some(ctx) = task_ctx {
+                let Some(time_window_expr) = &ctx.config.time_window_expr else {
+                    UnexpectedSnafu {
+                        reason: "time_window_expr is not set",
+                    }
+                    .fail()?
+                };
+                self.align_time_window(start, end, time_window_expr)?
+            } else {
+                (start, end)
+            };
             debug!(
                 "Time window start: {:?}, end: {:?}",
                 start.to_iso8601_string(),
@@ -199,6 +234,30 @@ impl DirtyTimeWindows {
         Ok(expr)
     }
 
+    fn align_time_window(
+        &self,
+        start: Timestamp,
+        end: Option<Timestamp>,
+        time_window_expr: &TimeWindowExpr,
+    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
+        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
+            reason: format!(
+                "Failed to align start time {:?} with time window expr {:?}",
+                start, time_window_expr
+            ),
+        })?;
+        let align_end = end
+            .and_then(|end| {
+                time_window_expr
+                    .eval(end)
+                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
+                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
+                    .transpose()
+            })
+            .transpose()?;
+        Ok((align_start, align_end))
+    }
+
     /// Merge time windows that overlaps or get too close
     pub fn merge_dirty_time_windows(
         &mut self,
@@ -287,8 +346,12 @@ enum ExecState {
 #[cfg(test)]
 mod test {
     use pretty_assertions::assert_eq;
+    use session::context::QueryContext;
 
     use super::*;
+    use crate::batching_mode::time_window::find_time_window_expr;
+    use crate::batching_mode::utils::sql_to_df_plan;
+    use crate::test_utils::create_test_query_engine;
 
     #[test]
     fn test_merge_dirty_time_windows() {
@@ -404,4 +467,59 @@ mod test {
             assert_eq!(expected_filter_expr, to_sql.as_deref());
         }
     }
+
+    #[tokio::test]
+    async fn test_align_time_window() {
+        type TimeWindow = (Timestamp, Option<Timestamp>);
+        struct TestCase {
+            sql: String,
+            aligns: Vec<(TimeWindow, TimeWindow)>,
+        }
+        let testcases: Vec<TestCase> = vec![TestCase{
+            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
+            aligns: vec![
+                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
+                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
+                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
+                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
+            ],
+        }];
+
+        let query_engine = create_test_query_engine();
+        let ctx = QueryContext::arc();
+        for TestCase { sql, aligns } in testcases {
+            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
+                .await
+                .unwrap();
+
+            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
+                &plan,
+                query_engine.engine_state().catalog_manager().clone(),
+                ctx.clone(),
+            )
+            .await
+            .unwrap();
+
+            let time_window_expr = time_window_expr
+                .map(|expr| {
+                    TimeWindowExpr::from_expr(
+                        &expr,
+                        &column_name,
+                        &df_schema,
+                        &query_engine.engine_state().session_state(),
+                    )
+                })
+                .transpose()
+                .unwrap()
+                .unwrap();
+
+            let dirty = DirtyTimeWindows::default();
+            for (before_align, expected_after_align) in aligns {
+                let after_align = dirty
+                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
+                    .unwrap();
+                assert_eq!(expected_after_align, after_align);
+            }
+        }
+    }
 }
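align_time_window snaps a dirty range outward to whole window boundaries, and the new test pins the expected behavior for date_bin(INTERVAL '5 second', ts): starts are floored to their window and an unaligned end is rounded up to the next boundary. The same rule expressed with plain integer seconds (a sketch, not the engine's code):

// Sketch of the alignment rule with integer seconds and a 5s window:
// start is floored to its window, end is ceiled unless already aligned.
fn align(start: i64, end: Option<i64>, window: i64) -> (i64, Option<i64>) {
    let floor = |t: i64| t.div_euclid(window) * window;
    let aligned_start = floor(start);
    let aligned_end = end.map(|e| if e == floor(e) { e } else { floor(e) + window });
    (aligned_start, aligned_end)
}

fn main() {
    // Mirrors the test cases above.
    assert_eq!(align(3, None, 5), (0, None));
    assert_eq!(align(8, None, 5), (5, None));
    assert_eq!(align(8, Some(10), 5), (5, Some(10)));
    assert_eq!(align(8, Some(9), 5), (5, Some(10)));
}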
@@ -12,33 +12,32 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
+use std::collections::{BTreeSet, HashSet};
 use std::ops::Deref;
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use api::v1::CreateTableExpr;
 use arrow_schema::Fields;
+use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
-use common_meta::key::table_name::TableNameKey;
-use common_meta::key::TableMetadataManagerRef;
+use common_query::logical_plan::breakup_insert_plan;
 use common_telemetry::tracing::warn;
 use common_telemetry::{debug, info};
 use common_time::Timestamp;
+use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
+use datafusion::optimizer::AnalyzerRule;
 use datafusion::sql::unparser::expr_to_sql;
-use datafusion_common::tree_node::TreeNode;
+use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
 use datatypes::prelude::ConcreteDataType;
-use datatypes::schema::constraint::NOW_FN;
-use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
-use datatypes::value::Value;
+use datatypes::schema::{ColumnSchema, Schema};
 use operator::expr_helper::column_schemas_to_defs;
 use query::query_engine::DefaultSerializer;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
-use table::metadata::RawTableMeta;
 use tokio::sync::oneshot;
 use tokio::sync::oneshot::error::TryRecvError;
 use tokio::time::Instant;
@@ -48,14 +47,15 @@ use crate::batching_mode::frontend_client::FrontendClient;
 use crate::batching_mode::state::TaskState;
 use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::utils::{
-    sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, FindGroupByFinalName,
+    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
+    FindGroupByFinalName,
 };
 use crate::batching_mode::{
     DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
 };
 use crate::error::{
-    ConvertColumnSchemaSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, InvalidRequestSnafu,
-    SubstraitEncodeLogicalPlanSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
+    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
+    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
 };
 use crate::metrics::{
     METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
@@ -73,7 +73,7 @@ pub struct TaskConfig {
     pub expire_after: Option<i64>,
     sink_table_name: [String; 3],
     pub source_table_names: HashSet<[String; 3]>,
-    table_meta: TableMetadataManagerRef,
+    catalog_manager: CatalogManagerRef,
 }
 
 #[derive(Clone)]
@@ -93,7 +93,7 @@ impl BatchingTask {
         sink_table_name: [String; 3],
         source_table_names: Vec<[String; 3]>,
         query_ctx: QueryContextRef,
-        table_meta: TableMetadataManagerRef,
+        catalog_manager: CatalogManagerRef,
         shutdown_rx: oneshot::Receiver<()>,
     ) -> Self {
         Self {
@@ -105,12 +105,42 @@ impl BatchingTask {
             expire_after,
             sink_table_name,
             source_table_names: source_table_names.into_iter().collect(),
-            table_meta,
+            catalog_manager,
             }),
             state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
         }
     }
 
+    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
+    ///
+    /// useful for flush_flow to flush dirty time windows range
+    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
+        let now = SystemTime::now();
+        let now = Timestamp::new_second(
+            now.duration_since(UNIX_EPOCH)
+                .expect("Time went backwards")
+                .as_secs() as _,
+        );
+        let lower_bound = self
+            .config
+            .expire_after
+            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
+            .transpose()
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?
+            .unwrap_or(Timestamp::new_second(0));
+        debug!(
+            "Flow {} mark range ({:?}, {:?}) as dirty",
+            self.config.flow_id, lower_bound, now
+        );
+        self.state
+            .write()
+            .unwrap()
+            .dirty_time_windows
+            .add_window(lower_bound, Some(now));
+        Ok(())
+    }
+
     /// Test execute, for check syntax or such
     pub async fn check_execute(
         &self,
@@ -148,13 +178,8 @@ impl BatchingTask {
 
     async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
         self.config
-            .table_meta
-            .table_name_manager()
-            .exists(TableNameKey {
-                catalog: &table_name[0],
-                schema: &table_name[1],
-                table: &table_name[2],
-            })
+            .catalog_manager
+            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
             .await
             .map_err(BoxedError::new)
             .context(ExternalSnafu)
@@ -166,8 +191,10 @@ impl BatchingTask {
         frontend_client: &Arc<FrontendClient>,
     ) -> Result<Option<(u32, Duration)>, Error> {
         if let Some(new_query) = self.gen_insert_plan(engine).await? {
+            debug!("Generate new query: {:#?}", new_query);
             self.execute_logical_plan(frontend_client, &new_query).await
         } else {
+            debug!("Generate no query");
             Ok(None)
         }
     }
@@ -176,67 +203,35 @@ impl BatchingTask {
         &self,
         engine: &QueryEngineRef,
     ) -> Result<Option<LogicalPlan>, Error> {
-        let full_table_name = self.config.sink_table_name.clone().join(".");
-
-        let table_id = self
-            .config
-            .table_meta
-            .table_name_manager()
-            .get(common_meta::key::table_name::TableNameKey::new(
-                &self.config.sink_table_name[0],
-                &self.config.sink_table_name[1],
-                &self.config.sink_table_name[2],
-            ))
-            .await
-            .with_context(|_| TableNotFoundMetaSnafu {
-                msg: full_table_name.clone(),
-            })?
-            .map(|t| t.table_id())
-            .with_context(|| TableNotFoundSnafu {
-                name: full_table_name.clone(),
-            })?;
-
-        let table = self
-            .config
-            .table_meta
-            .table_info_manager()
-            .get(table_id)
-            .await
-            .with_context(|_| TableNotFoundMetaSnafu {
-                msg: full_table_name.clone(),
-            })?
-            .with_context(|| TableNotFoundSnafu {
-                name: full_table_name.clone(),
-            })?
-            .into_inner();
-
-        let schema: datatypes::schema::Schema = table
-            .table_info
-            .meta
-            .schema
-            .clone()
-            .try_into()
-            .with_context(|_| DatatypesSnafu {
-                extra: format!(
-                    "Failed to convert schema from raw schema, raw_schema={:?}",
-                    table.table_info.meta.schema
-                ),
-            })?;
-
-        let df_schema = Arc::new(schema.arrow_schema().clone().try_into().with_context(|_| {
-            DatafusionSnafu {
-                context: format!(
-                    "Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
-                    schema.arrow_schema()
-                ),
-            }
-        })?);
+        let (table, df_schema) = get_table_info_df_schema(
+            self.config.catalog_manager.clone(),
+            self.config.sink_table_name.clone(),
+        )
+        .await?;
 
         let new_query = self
-            .gen_query_with_time_window(engine.clone(), &table.table_info.meta)
+            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
             .await?;
 
         let insert_into = if let Some((new_query, _column_cnt)) = new_query {
+            // first check if all columns in input query exists in sink table
+            // since insert into ref to names in record batch generate by given query
+            let table_columns = df_schema
+                .columns()
+                .into_iter()
+                .map(|c| c.name)
+                .collect::<BTreeSet<_>>();
+            for column in new_query.schema().columns() {
+                if !table_columns.contains(column.name()) {
+                    return InvalidQuerySnafu {
+                        reason: format!(
+                            "Column {} not found in sink table with columns {:?}",
+                            column, table_columns
+                        ),
+                    }
+                    .fail();
+                }
+            }
             // update_at& time index placeholder (if exists) should have default value
             LogicalPlan::Dml(DmlStatement::new(
                 datafusion_common::TableReference::Full {
@@ -251,6 +246,9 @@ impl BatchingTask {
         } else {
             return Ok(None);
         };
+        let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
+            context: "Failed to recompute schema",
+        })?;
         Ok(Some(insert_into))
     }
 
@@ -259,14 +257,11 @@ impl BatchingTask {
         frontend_client: &Arc<FrontendClient>,
         expr: CreateTableExpr,
     ) -> Result<(), Error> {
-        let db_client = frontend_client.get_database_client().await?;
-        db_client
-            .database
-            .create(expr.clone())
-            .await
-            .with_context(|_| InvalidRequestSnafu {
-                context: format!("Failed to create table with expr: {:?}", expr),
-            })?;
+        let catalog = &self.config.sink_table_name[0];
+        let schema = &self.config.sink_table_name[1];
+        frontend_client
+            .create(expr.clone(), catalog, schema)
+            .await?;
         Ok(())
     }
 
@@ -277,27 +272,78 @@ impl BatchingTask {
     ) -> Result<Option<(u32, Duration)>, Error> {
         let instant = Instant::now();
         let flow_id = self.config.flow_id;
-        let db_client = frontend_client.get_database_client().await?;
-        let peer_addr = db_client.peer.addr;
         debug!(
-            "Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
-            self.config.expire_after, peer_addr, &plan
+            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
+            self.config.expire_after, &plan
         );
 
-        let timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
-            .with_label_values(&[flow_id.to_string().as_str()])
-            .start_timer();
-        let message = DFLogicalSubstraitConvertor {}
-            .encode(plan, DefaultSerializer)
-            .context(SubstraitEncodeLogicalPlanSnafu)?;
-
-        let req = api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
-            query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
-        });
-
-        let res = db_client.database.handle(req).await;
-        drop(timer);
+        let catalog = &self.config.sink_table_name[0];
+        let schema = &self.config.sink_table_name[1];
+
+        // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
+        let fixed_plan = plan
+            .clone()
+            .transform_down_with_subqueries(|p| {
+                if let LogicalPlan::TableScan(mut table_scan) = p {
+                    let resolved = table_scan.table_name.resolve(catalog, schema);
+                    table_scan.table_name = resolved.into();
+                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
+                } else {
+                    Ok(Transformed::no(p))
+                }
+            })
+            .with_context(|_| DatafusionSnafu {
+                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
+            })?
+            .data;
+
+        let expanded_plan = CountWildcardRule::new()
+            .analyze(fixed_plan.clone(), &Default::default())
+            .with_context(|_| DatafusionSnafu {
+                context: format!(
+                    "Failed to expand wildcard in logical plan, plan={:?}",
+                    fixed_plan
+                ),
+            })?;
+
+        let plan = expanded_plan;
+        let mut peer_desc = None;
+
+        let res = {
+            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
+                .with_label_values(&[flow_id.to_string().as_str()])
+                .start_timer();
+
+            // hack and special handling the insert logical plan
+            let req = if let Some((insert_to, insert_plan)) =
+                breakup_insert_plan(&plan, catalog, schema)
+            {
+                let message = DFLogicalSubstraitConvertor {}
+                    .encode(&insert_plan, DefaultSerializer)
+                    .context(SubstraitEncodeLogicalPlanSnafu)?;
+                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
+                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
+                        api::v1::InsertIntoPlan {
+                            table_name: Some(insert_to),
+                            logical_plan: message.to_vec(),
+                        },
+                    )),
+                })
+            } else {
+                let message = DFLogicalSubstraitConvertor {}
+                    .encode(&plan, DefaultSerializer)
+                    .context(SubstraitEncodeLogicalPlanSnafu)?;
+
+                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
+                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
+                })
+            };
+
+            frontend_client
+                .handle(req, catalog, schema, &mut peer_desc)
+                .await
+        };
 
         let elapsed = instant.elapsed();
         if let Ok(affected_rows) = &res {
@@ -307,19 +353,23 @@ impl BatchingTask {
             );
         } else if let Err(err) = &res {
             warn!(
-                "Failed to execute Flow {flow_id} on frontend {}, result: {err:?}, elapsed: {:?} with query: {}",
-                peer_addr, elapsed, &plan
+                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
+                peer_desc, elapsed, &plan
             );
         }
 
         // record slow query
         if elapsed >= SLOW_QUERY_THRESHOLD {
             warn!(
-                "Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
-                peer_addr, elapsed, &plan
+                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
+                peer_desc, elapsed, &plan
             );
             METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
-                .with_label_values(&[flow_id.to_string().as_str(), &plan.to_string(), &peer_addr])
+                .with_label_values(&[
+                    flow_id.to_string().as_str(),
+                    &plan.to_string(),
+                    &peer_desc.unwrap_or_default().to_string(),
+                ])
                .observe(elapsed.as_secs_f64());
         }
 
@@ -328,12 +378,7 @@ impl BatchingTask {
             .unwrap()
             .after_query_exec(elapsed, res.is_ok());
 
-        let res = res.context(InvalidRequestSnafu {
-            context: format!(
-                "Failed to execute query for flow={}: '{}'",
-                self.config.flow_id, &plan
-            ),
-        })?;
+        let res = res?;
 
         Ok(Some((res, elapsed)))
     }
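Before the plan is shipped to a frontend, every table scan above is rewritten to a fully qualified catalog.schema.table reference, so execution on the remote node does not depend on that session's defaults. DataFusion's TableReference::resolve performs the fill-in; a stand-in sketch of the rule it applies:

// Stand-in for DataFusion's TableReference::resolve(): missing catalog/schema
// parts are filled from the flow's sink catalog and schema.
enum TableRef {
    Bare { table: String },
    Partial { schema: String, table: String },
    Full { catalog: String, schema: String, table: String },
}

fn resolve(r: TableRef, default_catalog: &str, default_schema: &str) -> [String; 3] {
    match r {
        TableRef::Bare { table } => [default_catalog.into(), default_schema.into(), table],
        TableRef::Partial { schema, table } => [default_catalog.into(), schema, table],
        TableRef::Full { catalog, schema, table } => [catalog, schema, table],
    }
}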
@@ -372,7 +417,10 @@ impl BatchingTask {
                 }
                 Err(TryRecvError::Empty) => (),
             }
-            state.get_next_start_query_time(Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT))
+            state.get_next_start_query_time(
+                self.config.flow_id,
+                Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
+            )
         };
         tokio::time::sleep_until(sleep_until).await;
     }
@@ -386,14 +434,18 @@ impl BatchingTask {
                     continue;
                 }
                 // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
-                Err(err) => match new_query {
+                Err(err) => {
+                    match new_query {
                         Some(query) => {
                             common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
                         }
                         None => {
                             common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                         }
-                },
+                    }
+                    // also sleep for a little while before try again to prevent flooding logs
+                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
+                }
             }
         }
     }
@@ -418,7 +470,7 @@ impl BatchingTask {
     async fn gen_query_with_time_window(
         &self,
         engine: QueryEngineRef,
-        sink_table_meta: &RawTableMeta,
+        sink_table_schema: &Arc<Schema>,
     ) -> Result<Option<(LogicalPlan, usize)>, Error> {
         let query_ctx = self.state.read().unwrap().query_ctx.clone();
         let start = SystemTime::now();
@@ -477,9 +529,11 @@ impl BatchingTask {
                 debug!(
                     "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
                 );
+                // clean dirty time window too, this could be from create flow's check_execute
+                self.state.write().unwrap().dirty_time_windows.clean();
 
                 let mut add_auto_column =
-                    AddAutoColumnRewriter::new(sink_table_meta.schema.clone());
+                    AddAutoColumnRewriter::new(sink_table_schema.clone());
                 let plan = self
                     .config
                     .plan
@@ -515,8 +569,10 @@ impl BatchingTask {
             return Ok(None);
         };
 
+        // TODO(discord9): add auto column or not? This might break compatibility for auto created sink table before this, but that's ok right?
+
         let mut add_filter = AddFilterRewriter::new(expr);
-        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_meta.schema.clone());
+        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
         // make a not optimized plan for clearer unparse
         let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
             .await?;
@@ -534,7 +590,7 @@ impl BatchingTask {
 }
 
 // auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified
-// TODO(discord9): unit test
+// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
 fn create_table_with_expr(
     plan: &LogicalPlan,
     sink_table_name: &[String; 3],
@@ -558,11 +614,7 @@ fn create_table_with_expr(
         AUTO_CREATED_UPDATE_AT_TS_COL,
         ConcreteDataType::timestamp_millisecond_datatype(),
         true,
-    )
-    .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
-    .context(DatatypesSnafu {
-        extra: "Failed to build column `update_at TimestampMillisecond default now()`",
-    })?;
+    );
     column_schemas.push(update_at_schema);
 
     let time_index = if let Some(time_index) = first_time_stamp {
@@ -574,16 +626,7 @@ fn create_table_with_expr(
                 ConcreteDataType::timestamp_millisecond_datatype(),
                 false,
             )
-            .with_time_index(true)
-            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
-                Timestamp::new_millisecond(0),
-            ))))
-            .context(DatatypesSnafu {
-                extra: format!(
-                    "Failed to build column `{} TimestampMillisecond TIME INDEX default 0`",
-                    AUTO_CREATED_PLACEHOLDER_TS_COL
-                ),
-            })?,
+            .with_time_index(true),
         );
         AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
     };
@@ -675,20 +718,14 @@ mod test {
             AUTO_CREATED_UPDATE_AT_TS_COL,
             ConcreteDataType::timestamp_millisecond_datatype(),
             true,
-        )
-        .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
-        .unwrap();
+        );
 
         let ts_placeholder_schema = ColumnSchema::new(
             AUTO_CREATED_PLACEHOLDER_TS_COL,
             ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
-        .with_time_index(true)
-        .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
-            Timestamp::new_millisecond(0),
-        ))))
-        .unwrap();
+        .with_time_index(true);
 
         let testcases = vec![
             TestCase {
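The reworked error arm above now sleeps for MIN_REFRESH_DURATION after a failed tick, so a persistently failing flow cannot flood the logs with back-to-back retries. A tokio-based sketch of that loop shape (the constant's value here is an assumption; the real one lives in the batching_mode module):

use std::time::Duration;

// Assumed value for the sketch only.
const MIN_REFRESH_DURATION: Duration = Duration::from_secs(5);

async fn tick() -> Result<u32, String> {
    Ok(0) // stand-in for generating and executing one query
}

async fn executing_loop(mut shutdown: tokio::sync::oneshot::Receiver<()>) {
    loop {
        if shutdown.try_recv().is_ok() {
            return; // graceful shutdown requested
        }
        if let Err(err) = tick().await {
            eprintln!("flow tick failed: {err}");
            // Back off before retrying so a persistent error can't flood logs.
            tokio::time::sleep(MIN_REFRESH_DURATION).await;
        }
    }
}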
@@ -72,6 +72,17 @@ pub struct TimeWindowExpr {
     df_schema: DFSchema,
 }
 
+impl std::fmt::Display for TimeWindowExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TimeWindowExpr")
+            .field("phy_expr", &self.phy_expr.to_string())
+            .field("column_name", &self.column_name)
+            .field("logical_expr", &self.logical_expr.to_string())
+            .field("df_schema", &self.df_schema)
+            .finish()
+    }
+}
+
 impl TimeWindowExpr {
     pub fn from_expr(
         expr: &Expr,
@@ -256,7 +267,7 @@ fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestam
     Ok(val)
 }
 
-/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window)
+/// Return (`the column name of time index column`, `the time window expr`, `the expected time unit of time index column`, `the expr's schema for evaluating the time window`)
 ///
 /// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected
 /// to be monotonic increasing and appears in the innermost GROUP BY clause
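TimeWindowExpr gains a Display impl above that routes through Formatter::debug_struct, which gives a stable "Name { field: value }" rendering without requiring Display on every field (physical expressions only stringify via to_string). A minimal illustration of the same pattern with stand-in fields:

use std::fmt;

struct WindowExpr {
    expr: String,
    column: String,
}

impl fmt::Display for WindowExpr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // debug_struct produces `WindowExpr { expr: ..., column: ... }`
        // without a Display bound on each field.
        f.debug_struct("WindowExpr")
            .field("expr", &self.expr)
            .field("column", &self.column)
            .finish()
    }
}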
@@ -14,29 +14,63 @@
 
 //! some utils for helping with batching mode
 
-use std::collections::HashSet;
+use std::collections::{BTreeSet, HashSet};
 use std::sync::Arc;
 
+use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
-use common_telemetry::{debug, info};
+use common_telemetry::debug;
 use datafusion::error::Result as DfResult;
 use datafusion::logical_expr::Expr;
 use datafusion::sql::unparser::Unparser;
 use datafusion_common::tree_node::{
     Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
 };
-use datafusion_common::DataFusionError;
-use datafusion_expr::{Distinct, LogicalPlan};
-use datatypes::schema::RawSchema;
+use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
+use datafusion_expr::{Distinct, LogicalPlan, Projection};
+use datatypes::schema::SchemaRef;
 use query::parser::QueryLanguageParser;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
+use table::metadata::TableInfo;
 
 use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
 use crate::df_optimizer::apply_df_optimizer;
-use crate::error::{DatafusionSnafu, ExternalSnafu};
-use crate::Error;
+use crate::error::{DatafusionSnafu, ExternalSnafu, TableNotFoundSnafu};
+use crate::{Error, TableName};
+
+pub async fn get_table_info_df_schema(
+    catalog_mr: CatalogManagerRef,
+    table_name: TableName,
+) -> Result<(Arc<TableInfo>, Arc<DFSchema>), Error> {
+    let full_table_name = table_name.clone().join(".");
+    let table = catalog_mr
+        .table(&table_name[0], &table_name[1], &table_name[2], None)
+        .await
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?
+        .with_context(|| TableNotFoundSnafu {
+            name: full_table_name.clone(),
+        })?;
+    let table_info = table.table_info().clone();
+
+    let schema = table_info.meta.schema.clone();
+
+    let df_schema: Arc<DFSchema> = Arc::new(
+        schema
+            .arrow_schema()
+            .clone()
+            .try_into()
+            .with_context(|_| DatafusionSnafu {
+                context: format!(
+                    "Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
+                    schema.arrow_schema()
+                ),
+            })?,
+    );
+    Ok((table_info, df_schema))
+}
+
/// Convert sql to datafusion logical plan
|
/// Convert sql to datafusion logical plan
|
||||||
pub async fn sql_to_df_plan(
|
pub async fn sql_to_df_plan(
|
||||||
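For context, a minimal sketch of calling the new get_table_info_df_schema helper above; the catalog handle and the three-part name are illustrative assumptions:

    // TableName here is the flow crate's [String; 3] (catalog, schema, table).
    let table_name: TableName = [
        "greptime".to_string(),
        "public".to_string(),
        "numbers_with_ts".to_string(),
    ];
    let (table_info, df_schema) =
        get_table_info_df_schema(catalog_manager.clone(), table_name).await?;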
@@ -164,14 +198,16 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
/// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
/// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
/// with values like `now()` and `0`
///
/// it also give existing columns alias to column in sink table if needed
#[derive(Debug)]
pub struct AddAutoColumnRewriter {
    pub schema: RawSchema,
    pub schema: SchemaRef,
    pub is_rewritten: bool,
}

impl AddAutoColumnRewriter {
    pub fn new(schema: RawSchema) -> Self {
    pub fn new(schema: SchemaRef) -> Self {
        Self {
            schema,
            is_rewritten: false,
@@ -181,37 +217,97 @@ impl AddAutoColumnRewriter {

impl TreeNodeRewriter for AddAutoColumnRewriter {
    type Node = LogicalPlan;
    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
    fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        if self.is_rewritten {
            return Ok(Transformed::no(node));
        }

        // if is distinct all, go one level down
        if let LogicalPlan::Distinct(Distinct::All(_)) = node {
            return Ok(Transformed::no(node));
        }
        // FIXME(discord9): just read plan.expr and do stuffs
        let mut exprs = node.expressions();

        // if is distinct all, wrap it in a projection
        if let LogicalPlan::Distinct(Distinct::All(_)) = &node {
            let mut exprs = vec![];

            for field in node.schema().fields().iter() {
                exprs.push(Expr::Column(datafusion::common::Column::new_unqualified(
                    field.name(),
                )));
            }

            let projection =
                LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);

            node = projection;
        }
        // handle table_scan by wrap it in a projection
        else if let LogicalPlan::TableScan(table_scan) = node {
            let mut exprs = vec![];

            for field in table_scan.projected_schema.fields().iter() {
                exprs.push(Expr::Column(datafusion::common::Column::new(
                    Some(table_scan.table_name.clone()),
                    field.name(),
                )));
            }

            let projection = LogicalPlan::Projection(Projection::try_new(
                exprs,
                Arc::new(LogicalPlan::TableScan(table_scan)),
            )?);

            node = projection;
        }

        // only do rewrite if found the outermost projection
        let mut exprs = if let LogicalPlan::Projection(project) = &node {
            project.expr.clone()
        } else {
            return Ok(Transformed::no(node));
        };

        let all_names = self
            .schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect::<BTreeSet<_>>();
        // first match by position
        for (idx, expr) in exprs.iter_mut().enumerate() {
            if !all_names.contains(&expr.qualified_name().1) {
                if let Some(col_name) = self
                    .schema
                    .column_schemas()
                    .get(idx)
                    .map(|c| c.name.clone())
                {
                    // if the data type mismatched, later check_execute will error out
                    // hence no need to check it here, beside, optimize pass might be able to cast it
                    // so checking here is not necessary
                    *expr = expr.clone().alias(col_name);
                }
            }
        }

        // add columns if have different column count
        let query_col_cnt = exprs.len();
        let table_col_cnt = self.schema.column_schemas.len();
        let table_col_cnt = self.schema.column_schemas().len();
        info!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}");
        debug!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}");

        let placeholder_ts_expr =
            datafusion::logical_expr::lit(ScalarValue::TimestampMillisecond(Some(0), None))
                .alias(AUTO_CREATED_PLACEHOLDER_TS_COL);

        if query_col_cnt == table_col_cnt {
            self.is_rewritten = true;
            return Ok(Transformed::no(node));
            // still need to add alias, see below
        } else if query_col_cnt + 1 == table_col_cnt {
            let last_col_schema = self.schema.column_schemas.last().unwrap();
            let last_col_schema = self.schema.column_schemas().last().unwrap();

            // if time index column is auto created add it
            if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL
                && self.schema.timestamp_index == Some(table_col_cnt - 1)
                && self.schema.timestamp_index() == Some(table_col_cnt - 1)
            {
                exprs.push(datafusion::logical_expr::lit(0));
                exprs.push(placeholder_ts_expr);
            } else if last_col_schema.data_type.is_timestamp() {
                // is the update at column
                exprs.push(datafusion::prelude::now());
                exprs.push(datafusion::prelude::now().alias(&last_col_schema.name));
            } else {
                // helpful error message
                return Err(DataFusionError::Plan(format!(
@@ -221,11 +317,11 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
                )));
            }
        } else if query_col_cnt + 2 == table_col_cnt {
            let mut col_iter = self.schema.column_schemas.iter().rev();
            let mut col_iter = self.schema.column_schemas().iter().rev();
            let last_col_schema = col_iter.next().unwrap();
            let second_last_col_schema = col_iter.next().unwrap();
            if second_last_col_schema.data_type.is_timestamp() {
                exprs.push(datafusion::prelude::now());
                exprs.push(datafusion::prelude::now().alias(&second_last_col_schema.name));
            } else {
                return Err(DataFusionError::Plan(format!(
                    "Expect the second last column in the table to be timestamp column, found column {} with type {:?}",
@@ -235,9 +331,9 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
            }

            if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL
                && self.schema.timestamp_index == Some(table_col_cnt - 1)
                && self.schema.timestamp_index() == Some(table_col_cnt - 1)
            {
                exprs.push(datafusion::logical_expr::lit(0));
                exprs.push(placeholder_ts_expr);
            } else {
                return Err(DataFusionError::Plan(format!(
                    "Expect timestamp column {}, found {:?}",
@@ -246,8 +342,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
            }
        } else {
            return Err(DataFusionError::Plan(format!(
                "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}",
                "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
                query_col_cnt, node.expressions(), table_col_cnt, self.schema.column_schemas
                query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node
            )));
        }

@@ -255,6 +351,11 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
        let new_plan = node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?;
        Ok(Transformed::yes(new_plan))
    }

    /// We might add new columns, so we need to recompute the schema
    fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        node.recompute_schema().map(Transformed::yes)
    }
}

// TODO(discord9): a method to found out the precise time window
||||||
@@ -301,9 +402,11 @@ impl TreeNodeRewriter for AddFilterRewriter {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use datafusion_common::tree_node::TreeNode as _;
|
use datafusion_common::tree_node::TreeNode as _;
|
||||||
use datatypes::prelude::ConcreteDataType;
|
use datatypes::prelude::ConcreteDataType;
|
||||||
use datatypes::schema::ColumnSchema;
|
use datatypes::schema::{ColumnSchema, Schema};
|
||||||
use pretty_assertions::assert_eq;
|
use pretty_assertions::assert_eq;
|
||||||
use session::context::QueryContext;
|
use session::context::QueryContext;
|
||||||
|
|
||||||
@@ -386,7 +489,7 @@ mod test {
|
|||||||
// add update_at
|
// add update_at
|
||||||
(
|
(
|
||||||
"SELECT number FROM numbers_with_ts",
|
"SELECT number FROM numbers_with_ts",
|
||||||
Ok("SELECT numbers_with_ts.number, now() FROM numbers_with_ts"),
|
Ok("SELECT numbers_with_ts.number, now() AS ts FROM numbers_with_ts"),
|
||||||
vec![
|
vec![
|
||||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||||
ColumnSchema::new(
|
ColumnSchema::new(
|
||||||
@@ -400,7 +503,7 @@ mod test {
|
|||||||
// add ts placeholder
|
// add ts placeholder
|
||||||
(
|
(
|
||||||
"SELECT number FROM numbers_with_ts",
|
"SELECT number FROM numbers_with_ts",
|
||||||
Ok("SELECT numbers_with_ts.number, 0 FROM numbers_with_ts"),
|
Ok("SELECT numbers_with_ts.number, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
|
||||||
vec![
|
vec![
|
||||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||||
ColumnSchema::new(
|
ColumnSchema::new(
|
||||||
@@ -428,7 +531,7 @@ mod test {
|
|||||||
// add update_at and ts placeholder
|
// add update_at and ts placeholder
|
||||||
(
|
(
|
||||||
"SELECT number FROM numbers_with_ts",
|
"SELECT number FROM numbers_with_ts",
|
||||||
Ok("SELECT numbers_with_ts.number, now(), 0 FROM numbers_with_ts"),
|
Ok("SELECT numbers_with_ts.number, now() AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
|
||||||
vec![
|
vec![
|
||||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||||
ColumnSchema::new(
|
ColumnSchema::new(
|
||||||
@@ -447,7 +550,7 @@ mod test {
|
|||||||
// add ts placeholder
|
// add ts placeholder
|
||||||
(
|
(
|
||||||
"SELECT number, ts FROM numbers_with_ts",
|
"SELECT number, ts FROM numbers_with_ts",
|
||||||
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, 0 FROM numbers_with_ts"),
|
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
|
||||||
vec![
|
vec![
|
||||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||||
ColumnSchema::new(
|
ColumnSchema::new(
|
||||||
@@ -466,7 +569,7 @@ mod test {
|
|||||||
// add update_at after time index column
|
// add update_at after time index column
|
||||||
(
|
(
|
||||||
"SELECT number, ts FROM numbers_with_ts",
|
"SELECT number, ts FROM numbers_with_ts",
|
||||||
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, now() FROM numbers_with_ts"),
|
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, now() AS update_atat FROM numbers_with_ts"),
|
||||||
vec![
|
vec![
|
||||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||||
ColumnSchema::new(
|
ColumnSchema::new(
|
||||||
@@ -528,8 +631,8 @@ mod test {
|
|||||||
let query_engine = create_test_query_engine();
|
let query_engine = create_test_query_engine();
|
||||||
let ctx = QueryContext::arc();
|
let ctx = QueryContext::arc();
|
||||||
for (before, after, column_schemas) in testcases {
|
for (before, after, column_schemas) in testcases {
|
||||||
let raw_schema = RawSchema::new(column_schemas);
|
let schema = Arc::new(Schema::new(column_schemas));
|
||||||
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(raw_schema);
|
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema);
|
||||||
|
|
||||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
|
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -49,6 +49,8 @@ pub trait FlowEngine {
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error>;
    /// Check if the flow exists
    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>;
    /// List all flows
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error>;
    /// Handle the insert requests for the flow
    async fn handle_flow_inserts(
        &self,
@@ -149,6 +149,13 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Unsupported: {reason}"))]
    Unsupported {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported temporal filter: {reason}"))]
    UnsupportedTemporalFilter {
        reason: String,

@@ -189,6 +196,25 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Illegal check task state: {reason}"))]
    IllegalCheckTaskState {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Failed to sync with check task for flow {} with allow_drop={}",
        flow_id,
        allow_drop
    ))]
    SyncCheckTask {
        flow_id: FlowId,
        allow_drop: bool,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to start server"))]
    StartServer {
        #[snafu(implicit)]

@@ -280,10 +306,12 @@ impl ErrorExt for Error {
            Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
                StatusCode::EngineExecuteQuery
            }
            Self::Unexpected { .. } => StatusCode::Unexpected,
            Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
                StatusCode::Unsupported
            }
            Self::Unexpected { .. }
            | Self::SyncCheckTask { .. }
            | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
            Self::NotImplemented { .. }
            | Self::UnsupportedTemporalFilter { .. }
            | Self::Unsupported { .. } => StatusCode::Unsupported,
            Self::External { source, .. } => source.status_code(),
            Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
            Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
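As a quick illustration of the mapping above, the new variants surface through ErrorExt like any other; a sketch using the snafu-generated selector (the reason string is arbitrary):

    use common_error::ext::ErrorExt;

    // `UnsupportedSnafu` is the selector snafu derives for the `Unsupported`
    // variant; `build()` fills in the implicit location.
    let err: Error = UnsupportedSnafu { reason: "instant TTL under batching mode" }.build();
    assert_eq!(err.status_code(), StatusCode::Unsupported);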
@@ -43,8 +43,8 @@ mod utils;
#[cfg(test)]
mod test_utils;

pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use adapter::{FlowConfig, FlowStreamingEngine, FlowWorkerManagerRef, FlownodeOptions};
pub use batching_mode::frontend_client::FrontendClient;
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
pub use error::{Error, Result};
pub use server::{
@@ -29,6 +29,7 @@ use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info;
use futures::{FutureExt, TryStreamExt};
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};

@@ -50,7 +51,10 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};

use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{create_worker, FlowWorkerManagerRef};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
    to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
    ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,

@@ -59,19 +63,21 @@ use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{CreateFlowArgs, Error, FlowWorkerManager, FlownodeOptions, FrontendClient};
use crate::{CreateFlowArgs, Error, FlowStreamingEngine, FlownodeOptions, FrontendClient};

pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
#[derive(Clone)]
pub struct FlowService {
    /// TODO(discord9): replace with dual engine
    pub manager: FlowWorkerManagerRef,
    pub dual_engine: FlowDualEngineRef,
}

impl FlowService {
    pub fn new(manager: FlowWorkerManagerRef) -> Self {
    pub fn new(manager: FlowDualEngineRef) -> Self {
        Self { manager }
        Self {
            dual_engine: manager,
        }
    }
}

@@ -86,7 +92,7 @@ impl flow_server::Flow for FlowService {
            .start_timer();

        let request = request.into_inner();
        self.manager
        self.dual_engine
            .handle(request)
            .await
            .map_err(|err| {

@@ -126,7 +132,7 @@ impl flow_server::Flow for FlowService {
            .with_label_values(&["in"])
            .inc_by(row_count as u64);

        self.manager
        self.dual_engine
            .handle_inserts(request)
            .await
            .map(Response::new)

@@ -139,11 +145,16 @@ pub struct FlownodeServer {
    inner: Arc<FlownodeServerInner>,
}

/// FlownodeServerInner is the inner state of FlownodeServer,
/// this struct mostly useful for construct/start and stop the
/// flow node server
struct FlownodeServerInner {
    /// worker shutdown signal, not to be confused with server_shutdown_tx
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// server shutdown signal for shutdown grpc server
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// streaming task handler
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    flow_service: FlowService,
}

@@ -156,16 +167,28 @@ impl FlownodeServer {
                flow_service,
                worker_shutdown_tx: Mutex::new(tx),
                server_shutdown_tx: Mutex::new(server_tx),
                streaming_task_handler: Mutex::new(None),
            }),
        }
    }

    /// Start the background task for streaming computation.
    async fn start_workers(&self) -> Result<(), Error> {
        let manager_ref = self.inner.flow_service.manager.clone();
        let manager_ref = self.inner.flow_service.dual_engine.clone();
        let _handle = manager_ref
        let handle = manager_ref
            .clone()
            .streaming_engine()
            .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
        self.inner
            .streaming_task_handler
            .lock()
            .await
            .replace(handle);

        self.inner
            .flow_service
            .dual_engine
            .start_flow_consistent_check_task()
            .await?;

        Ok(())
    }

@@ -176,6 +199,11 @@ impl FlownodeServer {
        if tx.send(()).is_err() {
            info!("Receiver dropped, the flow node server has already shutdown");
        }
        self.inner
            .flow_service
            .dual_engine
            .stop_flow_consistent_check_task()
            .await?;
        Ok(())
    }
}

@@ -272,8 +300,8 @@ impl FlownodeInstance {
        &self.flownode_server
    }

    pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef {
    pub fn flow_engine(&self) -> FlowDualEngineRef {
        self.flownode_server.inner.flow_service.manager.clone()
        self.flownode_server.inner.flow_service.dual_engine.clone()
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {

@@ -342,12 +370,21 @@ impl FlownodeBuilder {
            self.build_manager(query_engine_factory.query_engine())
                .await?,
        );
        let batching = Arc::new(BatchingEngine::new(
            self.frontend_client.clone(),
            query_engine_factory.query_engine(),
            self.flow_metadata_manager.clone(),
            self.table_meta.clone(),
            self.catalog_manager.clone(),
        ));
        let dual = FlowDualEngine::new(
            manager.clone(),
            batching,
            self.flow_metadata_manager.clone(),
            self.catalog_manager.clone(),
        );

        if let Err(err) = self.recover_flows(&manager).await {
            common_telemetry::error!(err; "Failed to recover flows");
        }

        let server = FlownodeServer::new(FlowService::new(manager.clone()));
        let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));

        let heartbeat_task = self.heartbeat_task;

@@ -364,7 +401,7 @@ impl FlownodeBuilder {
    /// or recover all existing flow tasks if in standalone mode(nodeid is None)
    ///
    /// TODO(discord9): persistent flow tasks with internal state
    async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result<usize, Error> {
    async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
        let nodeid = self.opts.node_id;
        let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self

@@ -436,7 +473,7 @@ impl FlownodeBuilder {
            ),
        };
        manager
            .create_flow_inner(args)
            .create_flow(args)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| CreateFlowSnafu {

@@ -452,7 +489,7 @@ impl FlownodeBuilder {
    async fn build_manager(
        &mut self,
        query_engine: Arc<dyn QueryEngine>,
    ) -> Result<FlowWorkerManager, Error> {
    ) -> Result<FlowStreamingEngine, Error> {
        let table_meta = self.table_meta.clone();

        register_function_to_query_engine(&query_engine);

@@ -461,7 +498,7 @@ impl FlownodeBuilder {

        let node_id = self.opts.node_id.map(|id| id as u32);

        let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
        let mut man = FlowStreamingEngine::new(node_id, query_engine, table_meta);
        for worker_id in 0..num_workers {
            let (tx, rx) = oneshot::channel();

@@ -543,6 +580,10 @@ impl<'a> FlownodeServiceBuilder<'a> {
    }
}

/// Basically a tiny frontend that communicates with datanode, different from [`FrontendClient`] which
/// connect to a real frontend instead, this is used for flow's streaming engine. And is for simple query.
///
/// For heavy query use [`FrontendClient`] which offload computation to frontend, lifting the load from flownode
#[derive(Clone)]
pub struct FrontendInvoker {
    inserter: Arc<Inserter>,
@@ -15,6 +15,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
client.workspace = true
@@ -19,6 +19,8 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
use datafusion::error::DataFusionError;
use session::ReadPreference;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;

@@ -345,7 +347,15 @@ pub enum Error {
    SubstraitDecodeLogicalPlan {
        #[snafu(implicit)]
        location: Location,
        source: substrait::error::Error,
        source: common_query::error::Error,
    },

    #[snafu(display("DataFusionError"))]
    DataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}

@@ -423,6 +433,8 @@ impl ErrorExt for Error {
            Error::TableOperation { source, .. } => source.status_code(),

            Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,

            Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
        }
    }
}
@@ -278,7 +278,7 @@ impl SqlQueryHandler for Instance {
        // plan should be prepared before exec
        // we'll do check there
        self.query_engine
            .execute(plan, query_ctx)
            .execute(plan.clone(), query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }
@@ -12,29 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, RowInsertRequests};
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use datafusion::execution::SessionStateBuilder;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table_name::TableName;

use crate::error::{
    CatalogSnafu, Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu,
    NotSupportedSnafu, PermissionSnafu, Result, SubstraitDecodeLogicalPlanSnafu,
    TableNotFoundSnafu, TableOperationSnafu,
    CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
    IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{

@@ -91,14 +95,31 @@ impl GrpcQueryHandler for Instance {
            Query::LogicalPlan(plan) => {
                // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
                let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
                let plan = DFLogicalSubstraitConvertor {}
                    .decode(&*plan, SessionStateBuilder::default().build())
                // use dummy catalog to provide table
                let plan_decoder = self
                    .query_engine()
                    .engine_context(ctx.clone())
                    .new_plan_decoder()
                    .context(PlanStatementSnafu)?;

                let dummy_catalog_list =
                    Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                        self.catalog_manager().clone(),
                    ));

                let logical_plan = plan_decoder
                    .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                    .await
                    .context(SubstraitDecodeLogicalPlanSnafu)?;
                let output = SqlQueryHandler::do_exec_plan(self, plan, ctx.clone()).await?;
                let output =
                    SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;

                attach_timer(output, timer)
            }
            Query::InsertIntoPlan(insert) => {
                self.handle_insert_plan(insert, ctx.clone()).await?
            }
            Query::PromRangeQuery(promql) => {
                let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                let prom_query = PromQuery {

@@ -284,6 +305,91 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
}

impl Instance {
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        // use dummy catalog to provide table
        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        // no optimize yet since we still need to add stuff
        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;

        let table_info = table.table_info();

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .try_into()
                .context(DataFusionSnafu)?,
        );

        let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        // Analyze the plan
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        // Optimize the plan
        let optimized_plan = state
            .optimize(&analyzed_plan)
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;

        Ok(attach_timer(output, timer))
    }
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
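For orientation, a sketch of the request shape this handler serves, using the two fields the code above reads from InsertIntoPlan (`table_name_proto` and `encoded_plan`, a substrait-encoded logical plan, are assumptions for illustration):

    use api::v1::greptime_request::Request;
    use api::v1::query_request::Query;
    use api::v1::{InsertIntoPlan, QueryRequest};

    // The flownode's batching engine can send this over the existing gRPC
    // query channel; the frontend decodes, targets the sink table, analyzes,
    // optimizes, and executes it as shown above.
    let request = Request::Query(QueryRequest {
        query: Some(Query::InsertIntoPlan(InsertIntoPlan {
            table_name: Some(table_name_proto),
            logical_plan: encoded_plan,
        })),
    });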
@@ -26,6 +26,7 @@ use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_S
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};

@@ -38,6 +39,8 @@ use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_telemetry::{debug, info, tracing};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;

@@ -45,7 +48,7 @@ use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;

@@ -69,13 +72,14 @@ use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
    ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
    EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
    InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
    InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
    SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
    TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;

@@ -364,6 +368,18 @@ impl StatementExecutor {
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr.sql, query_context.clone())
            .await?;
        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })

@@ -379,6 +395,55 @@ impl StatementExecutor {
            .context(error::ExecuteDdlSnafu)
    }

    /// Determine the flow type based on the SQL query
    ///
    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
    async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result<FlowType> {
        let engine = &self.query_engine;
        let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;
        let plan = engine
            .planner()
            .plan(&stmt, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

        /// Visitor to find aggregation or distinct
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
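The classification above can be reproduced standalone with DataFusion's TreeNode API; a minimal sketch, assuming `plan: LogicalPlan` is already built (the SQL comments map inputs to the resulting flow type):

    use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
    use datafusion_expr::LogicalPlan;

    // "SELECT sum(number) FROM t GROUP BY ts"  -> Batching  (Aggregate node)
    // "SELECT DISTINCT number FROM t"          -> Batching  (Distinct node)
    // "SELECT number FROM t WHERE number > 10" -> Streaming (neither)
    let mut is_aggr = false;
    plan.apply(|node| {
        if matches!(node, LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_)) {
            is_aggr = true;
            return Ok(TreeNodeRecursion::Stop);
        }
        Ok(TreeNodeRecursion::Continue)
    })?;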
@@ -132,7 +132,7 @@ impl GrpcQueryHandler for DummyInstance {
                );
                result.remove(0)?
            }
            Query::LogicalPlan(_) => unimplemented!(),
            Query::LogicalPlan(_) | Query::InsertIntoPlan(_) => unimplemented!(),
            Query::PromRangeQuery(promql) => {
                let prom_query = PromQuery {
                    query: promql.query,
@@ -41,7 +41,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient};
use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
use frontend::frontend::Frontend;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance, StandaloneDatanodeManager};

@@ -174,8 +174,8 @@ impl GreptimeDbStandaloneBuilder {
            Some(procedure_manager.clone()),
        );

        let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
        let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler();
        let flow_builder = FlownodeBuilder::new(
            Default::default(),
            plugins.clone(),

@@ -188,7 +188,7 @@ impl GreptimeDbStandaloneBuilder {

        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_worker_manager(),
            flow_server: flownode.flow_engine(),
        });

        let table_id_sequence = Arc::new(

@@ -250,7 +250,15 @@ impl GreptimeDbStandaloneBuilder {
            .unwrap();
        let instance = Arc::new(instance);

        let flow_worker_manager = flownode.flow_worker_manager();
        // set the frontend client for flownode
        let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        let flow_worker_manager = flownode.flow_engine().streaming_engine();
        let invoker = flow::FrontendInvoker::build_from(
            flow_worker_manager.clone(),
            catalog_manager.clone(),
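The handler is stored as a Weak pointer so the flownode side does not keep the frontend Instance alive (and no Arc cycle forms between the two); the client upgrades it on each use. A sketch of the consuming side, assuming `handler` is the stored Weak:

    // Weak -> Arc upgrade fails once the frontend instance is dropped,
    // letting the flownode notice a torn-down frontend instead of leaking it.
    if let Some(instance) = handler.upgrade() {
        // forward the gRPC request through the frontend's query handler
    }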
@@ -8,6 +8,20 @@ CREATE TABLE distinct_basic (

Affected Rows: 0

-- should fail
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;

Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval

ALTER TABLE distinct_basic SET 'ttl' = '5s';

Affected Rows: 0

CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis

@@ -24,7 +38,7 @@ VALUES
    (20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

Affected Rows: 0
Affected Rows: 3

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');

@@ -49,7 +63,7 @@ SHOW CREATE TABLE distinct_basic;
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'instant' |
| | ttl = '5s' |
| | ) |
+----------------+-----------------------------------------------------------+

@@ -84,8 +98,93 @@ FROM

SELECT number FROM distinct_basic;

++
++
+--------+
| number |
+--------+
| 20 |
| 22 |
+--------+

-- SQLNESS SLEEP 6s
ADMIN FLUSH_TABLE('distinct_basic');

+-------------------------------------+
| ADMIN FLUSH_TABLE('distinct_basic') |
+-------------------------------------+
| 0 |
+-------------------------------------+

INSERT INTO
    distinct_basic
VALUES
    (23, "2021-07-01 00:00:01.600");

Affected Rows: 1

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SHOW CREATE TABLE distinct_basic;

+----------------+-----------------------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------------------+
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
| | "number" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("number") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '5s' |
| | ) |
+----------------+-----------------------------------------------------------+

SHOW CREATE TABLE out_distinct_basic;

+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+

SELECT
    dis
FROM
    out_distinct_basic;

+-----+
| dis |
+-----+
| 20 |
| 22 |
| 23 |
+-----+

SELECT number FROM distinct_basic;

+--------+
| number |
+--------+
| 23 |
+--------+

DROP FLOW test_distinct_basic;
@@ -6,6 +6,16 @@ CREATE TABLE distinct_basic (
 TIME INDEX(ts)
 )WITH ('ttl' = 'instant');
 
+-- should fail
+-- SQLNESS REPLACE id=\d+ id=REDACTED
+CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
+SELECT
+DISTINCT number as dis
+FROM
+distinct_basic;
+
+ALTER TABLE distinct_basic SET 'ttl' = '5s';
+
 CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
 SELECT
 DISTINCT number as dis
@@ -34,6 +44,28 @@ FROM
 
 SELECT number FROM distinct_basic;
 
+-- SQLNESS SLEEP 6s
+ADMIN FLUSH_TABLE('distinct_basic');
+
+INSERT INTO
+distinct_basic
+VALUES
+(23, "2021-07-01 00:00:01.600");
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('test_distinct_basic');
+
+SHOW CREATE TABLE distinct_basic;
+
+SHOW CREATE TABLE out_distinct_basic;
+
+SELECT
+dis
+FROM
+out_distinct_basic;
+
+SELECT number FROM distinct_basic;
+
 DROP FLOW test_distinct_basic;
 DROP TABLE distinct_basic;
 DROP TABLE out_distinct_basic;
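
Taken together, the two hunk groups above pin down the new rule: a source table whose 'ttl' = 'instant' cannot feed a batching-mode flow, because instant TTL discards rows before the periodic flush can read them, which is exactly what error 3001 reports. A condensed sketch of the sequence the test drives, using the same names as the diff:

CREATE TABLE distinct_basic (number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts)) WITH ('ttl' = 'instant');

-- rejected: Instant TTL is not supported under batching mode
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT DISTINCT number as dis FROM distinct_basic;

-- a TTL longer than the flush interval makes the same CREATE FLOW succeed
ALTER TABLE distinct_basic SET 'ttl' = '5s';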
@@ -9,11 +9,12 @@ Affected Rows: 0
 
 CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
 SELECT
-sum(number)
+sum(number),
+date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
 FROM
 numbers_input_basic
 GROUP BY
-tumble(ts, '1 second', '2021-07-01 00:00:00');
+time_window;
 
 Affected Rows: 0
 
@@ -24,11 +25,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
 +-------------------+--------------------------------------------------+
 | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
 | | "sum(numbers_input_basic.number)" BIGINT NULL, |
-| | "window_start" TIMESTAMP(3) NOT NULL, |
+| | "time_window" TIMESTAMP(9) NOT NULL, |
-| | "window_end" TIMESTAMP(3) NULL, |
 | | "update_at" TIMESTAMP(3) NULL, |
-| | TIME INDEX ("window_start"), |
+| | TIME INDEX ("time_window") |
-| | PRIMARY KEY ("window_end") |
 | | ) |
 | | |
 | | ENGINE=mito |
@@ -52,11 +51,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
 +-------------------+--------------------------------------------------+
 | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
 | | "sum(numbers_input_basic.number)" BIGINT NULL, |
-| | "window_start" TIMESTAMP(3) NOT NULL, |
+| | "time_window" TIMESTAMP(9) NOT NULL, |
-| | "window_end" TIMESTAMP(3) NULL, |
 | | "update_at" TIMESTAMP(3) NULL, |
-| | TIME INDEX ("window_start"), |
+| | TIME INDEX ("time_window") |
-| | PRIMARY KEY ("window_end") |
 | | ) |
 | | |
 | | ENGINE=mito |
@@ -65,13 +62,13 @@ SHOW CREATE TABLE out_num_cnt_basic;
 
 SHOW CREATE FLOW test_numbers_basic;
 
-+--------------------+-------------------------------------------------------------------------------------------------------+
++--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
 | Flow | Create Flow |
-+--------------------+-------------------------------------------------------------------------------------------------------+
++--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
 | test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic |
 | | SINK TO out_num_cnt_basic |
-| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') |
+| | AS SELECT sum(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') AS time_window FROM numbers_input_basic GROUP BY time_window |
-+--------------------+-------------------------------------------------------------------------------------------------------+
++--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
 
 DROP FLOW test_numbers_basic;
 
@@ -7,11 +7,12 @@ CREATE TABLE numbers_input_basic (
 
 CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
 SELECT
-sum(number)
+sum(number),
+date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
 FROM
 numbers_input_basic
 GROUP BY
-tumble(ts, '1 second', '2021-07-01 00:00:00');
+time_window;
 
 SHOW CREATE TABLE out_num_cnt_basic;
 
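
Every basic-flow hunk above is the same mechanical migration: the flow-specific tumble() call in GROUP BY becomes an ordinary date_bin() projection grouped by its alias, and the sink table's window_start/window_end pair collapses into a single time_window column. Side by side, assembled from the diff:

-- before: tumble() implies window_start and window_end sink columns
SELECT sum(number)
FROM numbers_input_basic
GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

-- after: date_bin() yields exactly one time_window column
SELECT sum(number),
    date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM numbers_input_basic
GROUP BY time_window;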
@@ -9,11 +9,12 @@ Affected Rows: 0
 
 CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
 SELECT
-sum(number)
+sum(number),
+date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
 FROM
 numbers_input_basic
 GROUP BY
-tumble(ts, '1 second', '2021-07-01 00:00:00');
+time_window;
 
 Affected Rows: 0
 
@@ -24,11 +25,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
 +-------------------+--------------------------------------------------+
 | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
 | | "sum(numbers_input_basic.number)" BIGINT NULL, |
-| | "window_start" TIMESTAMP(3) NOT NULL, |
+| | "time_window" TIMESTAMP(9) NOT NULL, |
-| | "window_end" TIMESTAMP(3) NULL, |
 | | "update_at" TIMESTAMP(3) NULL, |
-| | TIME INDEX ("window_start"), |
+| | TIME INDEX ("time_window") |
-| | PRIMARY KEY ("window_end") |
 | | ) |
 | | |
 | | ENGINE=mito |
@@ -53,11 +52,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
 +-------------------+--------------------------------------------------+
 | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
 | | "sum(numbers_input_basic.number)" BIGINT NULL, |
-| | "window_start" TIMESTAMP(3) NOT NULL, |
+| | "time_window" TIMESTAMP(9) NOT NULL, |
-| | "window_end" TIMESTAMP(3) NULL, |
 | | "update_at" TIMESTAMP(3) NULL, |
-| | TIME INDEX ("window_start"), |
+| | TIME INDEX ("time_window") |
-| | PRIMARY KEY ("window_end") |
 | | ) |
 | | |
 | | ENGINE=mito |
@@ -84,16 +81,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
 
 SELECT
 "sum(numbers_input_basic.number)",
-window_start,
+time_window
-window_end
 FROM
 out_num_cnt_basic;
 
-+---------------------------------+---------------------+---------------------+
++---------------------------------+---------------------+
-| sum(numbers_input_basic.number) | window_start | window_end |
+| sum(numbers_input_basic.number) | time_window |
-+---------------------------------+---------------------+---------------------+
++---------------------------------+---------------------+
-| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 42 | 2021-07-01T00:00:00 |
-+---------------------------------+---------------------+---------------------+
++---------------------------------+---------------------+
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_basic');
@@ -124,17 +120,16 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
 -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
 SELECT
 "sum(numbers_input_basic.number)",
-window_start,
+time_window
-window_end
 FROM
 out_num_cnt_basic;
 
-+---------------------------------+---------------------+---------------------+
++---------------------------------+---------------------+
-| sum(numbers_input_basic.number) | window_start | window_end |
+| sum(numbers_input_basic.number) | time_window |
-+---------------------------------+---------------------+---------------------+
++---------------------------------+---------------------+
-| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 42 | 2021-07-01T00:00:00 |
-| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+| 47 | 2021-07-01T00:00:01 |
-+---------------------------------+---------------------+---------------------+
++---------------------------------+---------------------+
 
 DROP FLOW test_numbers_basic;
 
@@ -896,6 +891,8 @@ CREATE TABLE temp_sensor_data (
 loc STRING,
 temperature DOUBLE,
 ts TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -904,7 +901,8 @@ CREATE TABLE temp_alerts (
 sensor_id INT,
 loc STRING,
 max_temp DOUBLE,
-ts TIMESTAMP TIME INDEX
+event_ts TIMESTAMP TIME INDEX,
+update_at TIMESTAMP
 );
 
 Affected Rows: 0
@@ -914,6 +912,7 @@ SELECT
 sensor_id,
 loc,
 max(temperature) as max_temp,
+max(ts) as event_ts
 FROM
 temp_sensor_data
 GROUP BY
@@ -933,8 +932,9 @@ SHOW CREATE TABLE temp_alerts;
 | | "sensor_id" INT NULL, |
 | | "loc" STRING NULL, |
 | | "max_temp" DOUBLE NULL, |
-| | "ts" TIMESTAMP(3) NOT NULL, |
+| | "event_ts" TIMESTAMP(3) NOT NULL, |
-| | TIME INDEX ("ts") |
+| | "update_at" TIMESTAMP(3) NULL, |
+| | TIME INDEX ("event_ts") |
 | | ) |
 | | |
 | | ENGINE=mito |
@@ -993,15 +993,16 @@ SHOW TABLES LIKE 'temp_alerts';
 SELECT
 sensor_id,
 loc,
-max_temp
+max_temp,
+event_ts
 FROM
 temp_alerts;
 
-+-----------+-------+----------+
++-----------+-------+----------+-------------------------+
-| sensor_id | loc | max_temp |
+| sensor_id | loc | max_temp | event_ts |
-+-----------+-------+----------+
++-----------+-------+----------+-------------------------+
-| 1 | room1 | 150.0 |
+| 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
-+-----------+-------+----------+
++-----------+-------+----------+-------------------------+
 
 INSERT INTO
 temp_sensor_data
@@ -1022,15 +1023,16 @@ ADMIN FLUSH_FLOW('temp_monitoring');
 SELECT
 sensor_id,
 loc,
-max_temp
+max_temp,
+event_ts
 FROM
 temp_alerts;
 
-+-----------+-------+----------+
++-----------+-------+----------+-------------------------+
-| sensor_id | loc | max_temp |
+| sensor_id | loc | max_temp | event_ts |
-+-----------+-------+----------+
++-----------+-------+----------+-------------------------+
-| 1 | room1 | 150.0 |
+| 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
-+-----------+-------+----------+
++-----------+-------+----------+-------------------------+
 
 DROP FLOW temp_monitoring;
 
@@ -1049,6 +1051,8 @@ CREATE TABLE ngx_access_log (
 stat INT,
 size INT,
 access_time TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -1183,6 +1187,8 @@ CREATE TABLE requests (
 service_ip STRING,
 val INT,
 ts TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -1392,6 +1398,8 @@ CREATE TABLE android_log (
 `log` STRING,
 ts TIMESTAMP(9),
 TIME INDEX(ts)
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -1503,6 +1511,8 @@ CREATE TABLE android_log (
 `log` STRING,
 ts TIMESTAMP(9),
 TIME INDEX(ts)
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -7,11 +7,12 @@ CREATE TABLE numbers_input_basic (
 
 CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
 SELECT
-sum(number)
+sum(number),
+date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
 FROM
 numbers_input_basic
 GROUP BY
-tumble(ts, '1 second', '2021-07-01 00:00:00');
+time_window;
 
 SHOW CREATE TABLE out_num_cnt_basic;
 
@@ -34,8 +35,7 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
 
 SELECT
 "sum(numbers_input_basic.number)",
-window_start,
+time_window
-window_end
 FROM
 out_num_cnt_basic;
 
@@ -54,8 +54,7 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
 -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
 SELECT
 "sum(numbers_input_basic.number)",
-window_start,
+time_window
-window_end
 FROM
 out_num_cnt_basic;
 
@@ -403,13 +402,16 @@ CREATE TABLE temp_sensor_data (
 loc STRING,
 temperature DOUBLE,
 ts TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 CREATE TABLE temp_alerts (
 sensor_id INT,
 loc STRING,
 max_temp DOUBLE,
-ts TIMESTAMP TIME INDEX
+event_ts TIMESTAMP TIME INDEX,
+update_at TIMESTAMP
 );
 
 CREATE FLOW temp_monitoring SINK TO temp_alerts AS
@@ -417,6 +419,7 @@ SELECT
 sensor_id,
 loc,
 max(temperature) as max_temp,
+max(ts) as event_ts
 FROM
 temp_sensor_data
 GROUP BY
@@ -451,7 +454,8 @@ SHOW TABLES LIKE 'temp_alerts';
 SELECT
 sensor_id,
 loc,
-max_temp
+max_temp,
+event_ts
 FROM
 temp_alerts;
 
@@ -466,7 +470,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
 SELECT
 sensor_id,
 loc,
-max_temp
+max_temp,
+event_ts
 FROM
 temp_alerts;
 
@@ -481,6 +486,8 @@ CREATE TABLE ngx_access_log (
 stat INT,
 size INT,
 access_time TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 CREATE TABLE ngx_distribution (
@@ -555,6 +562,8 @@ CREATE TABLE requests (
 service_ip STRING,
 val INT,
 ts TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 CREATE TABLE requests_without_ip (
@@ -650,6 +659,8 @@ CREATE TABLE android_log (
 `log` STRING,
 ts TIMESTAMP(9),
 TIME INDEX(ts)
+)WITH(
+append_mode = 'true'
 );
 
 CREATE TABLE android_log_abnormal (
@@ -704,6 +715,8 @@ CREATE TABLE android_log (
 `log` STRING,
 ts TIMESTAMP(9),
 TIME INDEX(ts)
+)WITH(
+append_mode = 'true'
 );
 
 CREATE TABLE android_log_abnormal (
@@ -19,7 +19,9 @@ Affected Rows: 0
 
 CREATE FLOW calc_avg_speed SINK TO avg_speed AS
 SELECT
-avg((left_wheel + right_wheel) / 2)
+avg((left_wheel + right_wheel) / 2) as avg_speed,
+date_bin(INTERVAL '5 second', ts) as start_window,
+date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
 FROM
 velocity
 WHERE
@@ -28,7 +30,7 @@ WHERE
 AND left_wheel < 60
 AND right_wheel < 60
 GROUP BY
-tumble(ts, '5 second');
+start_window;
 
 Affected Rows: 0
 
@@ -15,7 +15,9 @@ CREATE TABLE avg_speed (
 
 CREATE FLOW calc_avg_speed SINK TO avg_speed AS
 SELECT
-avg((left_wheel + right_wheel) / 2)
+avg((left_wheel + right_wheel) / 2) as avg_speed,
+date_bin(INTERVAL '5 second', ts) as start_window,
+date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
 FROM
 velocity
 WHERE
@@ -24,7 +26,7 @@ WHERE
 AND left_wheel < 60
 AND right_wheel < 60
 GROUP BY
-tumble(ts, '5 second');
+start_window;
 
 INSERT INTO
 velocity
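
When a test still wants both window edges, the rewrite derives the end bound by adding the bin width to the bin start, as both calc_avg_speed hunks read after the change:

SELECT
    avg((left_wheel + right_wheel) / 2) as avg_speed,
    date_bin(INTERVAL '5 second', ts) as start_window,
    date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
FROM
    velocity
GROUP BY
    start_window;

Note that, as committed, the projection list keeps a trailing comma after end_window and groups only by the start bound; end_window is a pure function of start_window, so no extra GROUP BY key is needed.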
@@ -11,7 +11,7 @@ Affected Rows: 0
 CREATE FLOW test_numbers_df_func
 SINK TO out_num_cnt_df_func
 AS
-SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+SELECT sum(abs(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
 
 Affected Rows: 0
 
@@ -42,13 +42,13 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
 +------------------------------------------+
 
 -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
-SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| sum(abs(numbers_input_df_func.number)) | window_start | window_end |
+| sum(abs(numbers_input_df_func.number)) | time_window |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 42 | 2021-07-01T00:00:00 |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -76,14 +76,14 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
 +------------------------------------------+
 
 -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
-SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| sum(abs(numbers_input_df_func.number)) | window_start | window_end |
+| sum(abs(numbers_input_df_func.number)) | time_window |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 42 | 2021-07-01T00:00:00 |
-| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+| 47 | 2021-07-01T00:00:01 |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
 
 DROP FLOW test_numbers_df_func;
 
@@ -110,7 +110,7 @@ Affected Rows: 0
 CREATE FLOW test_numbers_df_func
 SINK TO out_num_cnt_df_func
 AS
-SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+SELECT abs(sum(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
 
 Affected Rows: 0
 
@@ -140,13 +140,13 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
 | FLOW_FLUSHED |
 +------------------------------------------+
 
-SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| abs(sum(numbers_input_df_func.number)) | window_start | window_end |
+| abs(sum(numbers_input_df_func.number)) | time_window |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 2 | 2021-07-01T00:00:00 |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -173,14 +173,14 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
 | FLOW_FLUSHED |
 +------------------------------------------+
 
-SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| abs(sum(numbers_input_df_func.number)) | window_start | window_end |
+| abs(sum(numbers_input_df_func.number)) | time_window |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
-| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+| 2 | 2021-07-01T00:00:00 |
-| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+| 1 | 2021-07-01T00:00:01 |
-+----------------------------------------+---------------------+---------------------+
++----------------------------------------+---------------------+
 
 DROP FLOW test_numbers_df_func;
 
@@ -9,7 +9,7 @@ CREATE TABLE numbers_input_df_func (
 CREATE FLOW test_numbers_df_func
 SINK TO out_num_cnt_df_func
 AS
-SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+SELECT sum(abs(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -24,7 +24,7 @@ VALUES
 ADMIN FLUSH_FLOW('test_numbers_df_func');
 
 -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
-SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -38,7 +38,7 @@ VALUES
 ADMIN FLUSH_FLOW('test_numbers_df_func');
 
 -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
-SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
 DROP FLOW test_numbers_df_func;
 DROP TABLE numbers_input_df_func;
@@ -55,7 +55,7 @@ CREATE TABLE numbers_input_df_func (
 CREATE FLOW test_numbers_df_func
 SINK TO out_num_cnt_df_func
 AS
-SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
+SELECT abs(sum(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -69,7 +69,7 @@ VALUES
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
 
-SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -82,7 +82,7 @@ VALUES
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_numbers_df_func');
 
-SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
+SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
 
 DROP FLOW test_numbers_df_func;
 DROP TABLE numbers_input_df_func;
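
The recurring note about the quote-unquote column is what makes these SELECTs look odd: when a flow's aggregate carries no explicit alias, DataFusion names the sink-table column after the whole expression, so the tests must read it back as a quoted identifier, while the aliased time_window stays a bare name. For example, after the rewrite above:

-- column name generated by datafusion from the un-aliased aggregate
SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;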
tests/cases/standalone/common/flow/flow_flush.result (new file, 62 lines)
@@ -0,0 +1,62 @@
+-- test if flush_flow works and flush old data to flow for compute
+CREATE TABLE numbers_input_basic (
+number INT,
+ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+PRIMARY KEY(number),
+TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+INSERT INTO
+numbers_input_basic
+VALUES
+(20, "2021-07-01 00:00:00.200"),
+(22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
+SELECT
+sum(number),
+date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window
+FROM
+numbers_input_basic
+GROUP BY
+time_window;
+
+Affected Rows: 0
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('test_numbers_basic');
+
++----------------------------------------+
+| ADMIN FLUSH_FLOW('test_numbers_basic') |
++----------------------------------------+
+| FLOW_FLUSHED |
++----------------------------------------+
+
+SELECT
+"sum(numbers_input_basic.number)",
+time_window
+FROM
+out_num_cnt_basic;
+
++---------------------------------+-------------------------+
+| sum(numbers_input_basic.number) | time_window |
++---------------------------------+-------------------------+
+| 42 | 2021-07-01T00:00:00.100 |
++---------------------------------+-------------------------+
+
+DROP FLOW test_numbers_basic;
+
+Affected Rows: 0
+
+DROP TABLE numbers_input_basic;
+
+Affected Rows: 0
+
+DROP TABLE out_num_cnt_basic;
+
+Affected Rows: 0
+
tests/cases/standalone/common/flow/flow_flush.sql (new file, 37 lines)
@@ -0,0 +1,37 @@
+-- test if flush_flow works and flush old data to flow for compute
+CREATE TABLE numbers_input_basic (
+number INT,
+ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+PRIMARY KEY(number),
+TIME INDEX(ts)
+);
+
+INSERT INTO
+numbers_input_basic
+VALUES
+(20, "2021-07-01 00:00:00.200"),
+(22, "2021-07-01 00:00:00.600");
+
+CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
+SELECT
+sum(number),
+date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window
+FROM
+numbers_input_basic
+GROUP BY
+time_window;
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('test_numbers_basic');
+
+SELECT
+"sum(numbers_input_basic.number)",
+time_window
+FROM
+out_num_cnt_basic;
+
+DROP FLOW test_numbers_basic;
+
+DROP TABLE numbers_input_basic;
+
+DROP TABLE out_num_cnt_basic;
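
The new flow_flush pair pins down what ADMIN FLUSH_FLOW guarantees: rows inserted before the flow was created are still folded into the next manual flush, so out_num_cnt_basic sees the sum 42 without waiting for a background tick. Because the flushed-row count in the output varies between runs, the test normalizes it with the sqlness REPLACE directive before comparing:

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');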
@@ -5,6 +5,8 @@ CREATE TABLE requests (
 service_ip STRING,
 val INT,
 ts TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -93,6 +95,8 @@ CREATE TABLE ngx_access_log (
 client STRING,
 country STRING,
 access_time TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -6,6 +6,8 @@ CREATE TABLE requests (
 service_ip STRING,
 val INT,
 ts TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 CREATE TABLE sum_val_in_reqs (
@@ -59,6 +61,8 @@ CREATE TABLE ngx_access_log (
 client STRING,
 country STRING,
 access_time TIMESTAMP TIME INDEX
+)WITH(
+append_mode = 'true'
 );
 
 CREATE FLOW calc_ngx_country SINK TO ngx_country AS
@@ -3,6 +3,8 @@ CREATE TABLE input_basic (
 ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY(number),
 TIME INDEX(ts)
+)WITH(
+append_mode = 'true'
 );
 
 Affected Rows: 0
@@ -166,7 +168,7 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
 | FLOW_FLUSHED |
 +-----------------------------------------+
 
--- 3 is also expected, since flow don't have persisent state
+-- flow batching mode
 SELECT wildcard FROM out_basic;
 
 +----------+
@@ -175,6 +177,14 @@ SELECT wildcard FROM out_basic;
 | 3 |
 +----------+
 
+SELECT count(*) FROM input_basic;
+
++----------+
+| count(*) |
++----------+
+| 3 |
++----------+
+
 DROP TABLE input_basic;
 
 Affected Rows: 0
@@ -302,6 +312,15 @@ FROM
 Affected Rows: 0
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
++----------+
+| Int64(1) |
++----------+
+| 1 |
++----------+
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -310,6 +329,8 @@ VALUES
 
 Affected Rows: 2
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -358,6 +379,15 @@ FROM
 Affected Rows: 0
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
++----------+
+| Int64(1) |
++----------+
+| 1 |
++----------+
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -366,6 +396,8 @@ VALUES
 
 Affected Rows: 2
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -397,6 +429,15 @@ CREATE TABLE input_basic (
 Affected Rows: 0
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
++----------+
+| Int64(1) |
++----------+
+| 1 |
++----------+
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -406,6 +447,8 @@ VALUES
 
 Affected Rows: 3
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -438,7 +481,17 @@ FROM
 
 Affected Rows: 0
 
+-- give flownode a second to rebuild flow
 -- SQLNESS ARG restart=true
+SELECT 1;
+
++----------+
+| Int64(1) |
++----------+
+| 1 |
++----------+
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -457,13 +510,21 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
 | FLOW_FLUSHED |
 +-----------------------------------------+
 
--- 3 is also expected, since flow don't have persisent state
+-- 4 is also expected, since flow batching mode
 SELECT wildcard FROM out_basic;
 
 +----------+
 | wildcard |
 +----------+
-| 3 |
+| 4 |
++----------+
+
+SELECT count(*) FROM input_basic;
+
++----------+
+| count(*) |
++----------+
+| 4 |
 +----------+
 
 DROP TABLE input_basic;
@@ -496,6 +557,15 @@ FROM
 Affected Rows: 0
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
++----------+
+| Int64(1) |
++----------+
+| 1 |
++----------+
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -504,6 +574,8 @@ VALUES
 
 Affected Rows: 2
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -538,6 +610,15 @@ FROM
 Affected Rows: 0
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
++----------+
+| Int64(1) |
++----------+
+| 1 |
++----------+
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -547,6 +628,7 @@ VALUES
 
 Affected Rows: 3
 
+-- give flownode a second to rebuild flow
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -3,6 +3,8 @@ CREATE TABLE input_basic (
 ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY(number),
 TIME INDEX(ts)
+)WITH(
+append_mode = 'true'
 );
 
 CREATE FLOW test_wildcard_basic sink TO out_basic AS
@@ -95,9 +97,11 @@ VALUES
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
--- 3 is also expected, since flow don't have persisent state
+-- flow batching mode
 SELECT wildcard FROM out_basic;
 
+SELECT count(*) FROM input_basic;
+
 DROP TABLE input_basic;
 DROP FLOW test_wildcard_basic;
 DROP TABLE out_basic;
@@ -168,12 +172,17 @@ FROM
 input_basic;
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
 (23, "2021-07-01 00:00:01.000"),
 (24, "2021-07-01 00:00:01.500");
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -201,12 +210,17 @@ FROM
 input_basic;
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
 (23, "2021-07-01 00:00:01.000"),
 (24, "2021-07-01 00:00:01.500");
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -222,6 +236,9 @@ CREATE TABLE input_basic (
 );
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -229,6 +246,8 @@ VALUES
 (24, "2021-07-01 00:00:01.500"),
 (26, "2021-07-01 00:00:02.000");
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -245,7 +264,11 @@ SELECT
 FROM
 input_basic;
 
+-- give flownode a second to rebuild flow
 -- SQLNESS ARG restart=true
+SELECT 1;
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -256,9 +279,11 @@ VALUES
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
--- 3 is also expected, since flow don't have persisent state
+-- 4 is also expected, since flow batching mode
 SELECT wildcard FROM out_basic;
 
+SELECT count(*) FROM input_basic;
+
 DROP TABLE input_basic;
 DROP FLOW test_wildcard_basic;
 DROP TABLE out_basic;
@@ -277,13 +302,17 @@ FROM
 input_basic;
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
 (23, "2021-07-01 00:00:01.000"),
 (24, "2021-07-01 00:00:01.500");
 
+-- give flownode a second to rebuild flow
+-- SQLNESS SLEEP 3s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
@@ -300,6 +329,9 @@ FROM
 input_basic;
 
 -- SQLNESS ARG restart=true
+SELECT 1;
+
+-- SQLNESS SLEEP 3s
 INSERT INTO
 input_basic
 VALUES
@@ -307,6 +339,7 @@ VALUES
 (24, "2021-07-01 00:00:01.500"),
 (25, "2021-07-01 00:00:01.700");
 
+-- give flownode a second to rebuild flow
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('test_wildcard_basic');
 
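
All of the wildcard hunks install the same scaffold around a restart: re-establish the connection with a trivial query after the restart directive, sleep so the flownode can rebuild its flows, insert, then sleep again before flushing. The recurring sequence, assembled from the hunks above:

-- SQLNESS ARG restart=true
SELECT 1;

-- SQLNESS SLEEP 3s
INSERT INTO input_basic VALUES (23, "2021-07-01 00:00:01.000"), (24, "2021-07-01 00:00:01.500");

-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
ADMIN FLUSH_FLOW('test_wildcard_basic');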
@@ -397,7 +397,7 @@ CREATE TABLE temp_alerts (
 sensor_id INT,
 loc STRING,
 max_temp DOUBLE,
-update_at TIMESTAMP TIME INDEX,
+event_ts TIMESTAMP TIME INDEX,
 PRIMARY KEY(sensor_id, loc)
 );
 
@@ -408,6 +408,7 @@ SELECT
 sensor_id,
 loc,
 max(temperature) as max_temp,
+max(ts) as event_ts,
 FROM
 temp_sensor_data
 GROUP BY
@@ -438,7 +439,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
 SELECT
 sensor_id,
 loc,
-max_temp
+max_temp,
+event_ts
 FROM
 temp_alerts;
 
@@ -466,16 +468,17 @@ ADMIN FLUSH_FLOW('temp_monitoring');
 SELECT
 sensor_id,
 loc,
-max_temp
+max_temp,
+event_ts
 FROM
 temp_alerts;
 
-+-----------+-------+----------+
++-----------+-------+----------+---------------------+
-| sensor_id | loc | max_temp |
+| sensor_id | loc | max_temp | event_ts |
-+-----------+-------+----------+
++-----------+-------+----------+---------------------+
-| 1 | room1 | 101.5 |
+| 1 | room1 | 101.5 | 2022-01-01T00:00:02 |
-| 2 | room2 | 102.5 |
+| 2 | room2 | 102.5 | 2022-01-01T00:00:03 |
-+-----------+-------+----------+
++-----------+-------+----------+---------------------+
 
 DROP FLOW temp_monitoring;
 
@@ -291,7 +291,7 @@ CREATE TABLE temp_alerts (
|
|||||||
sensor_id INT,
|
sensor_id INT,
|
||||||
loc STRING,
|
loc STRING,
|
||||||
max_temp DOUBLE,
|
max_temp DOUBLE,
|
||||||
update_at TIMESTAMP TIME INDEX,
|
event_ts TIMESTAMP TIME INDEX,
|
||||||
PRIMARY KEY(sensor_id, loc)
|
PRIMARY KEY(sensor_id, loc)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -300,6 +300,7 @@ SELECT
|
|||||||
sensor_id,
|
sensor_id,
|
||||||
loc,
|
loc,
|
||||||
max(temperature) as max_temp,
|
max(temperature) as max_temp,
|
||||||
|
max(ts) as event_ts,
|
||||||
FROM
|
FROM
|
||||||
temp_sensor_data
|
temp_sensor_data
|
||||||
GROUP BY
|
GROUP BY
|
||||||
@@ -320,7 +321,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
|
|||||||
SELECT
|
SELECT
|
||||||
sensor_id,
|
sensor_id,
|
||||||
loc,
|
loc,
|
||||||
max_temp
|
max_temp,
|
||||||
|
event_ts
|
||||||
FROM
|
FROM
|
||||||
temp_alerts;
|
temp_alerts;
|
||||||
|
|
||||||
@@ -337,7 +339,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
|
|||||||
SELECT
|
SELECT
|
||||||
sensor_id,
|
sensor_id,
|
||||||
loc,
|
loc,
|
||||||
max_temp
|
max_temp,
|
||||||
|
event_ts
|
||||||
FROM
|
FROM
|
||||||
temp_alerts;
|
temp_alerts;
|
||||||
|
|
||||||
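The same schema change lands twice because a sqlness case is a pair of files: a .sql script that drives the run (the hunks just above) and a .result file that pins the expected output, result tables included (the hunks further up). A hedged sketch of that contract; the file layout and the run_case helper are assumptions for illustration, not the real runner:

use std::fs;

/// Stand-in for replaying `<case>.sql` against a test database and
/// capturing the rendered output, tables and all.
fn run_case(case: &str) -> std::io::Result<String> {
    let _input = fs::read_to_string(format!("{case}.sql"))?;
    Ok(String::new()) // assumed: execute each statement, collect formatted results
}

/// The sqlness contract in miniature: actual output must match the recorded
/// `.result` file, which is why both files must change together.
fn check_case(case: &str) -> std::io::Result<bool> {
    let expected = fs::read_to_string(format!("{case}.result"))?;
    Ok(run_case(case)? == expected)
}
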
@@ -72,12 +72,13 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
 
 Affected Rows: 4
 
+-- TODO(discord9): fix flow stat update for batching mode flow
 SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
 
 +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
 | information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names             |
 +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
-| true                                                                                 | true                                              | true                                                     | greptime.public.ngx_access_log |
+|                                                                                      | true                                              | false                                                    | greptime.public.ngx_access_log |
 +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
 
 DROP TABLE ngx_access_log;
 
@@ -32,6 +32,7 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
 -- SQLNESS SLEEP 10s
 INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z');
 
+-- TODO(discord9): fix flow stat update for batching mode flow
 SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
 
 DROP TABLE ngx_access_log;
 
@@ -1,3 +1,3 @@
 [grpc]
-bind_addr = "127.0.0.1:29401"
-server_addr = "127.0.0.1:29401"
+bind_addr = "{grpc_addr}"
+server_addr = "{grpc_addr}"

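The hardcoded port gives way to a {grpc_addr} placeholder that the test harness fills in at startup. A minimal sketch of that substitution, under the assumption that it is plain string replacement; the render_config name is made up for illustration:

/// Hypothetical stand-in for the harness step that instantiates the config
/// template above before launching a datanode.
fn render_config(template: &str, grpc_addr: &str) -> String {
    template.replace("{grpc_addr}", grpc_addr)
}

fn main() {
    let template = "[grpc]\nbind_addr = \"{grpc_addr}\"\nserver_addr = \"{grpc_addr}\"\n";
    // the old hardcoded value from this diff becomes just one possible input
    let rendered = render_config(template, "127.0.0.1:29401");
    assert!(rendered.contains("bind_addr = \"127.0.0.1:29401\""));
}
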
@@ -389,6 +389,9 @@ impl ServerMode {
     format!("--metasrv-addrs={metasrv_addr}"),
     format!("--http-addr={http_addr}"),
     format!("--rpc-addr={rpc_bind_addr}"),
+    // since sqlness run on local, bind addr is the same as server addr
+    // this is needed so that `cluster_info`'s server addr column can be correct
+    format!("--rpc-server-addr={rpc_bind_addr}"),
     format!("--mysql-addr={mysql_addr}"),
     format!("--postgres-addr={postgres_addr}"),
     format!(
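The comments added in this hunk rest on a real distinction: the bind address is what the gRPC server listens on, while the server address is what it advertises to the cluster and what the cluster_info table reports. A hedged illustration of why passing the same value for both is safe on a single host; GrpcOptions here is a stand-in for the [grpc] section above, not GreptimeDB's actual type:

/// Illustrative stand-in for a gRPC options struct with the two addresses
/// the [grpc] config section distinguishes.
struct GrpcOptions {
    /// Address the server binds and listens on.
    bind_addr: String,
    /// Address advertised to the cluster (what `cluster_info` reports).
    server_addr: String,
}

impl GrpcOptions {
    /// On a local sqlness run both addresses coincide, which is why the
    /// runner can pass the same value to --rpc-addr and --rpc-server-addr.
    fn local(addr: &str) -> Self {
        Self {
            bind_addr: addr.to_string(),
            server_addr: addr.to_string(),
        }
    }
}

fn main() {
    let opts = GrpcOptions::local("127.0.0.1:29401");
    assert_eq!(opts.bind_addr, opts.server_addr);
}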