feat: use flow batching engine

broken: try using logical plan

fix: use dummy catalog for logical plan

fix: insert plan exec&sqlness grpc addr

feat: use frontend instance in flownode in standalone

feat: flow type in metasrv&fix: flush flow out of sync& column name alias

tests: sqlness update

tests: sqlness flow rebuild update

chore: per review

refactor: keep chnl mgr

refactor: use catalog mgr for get table

tests: use valid sql

fix: add more check

refactor: put flow type determine to frontend
This commit is contained in:
discord9
2025-04-11 16:12:18 +08:00
parent 9fb0487e67
commit a7da9af5de
47 changed files with 1691 additions and 427 deletions

3
Cargo.lock generated
View File

@@ -4505,6 +4505,7 @@ dependencies = [
"arc-swap",
"async-trait",
"auth",
"bytes",
"cache",
"catalog",
"client",
@@ -4943,7 +4944,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6d9cffd43c4e6358805a798f17e03e232994b82#b6d9cffd43c4e6358805a798f17e03e232994b82"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e8fce283e78186dca9ed4990a7535c4b38633370#e8fce283e78186dca9ed4990a7535c4b38633370"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e8fce283e78186dca9ed4990a7535c4b38633370" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -514,6 +514,7 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
Some(Query::Sql(_)) => "query.sql",
Some(Query::LogicalPlan(_)) => "query.logical_plan",
Some(Query::PromRangeQuery(_)) => "query.prom_range",
Some(Query::InsertIntoPlan(_)) => "query.insert_into_plan",
None => "query.empty",
}
}

View File

@@ -27,7 +27,7 @@ use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
mod dummy_catalog;
pub mod dummy_catalog;
use dummy_catalog::DummyCatalogList;
use table::TableRef;

View File

@@ -345,7 +345,7 @@ impl StartCommand {
let client = Arc::new(NodeClients::new(channel_config));
let invoker = FrontendInvoker::build_from(
flownode.flow_worker_manager().clone(),
flownode.flow_engine().streaming_engine(),
catalog_manager.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
@@ -355,7 +355,8 @@ impl StartCommand {
.await
.context(StartFlownodeSnafu)?;
flownode
.flow_worker_manager()
.flow_engine()
.streaming_engine()
.set_frontend_invoker(invoker)
.await;

View File

@@ -57,7 +57,7 @@ use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
FrontendClient, FrontendInvoker,
FrontendClient, FrontendInvoker, GrpcQueryHandlerWithBoxedError,
};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder;
@@ -527,14 +527,15 @@ impl StartCommand {
// TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without
// actually make a connection
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
Arc::new(frontend_client.clone()),
);
let flownode = flow_builder
.build()
@@ -544,15 +545,15 @@ impl StartCommand {
// set the ref to query for the local flow state
{
let flow_worker_manager = flownode.flow_worker_manager();
let flow_worker_manager = flownode.flow_engine().streaming_engine();
information_extension
.set_flow_worker_manager(flow_worker_manager.clone())
.set_flow_worker_manager(flow_worker_manager)
.await;
}
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
flow_server: flownode.flow_engine(),
});
let table_id_sequence = Arc::new(
@@ -606,7 +607,16 @@ impl StartCommand {
.context(error::StartFrontendSnafu)?;
let fe_instance = Arc::new(fe_instance);
let flow_worker_manager = flownode.flow_worker_manager();
// set the frontend client for flownode
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
// set the frontend invoker for flownode
let flow_worker_manager = flownode.flow_engine().streaming_engine();
// flow server need to be able to use frontend to write insert requests back
let invoker = FrontendInvoker::build_from(
flow_worker_manager.clone(),

View File

@@ -38,7 +38,7 @@ use table::metadata::TableId;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
@@ -171,7 +171,7 @@ impl CreateFlowProcedure {
}
self.data.state = CreateFlowState::CreateFlows;
// determine flow type
self.data.flow_type = Some(determine_flow_type(&self.data.task));
self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
Ok(Status::executing(true))
}
@@ -196,8 +196,8 @@ impl CreateFlowProcedure {
});
}
info!(
"Creating flow({:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers
"Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.flow_type, self.data.peers
);
join_all(create_flow)
.await
@@ -306,8 +306,20 @@ impl Procedure for CreateFlowProcedure {
}
}
pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType {
FlowType::Batching
/// Resolves the [`FlowType`] requested by a create-flow task.
///
/// The type is read from the task's `flow_options` under `FlowType::FLOW_TYPE_KEY`.
/// When the option is absent the flow defaults to [`FlowType::Batching`].
///
/// # Errors
///
/// Returns an `Unexpected` error when the option is present but is neither
/// `FlowType::BATCHING` nor `FlowType::STREAMING`.
pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
    let Some(requested) = flow_task.flow_options.get(FlowType::FLOW_TYPE_KEY) else {
        // no explicit flow type given, fall back to batching mode
        return Ok(FlowType::Batching);
    };
    match requested.as_str() {
        FlowType::BATCHING => Ok(FlowType::Batching),
        FlowType::STREAMING => Ok(FlowType::Streaming),
        unknown => UnexpectedSnafu {
            err_msg: format!("Unknown flow type: {}", unknown),
        }
        .fail(),
    }
}
/// The state of [CreateFlowProcedure].

View File

@@ -46,7 +46,7 @@ pub(crate) fn test_create_flow_task(
create_if_not_exists,
expire_after: Some(300),
comment: "".to_string(),
sql: "raw_sql".to_string(),
sql: "select 1".to_string(),
flow_options: Default::default(),
}
}

View File

@@ -18,16 +18,19 @@ mod udaf;
use std::sync::Arc;
use api::v1::TableName;
use datafusion::catalog::CatalogProviderList;
use datafusion::error::Result as DatafusionResult;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion_common::Column;
use datafusion_expr::col;
use datafusion_common::{Column, TableReference};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{col, DmlStatement, WriteOp};
pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
use snafu::ResultExt;
pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
pub use self::udaf::AggregateFunction;
use crate::error::Result;
use crate::error::{GeneralDataFusionSnafu, Result};
use crate::logical_plan::accumulator::*;
use crate::signature::{Signature, Volatility};
@@ -79,6 +82,71 @@ pub fn rename_logical_plan_columns(
LogicalPlanBuilder::from(plan).project(projection)?.build()
}
/// Split an `insert into table_name <input>` logical plan into `(table_name, input_plan)`.
///
/// `table_name` is the fully qualified name of the table being inserted into; any part
/// missing from the plan's table reference is filled in from the given `catalog` / `schema`.
/// `input_plan` is the plan producing the rows to insert.
///
/// Returns `None` if `plan` is not a DML plan whose operation is
/// `WriteOp::Insert(InsertOp::Append)`.
pub fn breakup_insert_plan(
    plan: &LogicalPlan,
    catalog: &str,
    schema: &str,
) -> Option<(TableName, Arc<LogicalPlan>)> {
    let LogicalPlan::Dml(dml) = plan else {
        return None;
    };
    if dml.op != WriteOp::Insert(InsertOp::Append) {
        return None;
    }

    // resolve the three name parts, defaulting whatever the reference leaves out
    let (catalog_name, schema_name, table_name) = match &dml.table_name {
        TableReference::Bare { table } => {
            (catalog.to_string(), schema.to_string(), table.to_string())
        }
        TableReference::Partial { schema, table } => {
            (catalog.to_string(), schema.to_string(), table.to_string())
        }
        TableReference::Full {
            catalog,
            schema,
            table,
        } => (catalog.to_string(), schema.to_string(), table.to_string()),
    };

    let target = TableName {
        catalog_name,
        schema_name,
        table_name,
    };
    Some((target, dml.input.clone()))
}
/// create a `insert into table_name <input>` logical plan
pub fn add_insert_to_logical_plan(
table_name: TableName,
table_schema: datafusion_common::DFSchemaRef,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let table_name = TableReference::Full {
catalog: table_name.catalog_name.into(),
schema: table_name.schema_name.into(),
table: table_name.table_name.into(),
};
let plan = LogicalPlan::Dml(DmlStatement::new(
table_name,
table_schema,
WriteOp::Insert(InsertOp::Append),
Arc::new(input),
));
let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?;
Ok(plan)
}
/// The datafusion `[LogicalPlan]` decoder.
#[async_trait::async_trait]
pub trait SubstraitPlanDecoder {

View File

@@ -58,7 +58,7 @@ use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_R
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};
mod flownode_impl;
pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
@@ -158,7 +158,7 @@ pub struct FlowWorkerManager {
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
tick_manager: FlowTickManager,
node_id: Option<u32>,
pub node_id: Option<u32>,
/// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
///
/// So that a series of event like `inserts -> flush` can be handled correctly

View File

@@ -20,24 +20,35 @@ use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::{Result as MetaResult, UnexpectedSnafu};
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle;
use common_telemetry::{trace, warn};
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::Mutex;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{CreateFlowSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu};
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu,
ListFlowsSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};
/// Ref to [`FlowDualEngine`]
pub type FlowDualEngineRef = Arc<FlowDualEngine>;
/// Manage both streaming and batching mode engine
///
/// including create/drop/flush flow
@@ -47,8 +58,293 @@ pub struct FlowDualEngine {
batching_engine: Arc<BatchingEngine>,
/// helper struct for faster query flow by table id or vice versa
src_table2flow: std::sync::RwLock<SrcTableToFlow>,
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
}
impl FlowDualEngine {
/// Build a dual engine from its streaming and batching engines plus the metadata and
/// catalog managers. The table-to-flow mapping starts empty and no consistency check
/// task is registered yet.
pub fn new(
    streaming_engine: Arc<FlowWorkerManager>,
    batching_engine: Arc<BatchingEngine>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
) -> Self {
    let src_table2flow = std::sync::RwLock::new(SrcTableToFlow::default());
    let check_task = Mutex::new(None);
    Self {
        streaming_engine,
        batching_engine,
        src_table2flow,
        flow_metadata_manager,
        catalog_manager,
        check_task,
    }
}
/// Returns a new handle to the streaming mode engine.
pub fn streaming_engine(&self) -> Arc<FlowWorkerManager> {
    Arc::clone(&self.streaming_engine)
}
/// Returns a new handle to the batching mode engine.
pub fn batching_engine(&self) -> Arc<BatchingEngine> {
    Arc::clone(&self.batching_engine)
}
/// Run a single consistency check between the flows recorded in metadata and the flows
/// that actually exist in the underlying engines.
///
/// The set of flows that should exist is:
/// - the flows registered for this flownode, when the streaming engine has a `node_id`;
/// - otherwise, every flow found under every catalog known to the catalog manager.
///
/// # Arguments
///
/// * `allow_create` - if `true`, flows found in metadata but missing from the engines are
///   created; if `false` they are only reported with a warning.
/// * `allow_drop` - if `true`, flows found in the engines but missing from metadata are
///   removed; if `false` they are only reported with a warning.
///
/// # Errors
///
/// Returns an error if listing flows fails, or if the info of a flow to be created cannot
/// be fetched or is missing. A failure to create or drop an individual flow is only logged
/// and does not fail the check.
async fn check_flow_consistent(
    &self,
    allow_create: bool,
    allow_drop: bool,
) -> Result<(), Error> {
    let nodeid = self.streaming_engine.node_id;
    let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
        // a node id is set: only the flows registered for this flownode are expected
        let to_be_recover = self
            .flow_metadata_manager
            .flownode_flow_manager()
            .flows(nodeid.into())
            .try_collect::<Vec<_>>()
            .await
            .context(ListFlowsSnafu {
                id: Some(nodeid.into()),
            })?;
        to_be_recover.into_iter().map(|(id, _)| id).collect()
    } else {
        // no node id: every flow of every catalog is expected
        let all_catalogs = self
            .catalog_manager
            .catalog_names()
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;
        let mut all_flow_ids = vec![];
        for catalog in all_catalogs {
            let flows = self
                .flow_metadata_manager
                .flow_name_manager()
                .flow_names(&catalog)
                .await
                .try_collect::<Vec<_>>()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
        }
        all_flow_ids
    };
    let should_exists = should_exists
        .into_iter()
        .map(|i| i as FlowId)
        .collect::<HashSet<_>>();
    let actual_exist = self.list_flows().await?.into_iter().collect::<HashSet<_>>();

    // in metadata but not in the engines
    let to_be_created = should_exists
        .iter()
        .filter(|id| !actual_exist.contains(id))
        .collect::<Vec<_>>();
    // in the engines but not in metadata
    let to_be_dropped = actual_exist
        .iter()
        .filter(|id| !should_exists.contains(id))
        .collect::<Vec<_>>();

    if !to_be_created.is_empty() {
        if allow_create {
            info!(
                "Recovering {} flows: {:?}",
                to_be_created.len(),
                to_be_created
            );
            let mut errors = vec![];
            for flow_id in to_be_created {
                let flow_id = *flow_id;
                let info = self
                    .flow_metadata_manager
                    .flow_info_manager()
                    .get(flow_id as u32)
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?
                    .context(FlowNotFoundSnafu { id: flow_id })?;

                let sink_table_name = [
                    info.sink_table_name().catalog_name.clone(),
                    info.sink_table_name().schema_name.clone(),
                    info.sink_table_name().table_name.clone(),
                ];
                let args = CreateFlowArgs {
                    flow_id,
                    sink_table_name,
                    source_table_ids: info.source_table_ids().to_vec(),
                    // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
                    // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
                    // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
                    create_if_not_exists: true,
                    or_replace: true,
                    expire_after: info.expire_after(),
                    comment: Some(info.comment().clone()),
                    sql: info.raw_sql().clone(),
                    flow_options: info.options().clone(),
                    query_ctx: Some(
                        QueryContextBuilder::default()
                            .current_catalog(info.catalog_name().clone())
                            .build(),
                    ),
                };
                // collect failures so that one bad flow does not stop the others from recovering
                if let Err(err) = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu {
                        sql: info.raw_sql().clone(),
                    })
                {
                    errors.push((flow_id, err));
                }
            }
            for (flow_id, err) in errors {
                warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
            }
        } else {
            warn!(
                "Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
                nodeid, to_be_created
            );
        }
    }

    if !to_be_dropped.is_empty() {
        if allow_drop {
            info!("Dropping flows: {:?}", to_be_dropped);
            let mut errors = vec![];
            for flow_id in to_be_dropped {
                let flow_id = *flow_id;
                if let Err(err) = self.remove_flow(flow_id).await {
                    errors.push((flow_id, err));
                }
            }
            for (flow_id, err) in errors {
                warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
            }
        } else {
            // these flows run on the flownode but are gone from metadata
            warn!(
                "Flownode {:?} found flows not exist in metadata, flow_ids={:?}",
                nodeid, to_be_dropped
            );
        }
    }
    Ok(())
}
pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
if self.check_task.lock().await.is_some() {
crate::error::UnexpectedSnafu {
reason: "Flow consistent check task already exists",
}
.fail()?;
}
let task = ConsistentCheckTask::start_check_task(self).await?;
self.check_task.lock().await.replace(task);
Ok(())
}
pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
info!("Stopping flow consistent check task");
if let Some(task) = self.check_task.lock().await.take() {
task.stop().await?;
} else {
crate::error::UnexpectedSnafu {
reason: "Flow consistent check task does not exist",
}
.fail()?;
}
info!("Stopped flow consistent check task");
Ok(())
}
/// Whether the flow info manager has an entry for `flow_id`.
///
/// # Errors
///
/// Lookup failures are wrapped into an `External` error.
async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
    let info = self
        .flow_metadata_manager
        .flow_info_manager()
        .get(flow_id as u32)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    Ok(info.is_some())
}
}
/// Handle to the background loop that repeatedly runs the flow consistency check.
struct ConsistentCheckTask {
/// join handle of the spawned check loop
handle: JoinHandle<()>,
/// sending on this channel asks the check loop to exit
shutdown_tx: tokio::sync::mpsc::Sender<()>,
/// requests an immediate check: `(allow_create, allow_drop, done)`,
/// where `done` is signalled once the requested check has run
trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}
impl ConsistentCheckTask {
/// Run an initial check that is allowed to create missing flows, then spawn the
/// background loop that keeps checking flow consistency.
///
/// Returns an error if the initial check fails; errors of later checks are only logged.
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
// first do recover flows
engine.check_flow_consistent(true, false).await?;
let inner = engine.clone();
// shutdown channel: a message (or all senders being dropped) ends the loop
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
// trigger channel: carries `(allow_create, allow_drop, done)` requests
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// arguments of the next check; a plain periodic check neither creates nor drops
let mut args = (false, false);
// set when the next check was requested through `trigger`, used to report completion
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
loop {
if let Err(err) = inner.check_flow_consistent(args.0, args.1).await {
error!(err; "Failed to check flow consistent");
}
// notify the caller of `trigger` (if any) that its check has run; ignore a dropped receiver
if let Some(done) = ret_signal.take() {
let _ = done.send(());
}
// wait for whichever comes first: shutdown, an explicit trigger, or the 10s timer
tokio::select! {
_ = rx.recv() => break,
incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
args = (incoming.0, incoming.1);
ret_signal = Some(incoming.2);
},
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => args=(false,false),
}
}
});
Ok(ConsistentCheckTask {
handle,
shutdown_tx: tx,
trigger_tx,
})
}
async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.trigger_tx
.send((allow_create, allow_drop, tx))
.await
.map_err(|_| {
crate::error::UnexpectedSnafu {
reason: "Failed to send trigger signal",
}
.build()
})?;
rx.await.map_err(|_| {
crate::error::UnexpectedSnafu {
reason: "Failed to receive trigger signal",
}
.build()
})?;
Ok(())
}
async fn stop(self) -> Result<(), Error> {
self.shutdown_tx.send(()).await.map_err(|_| {
crate::error::UnexpectedSnafu {
reason: "Failed to send shutdown signal",
}
.build()
})?;
// abort so no need to wait
self.handle.abort();
Ok(())
}
}
#[derive(Default)]
struct SrcTableToFlow {
/// mapping of table ids to flow ids for streaming mode
stream: HashMap<TableId, HashSet<FlowId>>,
@@ -149,7 +445,36 @@ impl FlowEngine for FlowDualEngine {
match flow_type {
Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
None => FlowNotFoundSnafu { id: flow_id }.fail(),
None => {
// this can happen if the flownode has just restarted and is still creating the flow;
// since this flow should now be dropped, we need to trigger the consistency check and allow drop.
// this relies on the drop flow ddl deleting metadata first, see src/common/meta/src/ddl/drop_flow.rs
warn!(
"Flow {} is not exist in the underlying engine, but exist in metadata",
flow_id
);
let mut retry = 0;
let max_retry = 10;
// keep trying to trigger consistent check
while retry < max_retry {
if let Some(task) = self.check_task.lock().await.as_ref() {
task.trigger(false, true).await?;
break;
}
retry += 1;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
if retry == max_retry {
error!(
"Failed to trigger consistent check after {} retries while dropping flow {}",
max_retry, flow_id
);
return FlowNotFoundSnafu { id: flow_id }.fail();
}
Ok(())
}
}?;
// remove mapping
self.src_table2flow.write().unwrap().remove_flow(flow_id);
@@ -161,7 +486,18 @@ impl FlowEngine for FlowDualEngine {
match flow_type {
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
None => FlowNotFoundSnafu { id: flow_id }.fail(),
None => {
// this might happen if flownode only just started
if self.flow_exist_in_metadata(flow_id).await? {
warn!(
"Flow {} is not exist in the underlying engine, but exist in metadata",
flow_id
);
Ok(0)
} else {
FlowNotFoundSnafu { id: flow_id }.fail()
}
}
}
}
@@ -175,6 +511,13 @@ impl FlowEngine for FlowDualEngine {
}
}
/// List the ids of all flows in both engines: streaming flows first, then batching flows.
async fn list_flows(&self) -> Result<Vec<FlowId>, Error> {
    let streaming = self.streaming_engine.list_flows().await?;
    let batching = self.batching_engine.list_flows().await?;
    Ok(streaming.into_iter().chain(batching).collect())
}
async fn handle_flow_inserts(
&self,
request: api::v1::region::InsertRequests,
@@ -449,6 +792,16 @@ impl FlowEngine for FlowWorkerManager {
self.flow_exist_inner(flow_id).await
}
/// List flow ids, taken from the keys of `flow_err_collectors`.
async fn list_flows(&self) -> Result<Vec<FlowId>, Error> {
    let collectors = self.flow_err_collectors.read().await;
    let flow_ids: Vec<FlowId> = collectors.keys().cloned().collect();
    Ok(flow_ids)
}
async fn handle_flow_inserts(
&self,
request: api::v1::region::InsertRequests,

View File

@@ -17,14 +17,16 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -36,7 +38,10 @@ use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu};
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, InvalidQuerySnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu,
};
use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -48,6 +53,7 @@ pub struct BatchingEngine {
frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
}
@@ -57,6 +63,7 @@ impl BatchingEngine {
query_engine: QueryEngineRef,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
) -> Self {
Self {
tasks: Default::default(),
@@ -64,6 +71,7 @@ impl BatchingEngine {
frontend_client,
flow_metadata_manager,
table_meta,
catalog_manager,
query_engine,
}
}
@@ -179,6 +187,16 @@ async fn get_table_name(
table_info: &TableInfoManager,
table_id: &TableId,
) -> Result<TableName, Error> {
get_table_info(table_info, table_id)
.await
.map(|info| info.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
async fn get_table_info(
table_info: &TableInfoManager,
table_id: &TableId,
) -> Result<TableInfoValue, Error> {
table_info
.get(*table_id)
.await
@@ -187,8 +205,7 @@ async fn get_table_name(
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", table_id),
})
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
.map(|info| info.into_inner())
}
impl BatchingEngine {
@@ -248,7 +265,19 @@ impl BatchingEngine {
let query_ctx = Arc::new(query_ctx);
let mut source_table_names = Vec::with_capacity(2);
for src_id in source_table_ids {
// also check table option to see if ttl!=instant
let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
if table_info.table_info.meta.options.ttl == Some(TimeToLive::Instant) {
InvalidQuerySnafu {
reason: format!(
"Source table `{}`(id={}) has instant TTL, flow will only evaluate to empty results with such table, use a small ttl instead of instant",
table_name.join("."),
src_id
),
}
.fail()?;
}
source_table_names.push(table_name);
}
@@ -273,7 +302,14 @@ impl BatchingEngine {
})
.transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
info!(
"Flow id={}, found time window expr={}",
flow_id,
phy_expr
.as_ref()
.map(|phy_expr| phy_expr.to_string())
.unwrap_or("None".to_string())
);
let task = BatchingTask::new(
flow_id,
@@ -284,7 +320,7 @@ impl BatchingEngine {
sink_table_name,
source_table_names,
query_ctx,
self.table_meta.clone(),
self.catalog_manager.clone(),
rx,
);
@@ -295,10 +331,11 @@ impl BatchingEngine {
// check execute once first to detect any error early
task.check_execute(&engine, &frontend).await?;
// TODO(discord9): also save handle & use time wheel or what for better
let _handle = common_runtime::spawn_global(async move {
// TODO(discord9): use time wheel or what for better
let handle = common_runtime::spawn_global(async move {
task_inner.start_executing_loop(engine, frontend).await;
});
task.state.write().unwrap().task_handle = Some(handle);
// only replace here not earlier because we want the old one intact if something went wrong before this line
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
@@ -357,6 +394,9 @@ impl FlowEngine for BatchingEngine {
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
Ok(self.flow_exist_inner(flow_id).await)
}
/// List the ids of all flows that currently have a batching task.
async fn list_flows(&self) -> Result<Vec<FlowId>, Error> {
    let tasks = self.tasks.read().await;
    let flow_ids: Vec<FlowId> = tasks.keys().cloned().collect();
    Ok(flow_ids)
}
async fn handle_flow_inserts(
&self,
request: api::v1::region::InsertRequests,

View File

@@ -14,44 +14,104 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::Arc;
use std::sync::{Arc, Weak};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
use client::{Client, Database};
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
use crate::Error;
fn default_channel_mgr() -> ChannelManager {
let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
/// Same as [`GrpcQueryHandler`], but with the error type fixed to [`BoxedError`].
///
/// Basically a specialized `GrpcQueryHandler<Error = BoxedError>`, which lets handlers
/// with different error types be used behind one trait object.
///
/// This is only useful for the flownode to invoke the frontend instance
/// in standalone mode.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
/// Execute the gRPC `query` under the query context `ctx`,
/// reporting any failure as a [`BoxedError`].
async fn do_query(
&self,
query: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError>;
}
fn client_from_urls(addrs: Vec<String>) -> Client {
Client::with_manager_and_urls(default_channel_mgr(), addrs)
/// auto impl
#[async_trait::async_trait]
impl<
E: ErrorExt + Send + Sync + 'static,
T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
> GrpcQueryHandlerWithBoxedError for T
{
async fn do_query(
&self,
query: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
self.do_query(query, ctx).await.map_err(BoxedError::new)
}
}
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// A simple frontend client able to execute sql using grpc protocol
#[derive(Debug)]
///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
#[derive(Debug, Clone)]
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
/// TODO(discord9): not use grpc under standalone mode
database_client: DatabaseWithPeer,
database_client: HandlerMutable,
},
}
impl FrontendClient {
    /// Create a standalone client whose frontend handler is not set yet.
    ///
    /// The returned [`HandlerMutable`] shares its slot with the client, so storing a
    /// handler into it later makes that handler visible to the client.
    pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
        let slot: HandlerMutable = Arc::new(std::sync::Mutex::new(None));
        let client = Self::Standalone {
            database_client: Arc::clone(&slot),
        };
        (client, slot)
    }

    /// Create a distributed client on top of `meta_client`, with a channel manager
    /// whose timeout is `DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT`.
    pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
        let channel_config =
            ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
        let chnl_mgr = ChannelManager::with_config(channel_config);
        Self::Distributed {
            meta_client,
            chnl_mgr,
        }
    }

    /// Create a standalone client that already holds `grpc_handler`.
    pub fn from_grpc_handler(grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) -> Self {
        let slot = std::sync::Mutex::new(Some(grpc_handler));
        Self::Standalone {
            database_client: Arc::new(slot),
        }
    }
}
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
pub database: Database,
@@ -64,25 +124,6 @@ impl DatabaseWithPeer {
}
}
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed { meta_client }
}
pub fn from_static_grpc_addr(addr: String) -> Self {
let peer = Peer {
id: 0,
addr: addr.clone(),
};
let client = client_from_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
}
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
@@ -115,10 +156,21 @@ impl FrontendClient {
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
async fn get_last_active_frontend(
&self,
catalog: &str,
schema: &str,
) -> Result<DatabaseWithPeer, Error> {
let Self::Distributed {
meta_client: _,
chnl_mgr,
} = self
else {
return UnexpectedSnafu {
reason: "Expect distributed mode",
}
.fail();
};
let frontends = self.scan_for_frontend().await?;
let mut peer = None;
@@ -133,16 +185,114 @@ impl FrontendClient {
}
.fail()?
};
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(catalog, schema, client);
Ok(DatabaseWithPeer::new(database, peer))
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
/// Sends a `CreateTable` DDL request through [`Self::handle`] using the given
/// catalog and schema.
///
/// Returns whatever `handle` returns for the request.
pub async fn create(
    &self,
    create: CreateTableExpr,
    catalog: &str,
    schema: &str,
) -> Result<u32, Error> {
    let ddl = api::v1::DdlRequest {
        expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
    };
    // The peer description is not needed here, so a throwaway slot is handed in.
    let mut unused_peer_desc = None;
    self.handle(Request::Ddl(ddl), catalog, schema, &mut unused_peer_desc)
        .await
}
/// Handle a request to frontend
pub(crate) async fn handle(
&self,
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
) -> Result<u32, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
FrontendClient::Distributed { .. } => {
let db = self.get_last_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
db.database
.handle(req.clone())
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request: {:?}", req),
})
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build();
let ctx = Arc::new(ctx);
{
let database_client = {
database_client
.lock()
.unwrap()
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",
})?
.upgrade()
.context(UnexpectedSnafu {
reason: "Failed to upgrade database client",
})?
};
let resp: common_query::Output = database_client
.do_query(req.clone(), ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
match resp.data {
common_query::OutputData::AffectedRows(rows) => {
Ok(rows.try_into().map_err(|_| {
UnexpectedSnafu {
reason: format!("Failed to convert rows to u32: {}", rows),
}
.build()
})?)
}
_ => UnexpectedSnafu {
reason: "Unexpected output data",
}
.fail(),
}
}
}
}
}
}
/// Describes which frontend a request was sent to.
///
/// `Standalone` is the `Default` value. The `Display` output is the peer
/// address for `Dist` and the literal `standalone` for `Standalone`.
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
/// Distributed mode: the request went to the frontend described by `peer`
Dist {
/// The frontend peer; its `addr` is what `Display` prints
peer: Peer,
},
/// Standalone mode: there is no separate frontend peer to describe
#[default]
Standalone,
}
impl std::fmt::Display for PeerDesc {
    /// Writes the peer address for `Dist`, or the literal `standalone` otherwise.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label: &str = match self {
            PeerDesc::Dist { peer } => peer.addr.as_str(),
            PeerDesc::Standalone => "standalone",
        };
        f.write_str(label)
    }
}

View File

@@ -46,6 +46,8 @@ pub struct TaskState {
exec_state: ExecState,
/// Shutdown receiver
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -56,6 +58,7 @@ impl TaskState {
dirty_time_windows: Default::default(),
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
}
}

View File

@@ -12,33 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::TreeNode;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::NOW_FN;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::value::Value;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::RawTableMeta;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;
@@ -48,14 +47,15 @@ use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, FindGroupByFinalName,
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
FindGroupByFinalName,
};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, InvalidRequestSnafu,
SubstraitEncodeLogicalPlanSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
@@ -73,7 +73,7 @@ pub struct TaskConfig {
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
pub source_table_names: HashSet<[String; 3]>,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
}
#[derive(Clone)]
@@ -93,7 +93,7 @@ impl BatchingTask {
sink_table_name: [String; 3],
source_table_names: Vec<[String; 3]>,
query_ctx: QueryContextRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
@@ -105,7 +105,7 @@ impl BatchingTask {
expire_after,
sink_table_name,
source_table_names: source_table_names.into_iter().collect(),
table_meta,
catalog_manager,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
}
@@ -148,13 +148,8 @@ impl BatchingTask {
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
self.config
.table_meta
.table_name_manager()
.exists(TableNameKey {
catalog: &table_name[0],
schema: &table_name[1],
table: &table_name[2],
})
.catalog_manager
.table_exists(&table_name[0], &table_name[1], &table_name[2], None)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
@@ -166,8 +161,10 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
debug!("Generate new query: {:#?}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
} else {
debug!("Generate no query");
Ok(None)
}
}
@@ -176,67 +173,35 @@ impl BatchingTask {
&self,
engine: &QueryEngineRef,
) -> Result<Option<LogicalPlan>, Error> {
let full_table_name = self.config.sink_table_name.clone().join(".");
let table_id = self
.config
.table_meta
.table_name_manager()
.get(common_meta::key::table_name::TableNameKey::new(
&self.config.sink_table_name[0],
&self.config.sink_table_name[1],
&self.config.sink_table_name[2],
))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: full_table_name.clone(),
})?
.map(|t| t.table_id())
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?;
let table = self
.config
.table_meta
.table_info_manager()
.get(table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: full_table_name.clone(),
})?
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?
.into_inner();
let schema: datatypes::schema::Schema = table
.table_info
.meta
.schema
.clone()
.try_into()
.with_context(|_| DatatypesSnafu {
extra: format!(
"Failed to convert schema from raw schema, raw_schema={:?}",
table.table_info.meta.schema
),
})?;
let df_schema = Arc::new(schema.arrow_schema().clone().try_into().with_context(|_| {
DatafusionSnafu {
context: format!(
"Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
schema.arrow_schema()
),
}
})?);
let (table, df_schema) = get_table_info_df_schema(
self.config.catalog_manager.clone(),
self.config.sink_table_name.clone(),
)
.await?;
let new_query = self
.gen_query_with_time_window(engine.clone(), &table.table_info.meta)
.gen_query_with_time_window(engine.clone(), &table.meta.schema)
.await?;
let insert_into = if let Some((new_query, _column_cnt)) = new_query {
// first check if all columns in input query exists in sink table
// since insert into ref to names in record batch generate by given query
let table_columns = df_schema
.columns()
.into_iter()
.map(|c| c.name)
.collect::<BTreeSet<_>>();
for column in new_query.schema().columns() {
if !table_columns.contains(column.name()) {
return InvalidQuerySnafu {
reason: format!(
"Column {} not found in sink table with columns {:?}",
column, table_columns
),
}
.fail();
}
}
// update_at& time index placeholder (if exists) should have default value
LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::Full {
@@ -251,6 +216,9 @@ impl BatchingTask {
} else {
return Ok(None);
};
let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
context: "Failed to recompute schema",
})?;
Ok(Some(insert_into))
}
@@ -259,14 +227,11 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>,
expr: CreateTableExpr,
) -> Result<(), Error> {
let db_client = frontend_client.get_database_client().await?;
db_client
.database
.create(expr.clone())
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to create table with expr: {:?}", expr),
})?;
let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
frontend_client
.create(expr.clone(), catalog, schema)
.await?;
Ok(())
}
@@ -277,27 +242,78 @@ impl BatchingTask {
) -> Result<Option<(u32, Duration)>, Error> {
let instant = Instant::now();
let flow_id = self.config.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
self.config.expire_after, peer_addr, &plan
"Executing flow {flow_id}(expire_after={:?} secs) with query {}",
self.config.expire_after, &plan
);
let timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
let message = DFLogicalSubstraitConvertor {}
.encode(plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
// fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
let fixed_plan = plan
.clone()
.transform(|p| {
if let LogicalPlan::TableScan(mut table_scan) = p {
let resolved = table_scan.table_name.resolve(catalog, schema);
table_scan.table_name = resolved.into();
Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
} else {
Ok(Transformed::no(p))
}
})
.with_context(|_| DatafusionSnafu {
context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
})?
.data;
let req = api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
});
let expanded_plan = CountWildcardRule::new()
.analyze(fixed_plan.clone(), &Default::default())
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to expand wildcard in logical plan, plan={:?}",
fixed_plan
),
})?;
let res = db_client.database.handle(req).await;
drop(timer);
let plan = expanded_plan;
let mut peer_desc = None;
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
// hack and special handling the insert logical plan
let req = if let Some((insert_to, insert_plan)) =
breakup_insert_plan(&plan, catalog, schema)
{
let message = DFLogicalSubstraitConvertor {}
.encode(&insert_plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::InsertIntoPlan(
api::v1::InsertIntoPlan {
table_name: Some(insert_to),
logical_plan: message.to_vec(),
},
)),
})
} else {
let message = DFLogicalSubstraitConvertor {}
.encode(&plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
})
};
frontend_client
.handle(req, catalog, schema, &mut peer_desc)
.await
};
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
@@ -307,19 +323,23 @@ impl BatchingTask {
);
} else if let Err(err) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {}, result: {err:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &plan
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
peer_desc, elapsed, &plan
);
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &plan
"Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
peer_desc, elapsed, &plan
);
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &plan.to_string(), &peer_addr])
.with_label_values(&[
flow_id.to_string().as_str(),
&plan.to_string(),
&peer_desc.unwrap_or_default().to_string(),
])
.observe(elapsed.as_secs_f64());
}
@@ -328,12 +348,7 @@ impl BatchingTask {
.unwrap()
.after_query_exec(elapsed, res.is_ok());
let res = res.context(InvalidRequestSnafu {
context: format!(
"Failed to execute query for flow={}: \'{}\'",
self.config.flow_id, &plan
),
})?;
let res = res?;
Ok(Some((res, elapsed)))
}
@@ -386,14 +401,18 @@ impl BatchingTask {
continue;
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
Err(err) => {
match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
}
None => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
}
}
None => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
}
},
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
}
}
}
@@ -418,7 +437,7 @@ impl BatchingTask {
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
sink_table_meta: &RawTableMeta,
sink_table_schema: &Arc<Schema>,
) -> Result<Option<(LogicalPlan, usize)>, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let start = SystemTime::now();
@@ -479,7 +498,7 @@ impl BatchingTask {
);
let mut add_auto_column =
AddAutoColumnRewriter::new(sink_table_meta.schema.clone());
AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan = self
.config
.plan
@@ -515,8 +534,10 @@ impl BatchingTask {
return Ok(None);
};
// TODO(discord9): add auto column or not? This might break compatibility for auto created sink table before this, but that's ok right?
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_meta.schema.clone());
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
// make a not optimized plan for clearer unparse
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
@@ -534,7 +555,7 @@ impl BatchingTask {
}
// An auto-created table has an auto-added `update_at` column, and optionally an `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time index placeholder if no timestamp column is specified
// TODO(discord9): unit test
// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
fn create_table_with_expr(
plan: &LogicalPlan,
sink_table_name: &[String; 3],
@@ -559,10 +580,10 @@ fn create_table_with_expr(
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
/* .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
.context(DatatypesSnafu {
extra: "Failed to build column `update_at TimestampMillisecond default now()`",
})?;
})?*/ ;
column_schemas.push(update_at_schema);
let time_index = if let Some(time_index) = first_time_stamp {
@@ -574,16 +595,15 @@ fn create_table_with_expr(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.context(DatatypesSnafu {
extra: format!(
"Failed to build column `{} TimestampMillisecond TIME INDEX default 0`",
AUTO_CREATED_PLACEHOLDER_TS_COL
),
})?,
.with_time_index(true), /* .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.context(DatatypesSnafu {
extra: format!(
"Failed to build column `{} TimestampMillisecond TIME INDEX default 0`",
AUTO_CREATED_PLACEHOLDER_TS_COL
),
})?*/
);
AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
};
@@ -675,20 +695,17 @@ mod test {
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
.unwrap();
);
// .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
let ts_placeholder_schema = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.unwrap();
.with_time_index(true);
// .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
// Timestamp::new_millisecond(0), ))))
let testcases = vec![
TestCase {

View File

@@ -72,6 +72,17 @@ pub struct TimeWindowExpr {
df_schema: DFSchema,
}
impl std::fmt::Display for TimeWindowExpr {
    /// Renders the value in debug-struct layout.
    ///
    /// `phy_expr` and `logical_expr` are first turned into their `Display`
    /// strings and then emitted as string fields; `column_name` and
    /// `df_schema` are emitted through their own `Debug` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let phy_expr = self.phy_expr.to_string();
        let logical_expr = self.logical_expr.to_string();

        let mut out = f.debug_struct("TimeWindowExpr");
        out.field("phy_expr", &phy_expr);
        out.field("column_name", &self.column_name);
        out.field("logical_expr", &logical_expr);
        out.field("df_schema", &self.df_schema);
        out.finish()
    }
}
impl TimeWindowExpr {
pub fn from_expr(
expr: &Expr,

View File

@@ -14,29 +14,63 @@
//! some utils for helping with batching mode
use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_telemetry::{debug, info};
use common_telemetry::debug;
use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr;
use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{
Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
};
use datafusion_common::DataFusionError;
use datafusion_expr::{Distinct, LogicalPlan};
use datatypes::schema::RawSchema;
use datafusion_common::{DFSchema, DataFusionError};
use datafusion_expr::{Distinct, LogicalPlan, Projection};
use datatypes::schema::SchemaRef;
use query::parser::QueryLanguageParser;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableInfo;
use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{DatafusionSnafu, ExternalSnafu};
use crate::Error;
use crate::error::{DatafusionSnafu, ExternalSnafu, TableNotFoundSnafu};
use crate::{Error, TableName};
/// Looks up a table through the catalog manager and returns its [`TableInfo`]
/// together with a DataFusion schema converted from the table's arrow schema.
///
/// `table_name` is indexed as `[catalog, schema, table]`.
///
/// # Errors
///
/// - an `External` error if the catalog lookup itself fails,
/// - a `TableNotFound` error (carrying the dot-joined name) if the lookup
///   succeeds but yields no table,
/// - a `Datafusion` error if the arrow schema cannot be converted.
pub async fn get_table_info_df_schema(
    catalog_mr: CatalogManagerRef,
    table_name: TableName,
) -> Result<(Arc<TableInfo>, Arc<DFSchema>), Error> {
    let table = catalog_mr
        .table(&table_name[0], &table_name[1], &table_name[2], None)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
        // The dotted name is only needed for the error, so build it lazily
        // instead of cloning and joining the name parts on every call.
        .with_context(|| TableNotFoundSnafu {
            name: table_name.join("."),
        })?;
    let table_info = table.table_info().clone();

    // Borrow the schema; the borrow ends before `table_info` is moved out below.
    let schema = &table_info.meta.schema;
    let df_schema: Arc<DFSchema> = Arc::new(
        schema
            .arrow_schema()
            .clone()
            .try_into()
            .with_context(|_| DatafusionSnafu {
                context: format!(
                    "Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
                    schema.arrow_schema()
                ),
            })?,
    );
    Ok((table_info, df_schema))
}
/// Convert sql to datafusion logical plan
pub async fn sql_to_df_plan(
@@ -164,14 +198,16 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
/// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
/// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
/// with values like `now()` and `0`
///
/// It also aliases existing columns to the corresponding sink table column names where needed
#[derive(Debug)]
pub struct AddAutoColumnRewriter {
pub schema: RawSchema,
pub schema: SchemaRef,
pub is_rewritten: bool,
}
impl AddAutoColumnRewriter {
pub fn new(schema: RawSchema) -> Self {
pub fn new(schema: SchemaRef) -> Self {
Self {
schema,
is_rewritten: false,
@@ -181,37 +217,88 @@ impl AddAutoColumnRewriter {
impl TreeNodeRewriter for AddAutoColumnRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
return Ok(Transformed::no(node));
}
// if is distinct all, go one level down
if let LogicalPlan::Distinct(Distinct::All(_)) = node {
return Ok(Transformed::no(node));
// if is distinct all, wrap it in a projection
if let LogicalPlan::Distinct(Distinct::All(_)) = &node {
let mut exprs = vec![];
for field in node.schema().fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new_unqualified(
field.name(),
)));
}
let projection =
LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);
node = projection;
}
// handle table_scan by wrap it in a projection
else if let LogicalPlan::TableScan(table_scan) = node {
let mut exprs = vec![];
for field in table_scan.projected_schema.fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new(
Some(table_scan.table_name.clone()),
field.name(),
)));
}
let projection = LogicalPlan::Projection(Projection::try_new(
exprs,
Arc::new(LogicalPlan::TableScan(table_scan)),
)?);
node = projection;
}
// FIXME(discord9): just read plan.expr and do stuffs
let mut exprs = node.expressions();
let all_names = self
.schema
.column_schemas()
.iter()
.map(|c| c.name.clone())
.collect::<BTreeSet<_>>();
// first match by position
for (idx, expr) in exprs.iter_mut().enumerate() {
if !all_names.contains(&expr.qualified_name().1) {
if let Some(col_name) = self
.schema
.column_schemas()
.get(idx)
.map(|c| c.name.clone())
{
*expr = expr.clone().alias(col_name);
}
}
}
// add columns if have different column count
let query_col_cnt = exprs.len();
let table_col_cnt = self.schema.column_schemas.len();
info!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}");
let table_col_cnt = self.schema.column_schemas().len();
debug!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}");
let placeholder_ts_expr =
datafusion::logical_expr::lit(0).alias(AUTO_CREATED_PLACEHOLDER_TS_COL);
if query_col_cnt == table_col_cnt {
self.is_rewritten = true;
return Ok(Transformed::no(node));
// still need to add alias, see below
} else if query_col_cnt + 1 == table_col_cnt {
let last_col_schema = self.schema.column_schemas.last().unwrap();
let last_col_schema = self.schema.column_schemas().last().unwrap();
// if time index column is auto created add it
if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL
&& self.schema.timestamp_index == Some(table_col_cnt - 1)
&& self.schema.timestamp_index() == Some(table_col_cnt - 1)
{
exprs.push(datafusion::logical_expr::lit(0));
exprs.push(placeholder_ts_expr);
} else if last_col_schema.data_type.is_timestamp() {
// is the update at column
exprs.push(datafusion::prelude::now());
exprs.push(datafusion::prelude::now().alias(&last_col_schema.name));
} else {
// helpful error message
return Err(DataFusionError::Plan(format!(
@@ -221,11 +308,11 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
)));
}
} else if query_col_cnt + 2 == table_col_cnt {
let mut col_iter = self.schema.column_schemas.iter().rev();
let mut col_iter = self.schema.column_schemas().iter().rev();
let last_col_schema = col_iter.next().unwrap();
let second_last_col_schema = col_iter.next().unwrap();
if second_last_col_schema.data_type.is_timestamp() {
exprs.push(datafusion::prelude::now());
exprs.push(datafusion::prelude::now().alias(&second_last_col_schema.name));
} else {
return Err(DataFusionError::Plan(format!(
"Expect the second last column in the table to be timestamp column, found column {} with type {:?}",
@@ -235,9 +322,9 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
}
if last_col_schema.name == AUTO_CREATED_PLACEHOLDER_TS_COL
&& self.schema.timestamp_index == Some(table_col_cnt - 1)
&& self.schema.timestamp_index() == Some(table_col_cnt - 1)
{
exprs.push(datafusion::logical_expr::lit(0));
exprs.push(placeholder_ts_expr);
} else {
return Err(DataFusionError::Plan(format!(
"Expect timestamp column {}, found {:?}",
@@ -246,8 +333,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
}
} else {
return Err(DataFusionError::Plan(format!(
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}",
query_col_cnt, node.expressions(), table_col_cnt, self.schema.column_schemas
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
query_col_cnt, node.expressions(), table_col_cnt, self.schema.column_schemas(), node
)));
}
@@ -255,6 +342,11 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
let new_plan = node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?;
Ok(Transformed::yes(new_plan))
}
/// Recomputes the schema of every node on the way back up, because the
/// rewrite may have added new columns.
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
    let recomputed = node.recompute_schema()?;
    Ok(Transformed::yes(recomputed))
}
}
// TODO(discord9): a method to find out the precise time window
@@ -301,9 +393,11 @@ impl TreeNodeRewriter for AddFilterRewriter {
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion_common::tree_node::TreeNode as _;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, Schema};
use pretty_assertions::assert_eq;
use session::context::QueryContext;
@@ -386,7 +480,7 @@ mod test {
// add update_at
(
"SELECT number FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, now() FROM numbers_with_ts"),
Ok("SELECT numbers_with_ts.number, now() AS ts FROM numbers_with_ts"),
vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -400,7 +494,7 @@ mod test {
// add ts placeholder
(
"SELECT number FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, 0 FROM numbers_with_ts"),
Ok("SELECT numbers_with_ts.number, 0 AS __ts_placeholder FROM numbers_with_ts"),
vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -428,7 +522,7 @@ mod test {
// add update_at and ts placeholder
(
"SELECT number FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, now(), 0 FROM numbers_with_ts"),
Ok("SELECT numbers_with_ts.number, now() AS update_at, 0 AS __ts_placeholder FROM numbers_with_ts"),
vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -447,7 +541,7 @@ mod test {
// add ts placeholder
(
"SELECT number, ts FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, 0 FROM numbers_with_ts"),
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, 0 AS __ts_placeholder FROM numbers_with_ts"),
vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -466,7 +560,7 @@ mod test {
// add update_at after time index column
(
"SELECT number, ts FROM numbers_with_ts",
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, now() FROM numbers_with_ts"),
Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts, now() AS update_atat FROM numbers_with_ts"),
vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -528,8 +622,8 @@ mod test {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
for (before, after, column_schemas) in testcases {
let raw_schema = RawSchema::new(column_schemas);
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(raw_schema);
let schema = Arc::new(Schema::new(column_schemas));
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
.await

View File

@@ -49,6 +49,8 @@ pub trait FlowEngine {
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error>;
/// Check if the flow exists
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>;
/// List all flows
async fn list_flows(&self) -> Result<Vec<FlowId>, Error>;
/// Handle the insert requests for the flow
async fn handle_flow_inserts(
&self,

View File

@@ -44,7 +44,7 @@ mod utils;
mod test_utils;
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use batching_mode::frontend_client::FrontendClient;
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
pub use error::{Error, Result};
pub use server::{

View File

@@ -50,7 +50,10 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{create_worker, FlowWorkerManagerRef};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -66,12 +69,14 @@ pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
#[derive(Clone)]
pub struct FlowService {
/// TODO(discord9): replace with dual engine
pub manager: FlowWorkerManagerRef,
pub dual_engine: FlowDualEngineRef,
}
impl FlowService {
pub fn new(manager: FlowWorkerManagerRef) -> Self {
Self { manager }
pub fn new(manager: FlowDualEngineRef) -> Self {
Self {
dual_engine: manager,
}
}
}
@@ -86,7 +91,7 @@ impl flow_server::Flow for FlowService {
.start_timer();
let request = request.into_inner();
self.manager
self.dual_engine
.handle(request)
.await
.map_err(|err| {
@@ -126,7 +131,7 @@ impl flow_server::Flow for FlowService {
.with_label_values(&["in"])
.inc_by(row_count as u64);
self.manager
self.dual_engine
.handle_inserts(request)
.await
.map(Response::new)
@@ -162,10 +167,15 @@ impl FlownodeServer {
/// Start the background task for streaming computation.
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.manager.clone();
let manager_ref = self.inner.flow_service.dual_engine.clone();
let _handle = manager_ref
.clone()
.streaming_engine()
.run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
self.inner
.flow_service
.dual_engine
.start_flow_consistent_check_task()
.await?;
Ok(())
}
@@ -176,6 +186,11 @@ impl FlownodeServer {
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
self.inner
.flow_service
.dual_engine
.stop_flow_consistent_check_task()
.await?;
Ok(())
}
}
@@ -272,8 +287,8 @@ impl FlownodeInstance {
&self.flownode_server
}
pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef {
self.flownode_server.inner.flow_service.manager.clone()
pub fn flow_engine(&self) -> FlowDualEngineRef {
self.flownode_server.inner.flow_service.dual_engine.clone()
}
pub fn setup_services(&mut self, services: ServerHandlers) {
@@ -342,12 +357,21 @@ impl FlownodeBuilder {
self.build_manager(query_engine_factory.query_engine())
.await?,
);
let batching = Arc::new(BatchingEngine::new(
self.frontend_client.clone(),
query_engine_factory.query_engine(),
self.flow_metadata_manager.clone(),
self.table_meta.clone(),
self.catalog_manager.clone(),
));
let dual = FlowDualEngine::new(
manager.clone(),
batching,
self.flow_metadata_manager.clone(),
self.catalog_manager.clone(),
);
if let Err(err) = self.recover_flows(&manager).await {
common_telemetry::error!(err; "Failed to recover flows");
}
let server = FlownodeServer::new(FlowService::new(manager.clone()));
let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
let heartbeat_task = self.heartbeat_task;
@@ -364,7 +388,7 @@ impl FlownodeBuilder {
/// or recover all existing flow tasks if in standalone mode(nodeid is None)
///
/// TODO(discord9): persistent flow tasks with internal state
async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result<usize, Error> {
async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
let nodeid = self.opts.node_id;
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
let to_be_recover = self
@@ -436,7 +460,7 @@ impl FlownodeBuilder {
),
};
manager
.create_flow_inner(args)
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
@@ -543,6 +567,10 @@ impl<'a> FlownodeServiceBuilder<'a> {
}
}
/// Basically a tiny frontend that communicates with datanode. Unlike [`FrontendClient`], which
/// connects to a real frontend, this is used by flow's streaming engine and is meant for simple queries.
///
/// For heavy queries use [`FrontendClient`], which offloads computation to the frontend, lifting the load from the flownode.
#[derive(Clone)]
pub struct FrontendInvoker {
inserter: Arc<Inserter>,

View File

@@ -15,6 +15,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
client.workspace = true

View File

@@ -345,7 +345,7 @@ pub enum Error {
SubstraitDecodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
source: common_query::error::Error,
},
}

View File

@@ -278,7 +278,7 @@ impl SqlQueryHandler for Instance {
// plan should be prepared before exec
// we'll do check there
self.query_engine
.execute(plan, query_ctx)
.execute(plan.clone(), query_ctx)
.await
.context(ExecLogicalPlanSnafu)
}

View File

@@ -12,29 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, RowInsertRequests};
use api::v1::{
DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use datafusion::execution::SessionStateBuilder;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table_name::TableName;
use crate::error::{
CatalogSnafu, Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu,
NotSupportedSnafu, PermissionSnafu, Result, SubstraitDecodeLogicalPlanSnafu,
TableNotFoundSnafu, TableOperationSnafu,
NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{
@@ -91,14 +95,31 @@ impl GrpcQueryHandler for Instance {
Query::LogicalPlan(plan) => {
// this path is useful internally when flownode needs to execute a logical plan through gRPC interface
let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
let plan = DFLogicalSubstraitConvertor {}
.decode(&*plan, SessionStateBuilder::default().build())
// use dummy catalog to provide table
let plan_decoder = self
.query_engine()
.engine_context(ctx.clone())
.new_plan_decoder()
.context(PlanStatementSnafu)?;
let dummy_catalog_list =
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
self.catalog_manager().clone(),
));
let logical_plan = plan_decoder
.decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, plan, ctx.clone()).await?;
let output =
SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;
attach_timer(output, timer)
}
Query::InsertIntoPlan(insert) => {
self.handle_insert_plan(insert, ctx.clone()).await?
}
Query::PromRangeQuery(promql) => {
let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
let prom_query = PromQuery {
@@ -284,6 +305,91 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
}
impl Instance {
/// Handles a `Query::InsertIntoPlan` gRPC request.
///
/// The substrait-encoded plan carried by `insert` is decoded (without optimization), wrapped
/// into an insert targeting `insert.table_name`, then analyzed, optimized and executed. The
/// whole call is measured by `GRPC_HANDLE_PLAN_ELAPSED`, whose timer is attached to the
/// returned [`Output`].
///
/// # Errors
/// - `IncompleteGrpcRequestSnafu` when `table_name` is absent from the request.
/// - `PlanStatementSnafu` when the plan decoder cannot be created.
/// - `CatalogSnafu` / `TableNotFoundSnafu` when the target table cannot be resolved.
/// - `SubstraitDecodeLogicalPlanSnafu` when decoding the plan, converting the table schema,
///   adding the insert node, analyzing or optimizing fails.
async fn handle_insert_plan(
    &self,
    insert: InsertIntoPlan,
    ctx: QueryContextRef,
) -> Result<Output> {
    let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
    let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
        err_msg: "'table_name' is absent in InsertIntoPlan",
    })?;

    // use dummy catalog to provide table
    let plan_decoder = self
        .query_engine()
        .engine_context(ctx.clone())
        .new_plan_decoder()
        .context(PlanStatementSnafu)?;

    let dummy_catalog_list =
        Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
            self.catalog_manager().clone(),
        ));

    // Decode without optimizing: the insert node still has to be added on top, and the
    // complete plan is analyzed and optimized below.
    let logical_plan = plan_decoder
        .decode(
            bytes::Bytes::from(insert.logical_plan),
            dummy_catalog_list,
            false,
        )
        .await
        .context(SubstraitDecodeLogicalPlanSnafu)?;

    let table = self
        .catalog_manager()
        .table(
            &table_name.catalog_name,
            &table_name.schema_name,
            &table_name.table_name,
            None,
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table_name: [
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
            ]
            .join("."),
        })?;

    let table_info = table.table_info();

    // Propagate a failed schema conversion as an error instead of panicking: this runs on a
    // request path, so a bad schema must not take the server task down.
    let df_schema = Arc::new(
        table_info
            .meta
            .schema
            .arrow_schema()
            .clone()
            .try_into()
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?,
    );

    let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
        .context(SubstraitDecodeLogicalPlanSnafu)?;

    let engine_ctx = self.query_engine().engine_context(ctx.clone());
    let state = engine_ctx.state();

    // Analyze the plan
    let analyzed_plan = state
        .analyzer()
        .execute_and_check(insert_into, state.config_options(), |_, _| {})
        .context(common_query::error::GeneralDataFusionSnafu)
        .context(SubstraitDecodeLogicalPlanSnafu)?;

    // Optimize the plan
    let optimized_plan = state
        .optimize(&analyzed_plan)
        .context(common_query::error::GeneralDataFusionSnafu)
        .context(SubstraitDecodeLogicalPlanSnafu)?;

    // `ctx` is not used afterwards, so it is moved rather than cloned.
    let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx).await?;

    Ok(attach_timer(output, timer))
}
#[tracing::instrument(skip_all)]
pub async fn handle_inserts(
&self,

View File

@@ -26,6 +26,7 @@ use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_S
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
@@ -38,6 +39,8 @@ use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_telemetry::{debug, info, tracing};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
@@ -45,10 +48,11 @@ use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use query::QueryEngineRef;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
@@ -69,13 +73,14 @@ use table::table_name::TableName;
use table::TableRef;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
@@ -364,6 +369,69 @@ impl StatementExecutor {
expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
/// Parses `sql` under `query_ctx` and plans the resulting statement with `engine`'s planner.
///
/// Parse and planning failures are both boxed and reported through `ExternalSnafu`.
async fn sql_to_df_plan(
    query_ctx: QueryContextRef,
    engine: QueryEngineRef,
    sql: &str,
) -> Result<LogicalPlan> {
    let statement = QueryLanguageParser::parse_sql(sql, &query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    engine
        .planner()
        .plan(&statement, query_ctx)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)
}
/// Picks the flow type for `plan`: `FlowType::Batching` when the plan (subqueries included)
/// contains an `Aggregate` or `Distinct` node, `FlowType::Streaming` otherwise.
///
/// A failure while walking the plan is reported through `BuildDfLogicalPlanSnafu`.
async fn determine_flow_type(plan: &LogicalPlan) -> Result<FlowType> {
    use datafusion_common::tree_node::TreeNodeRecursion;

    /// Visitor that records whether an `Aggregate` or `Distinct` node was encountered.
    struct FindAggr {
        is_aggr: bool,
    }

    impl TreeNodeVisitor<'_> for FindAggr {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
            if matches!(node, LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_)) {
                // One hit is enough, so stop walking the tree.
                self.is_aggr = true;
                Ok(TreeNodeRecursion::Stop)
            } else {
                Ok(TreeNodeRecursion::Continue)
            }
        }
    }

    let mut visitor = FindAggr { is_aggr: false };
    plan.visit_with_subqueries(&mut visitor)
        .context(BuildDfLogicalPlanSnafu)?;

    let flow_type = if visitor.is_aggr {
        FlowType::Batching
    } else {
        FlowType::Streaming
    };
    Ok(flow_type)
}
let plan =
sql_to_df_plan(query_context.clone(), self.query_engine.clone(), &expr.sql).await?;
let flow_type = determine_flow_type(&plan).await?;
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
let expr = {
let mut expr = expr;
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
expr
};
let task = CreateFlowTask::try_from(PbCreateFlowTask {
create_flow: Some(expr),
})

View File

@@ -132,7 +132,7 @@ impl GrpcQueryHandler for DummyInstance {
);
result.remove(0)?
}
Query::LogicalPlan(_) => unimplemented!(),
Query::LogicalPlan(_) | Query::InsertIntoPlan(_) => unimplemented!(),
Query::PromRangeQuery(promql) => {
let prom_query = PromQuery {
query: promql.query,

View File

@@ -41,7 +41,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient};
use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
use frontend::frontend::Frontend;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance, StandaloneDatanodeManager};
@@ -174,8 +174,8 @@ impl GreptimeDbStandaloneBuilder {
Some(procedure_manager.clone()),
);
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),
@@ -188,7 +188,7 @@ impl GreptimeDbStandaloneBuilder {
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
flow_server: flownode.flow_engine(),
});
let table_id_sequence = Arc::new(
@@ -250,7 +250,15 @@ impl GreptimeDbStandaloneBuilder {
.unwrap();
let instance = Arc::new(instance);
let flow_worker_manager = flownode.flow_worker_manager();
// set the frontend client for flownode
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
let flow_worker_manager = flownode.flow_engine().streaming_engine();
let invoker = flow::FrontendInvoker::build_from(
flow_worker_manager.clone(),
catalog_manager.clone(),

View File

@@ -8,6 +8,20 @@ CREATE TABLE distinct_basic (
Affected Rows: 0
-- should fail
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
Error: 3001(EngineExecuteQuery), Invalid query: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, flow will only evaluate to empty results with such table, use a small ttl instead of instant
ALTER TABLE distinct_basic SET 'ttl' = '5s';
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
@@ -24,7 +38,7 @@ VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 0
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
@@ -49,7 +63,7 @@ SHOW CREATE TABLE distinct_basic;
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'instant' |
| | ttl = '5s' |
| | ) |
+----------------+-----------------------------------------------------------+
@@ -84,8 +98,93 @@ FROM
SELECT number FROM distinct_basic;
++
++
+--------+
| number |
+--------+
| 20 |
| 22 |
+--------+
-- SQLNESS SLEEP 6s
ADMIN FLUSH_TABLE('distinct_basic');
+-------------------------------------+
| ADMIN FLUSH_TABLE('distinct_basic') |
+-------------------------------------+
| 0 |
+-------------------------------------+
INSERT INTO
distinct_basic
VALUES
(23, "2021-07-01 00:00:01.600");
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE distinct_basic;
+----------------+-----------------------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------------------+
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
| | "number" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("number") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '5s' |
| | ) |
+----------------+-----------------------------------------------------------+
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20 |
| 22 |
| 23 |
+-----+
SELECT number FROM distinct_basic;
+--------+
| number |
+--------+
| 23 |
+--------+
DROP FLOW test_distinct_basic;

View File

@@ -6,6 +6,16 @@ CREATE TABLE distinct_basic (
TIME INDEX(ts)
)WITH ('ttl' = 'instant');
-- should fail
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
ALTER TABLE distinct_basic SET 'ttl' = '5s';
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
@@ -34,6 +44,28 @@ FROM
SELECT number FROM distinct_basic;
-- SQLNESS SLEEP 6s
ADMIN FLUSH_TABLE('distinct_basic');
INSERT INTO
distinct_basic
VALUES
(23, "2021-07-01 00:00:01.600");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
SHOW CREATE TABLE distinct_basic;
SHOW CREATE TABLE out_distinct_basic;
SELECT
dis
FROM
out_distinct_basic;
SELECT number FROM distinct_basic;
DROP FLOW test_distinct_basic;
DROP TABLE distinct_basic;
DROP TABLE out_distinct_basic;

View File

@@ -9,11 +9,12 @@ Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
time_window;
Affected Rows: 0
@@ -24,11 +25,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
@@ -52,11 +51,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
@@ -65,13 +62,13 @@ SHOW CREATE TABLE out_num_cnt_basic;
SHOW CREATE FLOW test_numbers_basic;
+--------------------+-------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+--------------------+-------------------------------------------------------------------------------------------------------+
| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') |
+--------------------+-------------------------------------------------------------------------------------------------------+
+--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT sum(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') AS time_window FROM numbers_input_basic GROUP BY time_window |
+--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
DROP FLOW test_numbers_basic;

View File

@@ -7,11 +7,12 @@ CREATE TABLE numbers_input_basic (
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
time_window;
SHOW CREATE TABLE out_num_cnt_basic;

View File

@@ -9,11 +9,12 @@ Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
time_window;
Affected Rows: 0
@@ -24,11 +25,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
@@ -53,11 +52,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
@@ -84,16 +81,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT
"sum(numbers_input_basic.number)",
window_start,
window_end
time_window
FROM
out_num_cnt_basic;
+---------------------------------+---------------------+---------------------+
| sum(numbers_input_basic.number) | window_start | window_end |
+---------------------------------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+---------------------+
+---------------------------------+---------------------+
| sum(numbers_input_basic.number) | time_window |
+---------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 |
+---------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
@@ -124,17 +120,16 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT
"sum(numbers_input_basic.number)",
window_start,
window_end
time_window
FROM
out_num_cnt_basic;
+---------------------------------+---------------------+---------------------+
| sum(numbers_input_basic.number) | window_start | window_end |
+---------------------------------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+---------------------------------+---------------------+---------------------+
+---------------------------------+---------------------+
| sum(numbers_input_basic.number) | time_window |
+---------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 |
| 47 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+
DROP FLOW test_numbers_basic;
@@ -896,6 +891,8 @@ CREATE TABLE temp_sensor_data (
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
@@ -904,7 +901,8 @@ CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
ts TIMESTAMP TIME INDEX
event_ts TIMESTAMP TIME INDEX,
update_at TIMESTAMP
);
Affected Rows: 0
@@ -914,6 +912,7 @@ SELECT
sensor_id,
loc,
max(temperature) as max_temp,
max(ts) as event_ts
FROM
temp_sensor_data
GROUP BY
@@ -933,8 +932,9 @@ SHOW CREATE TABLE temp_alerts;
| | "sensor_id" INT NULL, |
| | "loc" STRING NULL, |
| | "max_temp" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | "event_ts" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("event_ts") |
| | ) |
| | |
| | ENGINE=mito |
@@ -993,15 +993,16 @@ SHOW TABLES LIKE 'temp_alerts';
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
+-----------+-------+----------+
| sensor_id | loc | max_temp |
+-----------+-------+----------+
| 1 | room1 | 150.0 |
+-----------+-------+----------+
+-----------+-------+----------+-------------------------+
| sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+-------------------------+
| 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
+-----------+-------+----------+-------------------------+
INSERT INTO
temp_sensor_data
@@ -1022,15 +1023,16 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
+-----------+-------+----------+
| sensor_id | loc | max_temp |
+-----------+-------+----------+
| 1 | room1 | 150.0 |
+-----------+-------+----------+
+-----------+-------+----------+-------------------------+
| sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+-------------------------+
| 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
+-----------+-------+----------+-------------------------+
DROP FLOW temp_monitoring;
@@ -1049,6 +1051,8 @@ CREATE TABLE ngx_access_log (
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
@@ -1183,6 +1187,8 @@ CREATE TABLE requests (
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
@@ -1392,6 +1398,8 @@ CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
Affected Rows: 0
@@ -1503,6 +1511,8 @@ CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
Affected Rows: 0

View File

@@ -7,11 +7,12 @@ CREATE TABLE numbers_input_basic (
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
time_window;
SHOW CREATE TABLE out_num_cnt_basic;
@@ -34,8 +35,7 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT
"sum(numbers_input_basic.number)",
window_start,
window_end
time_window
FROM
out_num_cnt_basic;
@@ -54,8 +54,7 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT
"sum(numbers_input_basic.number)",
window_start,
window_end
time_window
FROM
out_num_cnt_basic;
@@ -403,13 +402,16 @@ CREATE TABLE temp_sensor_data (
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
ts TIMESTAMP TIME INDEX
event_ts TIMESTAMP TIME INDEX,
update_at TIMESTAMP
);
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
@@ -417,6 +419,7 @@ SELECT
sensor_id,
loc,
max(temperature) as max_temp,
max(ts) as event_ts
FROM
temp_sensor_data
GROUP BY
@@ -451,7 +454,8 @@ SHOW TABLES LIKE 'temp_alerts';
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
@@ -466,7 +470,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
@@ -481,6 +486,8 @@ CREATE TABLE ngx_access_log (
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
CREATE TABLE ngx_distribution (
@@ -555,6 +562,8 @@ CREATE TABLE requests (
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
CREATE TABLE requests_without_ip (
@@ -650,6 +659,8 @@ CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
CREATE TABLE android_log_abnormal (
@@ -704,6 +715,8 @@ CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
CREATE TABLE android_log_abnormal (

View File

@@ -19,7 +19,9 @@ Affected Rows: 0
CREATE FLOW calc_avg_speed SINK TO avg_speed AS
SELECT
avg((left_wheel + right_wheel) / 2)
avg((left_wheel + right_wheel) / 2) as avg_speed,
date_bin(INTERVAL '5 second', ts) as start_window,
date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
FROM
velocity
WHERE
@@ -28,7 +30,7 @@ WHERE
AND left_wheel < 60
AND right_wheel < 60
GROUP BY
tumble(ts, '5 second');
start_window;
Affected Rows: 0

View File

@@ -15,7 +15,9 @@ CREATE TABLE avg_speed (
CREATE FLOW calc_avg_speed SINK TO avg_speed AS
SELECT
avg((left_wheel + right_wheel) / 2)
avg((left_wheel + right_wheel) / 2) as avg_speed,
date_bin(INTERVAL '5 second', ts) as start_window,
date_bin(INTERVAL '5 second', ts) + INTERVAL '5 second' as end_window,
FROM
velocity
WHERE
@@ -24,7 +26,7 @@ WHERE
AND left_wheel < 60
AND right_wheel < 60
GROUP BY
tumble(ts, '5 second');
start_window;
INSERT INTO
velocity

View File

@@ -11,7 +11,7 @@ Affected Rows: 0
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
SELECT sum(abs(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
Affected Rows: 0
@@ -42,13 +42,13 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
-- note that this quote-unquote column is a column-name, **not** an aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+
| sum(abs(numbers_input_df_func.number)) | window_start | window_end |
+----------------------------------------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+---------------------+
+----------------------------------------+---------------------+
| sum(abs(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 |
+----------------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -76,14 +76,14 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
-- note that this quote-unquote column is a column-name, **not** an aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+
| sum(abs(numbers_input_df_func.number)) | window_start | window_end |
+----------------------------------------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+----------------------------------------+---------------------+---------------------+
+----------------------------------------+---------------------+
| sum(abs(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 |
| 47 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+
DROP FLOW test_numbers_df_func;
@@ -110,7 +110,7 @@ Affected Rows: 0
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
SELECT abs(sum(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
Affected Rows: 0
@@ -140,13 +140,13 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
| FLOW_FLUSHED |
+------------------------------------------+
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+
| abs(sum(numbers_input_df_func.number)) | window_start | window_end |
+----------------------------------------+---------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+---------------------+
+----------------------------------------+---------------------+
| abs(sum(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+
| 2 | 2021-07-01T00:00:00 |
+----------------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -173,14 +173,14 @@ ADMIN FLUSH_FLOW('test_numbers_df_func');
| FLOW_FLUSHED |
+------------------------------------------+
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
+----------------------------------------+---------------------+---------------------+
| abs(sum(numbers_input_df_func.number)) | window_start | window_end |
+----------------------------------------+---------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+----------------------------------------+---------------------+---------------------+
+----------------------------------------+---------------------+
| abs(sum(numbers_input_df_func.number)) | time_window |
+----------------------------------------+---------------------+
| 2 | 2021-07-01T00:00:00 |
| 1 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+
DROP FLOW test_numbers_df_func;

View File

@@ -9,7 +9,7 @@ CREATE TABLE numbers_input_df_func (
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
SELECT sum(abs(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -24,7 +24,7 @@ VALUES
ADMIN FLUSH_FLOW('test_numbers_df_func');
-- note that this quote-unquote column is a column-name, **not** an aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -38,7 +38,7 @@ VALUES
ADMIN FLUSH_FLOW('test_numbers_df_func');
-- note that this quote-unquote column is a column-name, **not** an aggregation expr, generated by datafusion
SELECT "sum(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "sum(abs(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
@@ -55,7 +55,7 @@ CREATE TABLE numbers_input_df_func (
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
SELECT abs(sum(number)), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_df_func GROUP BY time_window;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -69,7 +69,7 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
@@ -82,7 +82,7 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT "abs(sum(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
SELECT "abs(sum(numbers_input_df_func.number))", time_window FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;

View File

@@ -5,6 +5,8 @@ CREATE TABLE requests (
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
@@ -93,6 +95,8 @@ CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0

View File

@@ -6,6 +6,8 @@ CREATE TABLE requests (
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
CREATE TABLE sum_val_in_reqs (
@@ -59,6 +61,8 @@ CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
CREATE FLOW calc_ngx_country SINK TO ngx_country AS

View File

@@ -3,6 +3,8 @@ CREATE TABLE input_basic (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
Affected Rows: 0
@@ -166,7 +168,7 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
| FLOW_FLUSHED |
+-----------------------------------------+
-- 3 is also expected, since flows don't have persistent state
-- flow batching mode
SELECT wildcard FROM out_basic;
+----------+
@@ -175,6 +177,14 @@ SELECT wildcard FROM out_basic;
| 3 |
+----------+
SELECT count(*) FROM input_basic;
+----------+
| count(*) |
+----------+
| 3 |
+----------+
DROP TABLE input_basic;
Affected Rows: 0
@@ -302,6 +312,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -310,6 +329,8 @@ VALUES
Affected Rows: 2
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -358,6 +379,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -366,6 +396,8 @@ VALUES
Affected Rows: 2
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -397,6 +429,15 @@ CREATE TABLE input_basic (
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -406,6 +447,8 @@ VALUES
Affected Rows: 3
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -438,7 +481,17 @@ FROM
Affected Rows: 0
-- give flownode a second to rebuild flow
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -457,13 +510,21 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
| FLOW_FLUSHED |
+-----------------------------------------+
-- 3 is also expected, since flows don't have persistent state
-- 4 is also expected, since the flow is in batching mode
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 3 |
| 4 |
+----------+
SELECT count(*) FROM input_basic;
+----------+
| count(*) |
+----------+
| 4 |
+----------+
DROP TABLE input_basic;
@@ -496,6 +557,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -504,6 +574,8 @@ VALUES
Affected Rows: 2
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -538,6 +610,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -547,6 +628,7 @@ VALUES
Affected Rows: 3
-- give flownode a second to rebuild flow
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');

View File

@@ -3,6 +3,8 @@ CREATE TABLE input_basic (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
@@ -95,9 +97,11 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
-- 3 is also expected, since flows don't have persistent state
-- flow batching mode
SELECT wildcard FROM out_basic;
SELECT count(*) FROM input_basic;
DROP TABLE input_basic;
DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
@@ -168,12 +172,17 @@ FROM
input_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -201,12 +210,17 @@ FROM
input_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -222,6 +236,9 @@ CREATE TABLE input_basic (
);
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -229,6 +246,8 @@ VALUES
(24, "2021-07-01 00:00:01.500"),
(26, "2021-07-01 00:00:02.000");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -245,7 +264,11 @@ SELECT
FROM
input_basic;
-- give flownode a second to rebuild flow
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -256,9 +279,11 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
-- 3 is also expected, since flows don't have persistent state
-- 4 is also expected, since the flow is in batching mode
SELECT wildcard FROM out_basic;
SELECT count(*) FROM input_basic;
DROP TABLE input_basic;
DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
@@ -277,13 +302,17 @@ FROM
input_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
@@ -300,6 +329,9 @@ FROM
input_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -307,6 +339,7 @@ VALUES
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
-- give flownode a second to rebuild flow
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');

View File

@@ -397,7 +397,7 @@ CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX,
event_ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);
@@ -408,6 +408,7 @@ SELECT
sensor_id,
loc,
max(temperature) as max_temp,
max(ts) as event_ts,
FROM
temp_sensor_data
GROUP BY
@@ -438,7 +439,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
@@ -466,16 +468,17 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
+-----------+-------+----------+
| sensor_id | loc | max_temp |
+-----------+-------+----------+
| 1 | room1 | 101.5 |
| 2 | room2 | 102.5 |
+-----------+-------+----------+
+-----------+-------+----------+---------------------+
| sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+---------------------+
| 1 | room1 | 101.5 | 2022-01-01T00:00:02 |
| 2 | room2 | 102.5 | 2022-01-01T00:00:03 |
+-----------+-------+----------+---------------------+
DROP FLOW temp_monitoring;

View File

@@ -291,7 +291,7 @@ CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX,
event_ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);
@@ -300,6 +300,7 @@ SELECT
sensor_id,
loc,
max(temperature) as max_temp,
max(ts) as event_ts,
FROM
temp_sensor_data
GROUP BY
@@ -320,7 +321,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;
@@ -337,7 +339,8 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
max_temp,
event_ts
FROM
temp_alerts;

View File

@@ -72,12 +72,13 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
Affected Rows: 4
-- TODO(discord9): fix flow stat update for batching mode flow
SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
| information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names |
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
| true | true | true | greptime.public.ngx_access_log |
| | true | false | greptime.public.ngx_access_log |
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
DROP TABLE ngx_access_log;

View File

@@ -32,6 +32,7 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
-- SQLNESS SLEEP 10s
INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z');
-- TODO(discord9): fix flow stat update for batching mode flow
SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
DROP TABLE ngx_access_log;

View File

@@ -1,3 +1,3 @@
[grpc]
bind_addr = "127.0.0.1:29401"
server_addr = "127.0.0.1:29401"
bind_addr = "{grpc_addr}"
server_addr = "{grpc_addr}"

View File

@@ -389,6 +389,9 @@ impl ServerMode {
format!("--metasrv-addrs={metasrv_addr}"),
format!("--http-addr={http_addr}"),
format!("--rpc-addr={rpc_bind_addr}"),
// since sqlness runs locally, the bind addr is the same as the server addr;
// this is needed so that the server addr column of `cluster_info` is correct
format!("--rpc-server-addr={rpc_bind_addr}"),
format!("--mysql-addr={mysql_addr}"),
format!("--postgres-addr={postgres_addr}"),
format!(