Compare commits

...

12 Commits

Author SHA1 Message Date
discord9
7f7b974e8a fix: conn pool leak & placeholder feature so ci can compile 2025-04-10 15:01:07 +08:00
discord9
4875ace0d0 fix: placeholder feature so ci can compile 2025-04-08 14:37:55 +08:00
discord9
a847d96649 fix: time window filter expr use OR 2025-04-07 16:50:17 +08:00
discord9
23a0a54e18 fix: convert timestamp unit too 2025-04-07 16:50:17 +08:00
discord9
78eb8b53f6 fix: quote&more info when time window too many
chore: even more warning

fix: filter first warn later
2025-04-07 16:50:17 +08:00
discord9
2455f39e8e fix: subquery&cte time window expr 2025-04-07 16:46:53 +08:00
discord9
7fe0074202 refactor: even finer&limit time window num 2025-04-07 16:46:53 +08:00
discord9
e16bc203d0 feat: basic time window aware 2025-04-07 16:46:53 +08:00
discord9
9a3c26bb0a metrics: better bucket&longer timeout 2025-04-07 16:46:53 +08:00
discord9
e1ff398c32 fix: timeout 2025-04-07 16:46:53 +08:00
discord9
780e3000de fix: heartbeat&expire_after unit 2025-04-07 16:46:53 +08:00
discord9
2b5ddf8427 feat: time window in df plan
WIP

test: found out time window expr

chore: pub

tests: also unparsed

tests: rm dup code

feat: frontend client for recording rule

fix: bound edgecase

WIP

WIP

feat: rule engine

feat: add init options& tmp rerounte to rule

fix: dist client get

fix: also not handle mirror write in flownode

chore: clippy
2025-04-07 16:46:47 +08:00
24 changed files with 2084 additions and 50 deletions

1
Cargo.lock generated
View File

@@ -4165,6 +4165,7 @@ dependencies = [
"bytes",
"cache",
"catalog",
"chrono",
"client",
"common-base",
"common-catalog",

View File

@@ -16,7 +16,6 @@
mod client;
pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod flow;
@@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
pub use self::client::Client;
#[cfg(feature = "testing")]
pub use self::database::Database;
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

View File

@@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendClient, FrontendInvoker};
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
@@ -317,6 +317,8 @@ impl StartCommand {
Arc::new(executor),
);
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flownode_builder = FlownodeBuilder::new(
opts,
@@ -324,6 +326,7 @@ impl StartCommand {
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
Arc::new(frontend_client),
)
.with_heartbeat_task(heartbeat_task);

View File

@@ -54,7 +54,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendClient,
FrontendInvoker,
};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -533,12 +536,16 @@ impl StartCommand {
flow: opts.flow.clone(),
..Default::default()
};
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
);
let flownode = Arc::new(
flow_builder

View File

@@ -445,10 +445,16 @@ impl Pool {
async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
// use weak ref here to prevent pool being leaked
let pool_weak = Arc::downgrade(&pool);
loop {
let _ = interval.tick().await;
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
if let Some(pool) = pool_weak.upgrade() {
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
} else {
// no one is using this pool, so we can also let go
break;
}
}
}

View File

@@ -343,6 +343,7 @@ pub enum FlowType {
impl FlowType {
pub const RECORDING_RULE: &str = "recording_rule";
pub const STREAMING: &str = "streaming";
pub const FLOW_TYPE_KEY: &str = "flow_type";
}
impl Default for FlowType {
@@ -398,7 +399,8 @@ impl From<&CreateFlowData> for CreateRequest {
};
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options.insert("flow_type".to_string(), flow_type);
req.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
req
}
}
@@ -430,7 +432,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
.collect::<Vec<_>>();
let flow_type = value.flow_type.unwrap_or_default().to_string();
options.insert("flow_type".to_string(), flow_type);
options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
let flow_info = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),

View File

@@ -16,6 +16,7 @@ async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true

View File

@@ -49,12 +49,13 @@ pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
pub(crate) use crate::adapter::worker::{create_worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::recording_rules::RecordingRuleEngine;
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
mod flownode_impl;
@@ -63,7 +64,7 @@ pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
mod util;
pub(crate) mod util;
mod worker;
pub(crate) mod node_context;
@@ -171,6 +172,8 @@ pub struct FlowWorkerManager {
flush_lock: RwLock<()>,
/// receive a oneshot sender to send state size report
state_report_handler: RwLock<Option<StateReportHandler>>,
/// engine for recording rule
rule_engine: RecordingRuleEngine,
}
/// Building FlownodeManager
@@ -185,6 +188,7 @@ impl FlowWorkerManager {
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
rule_engine: RecordingRuleEngine,
) -> Self {
let srv_map = ManagedTableSource::new(
table_meta.table_info_manager().clone(),
@@ -207,6 +211,7 @@ impl FlowWorkerManager {
node_id,
flush_lock: RwLock::new(()),
state_report_handler: RwLock::new(None),
rule_engine,
}
}
@@ -215,25 +220,6 @@ impl FlowWorkerManager {
self
}
/// Create a flownode manager with one worker
pub fn new_with_workers<'s>(
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
num_workers: usize,
) -> (Self, Vec<Worker<'s>>) {
let mut zelf = Self::new(node_id, query_engine, table_meta);
let workers: Vec<_> = (0..num_workers)
.map(|_| {
let (handle, worker) = create_worker();
zelf.add_worker_handle(handle);
worker
})
.collect();
(zelf, workers)
}
/// add a worker handler to manager, meaning this corresponding worker is under it's manage
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(handle);
@@ -751,7 +737,11 @@ pub struct CreateFlowArgs {
/// Create&Remove flow
impl FlowWorkerManager {
/// remove a flow by it's id
#[allow(unreachable_code)]
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
// TODO(discord9): reroute some back to streaming engine later
return self.rule_engine.remove_flow(flow_id).await;
for handle in self.worker_handles.iter() {
if handle.contains_flow(flow_id).await? {
handle.remove_flow(flow_id).await?;
@@ -767,8 +757,10 @@ impl FlowWorkerManager {
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_arguments, unreachable_code)]
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
// TODO(discord9): reroute some back to streaming engine later
return self.rule_engine.create_flow(args).await;
let CreateFlowArgs {
flow_id,
sink_table_name,

View File

@@ -153,7 +153,13 @@ impl Flownode for FlowWorkerManager {
}
}
#[allow(unreachable_code, unused)]
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
return self
.rule_engine
.handle_inserts(request)
.await
.map_err(to_meta_err(snafu::location!()));
// using try_read to ensure two things:
// 1. flush wouldn't happen until inserts before it is inserted
// 2. inserts happening concurrently with flush wouldn't be block by flush
@@ -206,15 +212,15 @@ impl Flownode for FlowWorkerManager {
.collect_vec();
let table_col_names = table_schema.relation_desc.names;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
Some(name) => Ok(name.clone()),
None => InternalSnafu {
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
}
.fail().map_err(BoxedError::new).context(ExternalSnafu),
})
.collect::<Result<Vec<_>>>()?;
.iter().enumerate()
.map(|(idx,name)| match name {
Some(name) => Ok(name.clone()),
None => InternalSnafu {
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
}
.fail().map_err(BoxedError::new).context(ExternalSnafu),
})
.collect::<Result<Vec<_>>>()?;
let name_to_col = HashMap::<_, _>::from_iter(
insert_schema
.iter()

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Some utility functions
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;

View File

@@ -16,6 +16,7 @@
use std::any::Any;
use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_macro::stack_trace_debug;
@@ -53,6 +54,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Time error"))]
Time {
source: common_time::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -156,6 +164,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
Arrow {
#[snafu(source)]
raw: ArrowError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
Datafusion {
#[snafu(source)]
@@ -230,6 +247,7 @@ impl ErrorExt for Error {
match self {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Arrow { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
@@ -238,7 +256,9 @@ impl ErrorExt for Error {
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery
}
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported

View File

@@ -238,6 +238,7 @@ mod test {
for (sql, current, expected) in &testcases {
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await

View File

@@ -130,13 +130,6 @@ impl HeartbeatTask {
pub fn shutdown(&self) {
info!("Close heartbeat task for flownode");
if self
.running
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Call close heartbeat task multiple times");
}
}
fn new_heartbeat_request(

View File

@@ -33,6 +33,7 @@ mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod recording_rules;
mod repr;
mod server;
mod transform;
@@ -43,4 +44,5 @@ mod test_utils;
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use recording_rules::FrontendClient;
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};

View File

@@ -28,6 +28,32 @@ lazy_static! {
&["table_id"]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_query_time",
"flow rule engine query time",
&["flow_id"],
vec![
0.0,
1.,
3.,
5.,
10.,
20.,
30.,
60.,
2. * 60.,
5. * 60.,
10. * 60.
]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_slow_query",
"flow rule engine slow query",
&["flow_id", "sql", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(

View File

@@ -0,0 +1,940 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Run flow as recording rule which is time-window-aware normal query triggered every tick set by user
mod engine;
mod frontend_client;
use std::collections::BTreeSet;
use std::sync::Arc;
use api::helper::pb_value_to_value_ref;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_recordbatch::DfRecordBatch;
use common_telemetry::warn;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, TableReference};
use datafusion_expr::{ColumnarValue, LogicalPlan};
use datafusion_physical_expr::PhysicalExprRef;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVector;
use datatypes::schema::TIME_INDEX_KEY;
use datatypes::value::Value;
use datatypes::vectors::{
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, Vector,
};
pub use engine::RecordingRuleEngine;
pub use frontend_client::FrontendClient;
use itertools::Itertools;
use query::parser::QueryLanguageParser;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use crate::adapter::util::from_proto_to_data_type;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
use crate::expr::error::DataTypeSnafu;
use crate::Error;
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
phy_expr: PhysicalExprRef,
column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
}
impl TimeWindowExpr {
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
.with_context(|_e| DatafusionSnafu {
context: format!(
"Failed to create physical expression from {expr:?} using {df_schema:?}"
),
})?;
Ok(Self {
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
})
}
pub fn eval(
&self,
current: Timestamp,
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
let lower_bound =
find_expr_time_window_lower_bound(&self.logical_expr, &self.df_schema, current)?;
let upper_bound =
find_expr_time_window_upper_bound(&self.logical_expr, &self.df_schema, current)?;
Ok((lower_bound, upper_bound))
}
/// Find timestamps from rows using time window expr
pub async fn handle_rows(
&self,
rows_list: Vec<api::v1::Rows>,
) -> Result<BTreeSet<Timestamp>, Error> {
let mut time_windows = BTreeSet::new();
for rows in rows_list {
// pick the time index column and use it to eval on `self.expr`
let ts_col_index = rows
.schema
.iter()
.map(|col| col.column_name.clone())
.position(|name| name == self.column_name);
let Some(ts_col_index) = ts_col_index else {
warn!("can't found time index column in schema: {:?}", rows.schema);
continue;
};
let col_schema = &rows.schema[ts_col_index];
let cdt = from_proto_to_data_type(col_schema)?;
let column_values = rows
.rows
.iter()
.map(|row| &row.values[ts_col_index])
.collect_vec();
let mut vector = cdt.create_mutable_vector(column_values.len());
for value in column_values {
let value = pb_value_to_value_ref(value, &None);
vector.try_push_value_ref(value).context(DataTypeSnafu {
msg: "Failed to convert rows to columns",
})?;
}
let vector = vector.to_vector();
let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
let rb =
DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
.with_context(|_e| ArrowSnafu {
context: format!(
"Failed to create record batch from {df_schema:?} and {vector:?}"
),
})?;
let eval_res = self
.phy_expr
.evaluate(&rb)
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to evaluate physical expression {:?} on {rb:?}",
self.phy_expr
),
})?;
let res = columnar_to_ts_vector(&eval_res)?;
for ts in res.into_iter().flatten() {
time_windows.insert(ts);
}
}
Ok(time_windows)
}
}
fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
name,
cdt.as_arrow_type(),
false,
)]));
let df_schema = DFSchema::from_field_specific_qualified_schema(
vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
&arrow_schema,
)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
})?;
Ok(df_schema)
}
/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
let val = match columnar {
datafusion_expr::ColumnarValue::Array(array) => {
let ty = array.data_type();
let ty = ConcreteDataType::from_arrow_type(ty);
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
ty.unit()
} else {
return UnexpectedSnafu {
reason: format!("Non-timestamp type: {ty:?}"),
}
.fail();
};
match time_unit {
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec(),
TimeUnit::Millisecond => {
TimestampMillisecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec()
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec()
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec()
}
}
}
datafusion_expr::ColumnarValue::Scalar(scalar) => {
let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert scalar {scalar:?} to value"),
})?;
let ts = value.as_timestamp().context(UnexpectedSnafu {
reason: format!("Expect Timestamp, found {:?}", value),
})?;
vec![Some(ts)]
}
};
Ok(val)
}
/// Convert sql to datafusion logical plan
pub async fn sql_to_df_plan(
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sql: &str,
optimize: bool,
) -> Result<LogicalPlan, Error> {
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = if optimize {
apply_df_optimizer(plan).await?
} else {
plan
};
Ok(plan)
}
/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window)
async fn find_time_window_expr(
plan: &LogicalPlan,
catalog_man: CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
// TODO(discord9): find the expr that do time window
let mut table_name = None;
// first find the table source in the logical plan
plan.apply(|plan| {
let LogicalPlan::TableScan(table_scan) = plan else {
return Ok(TreeNodeRecursion::Continue);
};
table_name = Some(table_scan.table_name.clone());
Ok(TreeNodeRecursion::Stop)
})
.with_context(|_| DatafusionSnafu {
context: format!("Can't find table source in plan {plan:?}"),
})?;
let Some(table_name) = table_name else {
UnexpectedSnafu {
reason: format!("Can't find table source in plan {plan:?}"),
}
.fail()?
};
let current_schema = query_ctx.current_schema();
let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
let schema_name = table_name.schema().unwrap_or(&current_schema);
let table_name = table_name.table();
let Some(table_ref) = catalog_man
.table(catalog_name, schema_name, table_name, Some(&query_ctx))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
else {
UnexpectedSnafu {
reason: format!(
"Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
),
}
.fail()?
};
let schema = &table_ref.table_info().meta.schema;
let ts_index = schema.timestamp_column().context(UnexpectedSnafu {
reason: format!("Can't find timestamp column in table {table_name:?}"),
})?;
let ts_col_name = ts_index.name.clone();
let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
reason: format!(
"Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
),
})?.unit();
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
ts_col_name.clone(),
ts_index.data_type.as_arrow_type(),
false,
)]));
let df_schema = DFSchema::from_field_specific_qualified_schema(
vec![Some(TableReference::bare(table_name))],
&arrow_schema,
)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
})?;
// find the time window expr which refers to the time index column
let mut aggr_expr = None;
let mut time_window_expr: Option<Expr> = None;
let find_inner_aggr_expr = |plan: &LogicalPlan| {
if let LogicalPlan::Aggregate(aggregate) = plan {
aggr_expr = Some(aggregate.clone());
};
Ok(TreeNodeRecursion::Continue)
};
plan.apply(find_inner_aggr_expr)
.with_context(|_| DatafusionSnafu {
context: format!("Can't find aggr expr in plan {plan:?}"),
})?;
if let Some(aggregate) = aggr_expr {
for group_expr in &aggregate.group_expr {
let refs = group_expr.column_refs();
if refs.len() != 1 {
continue;
}
let ref_col = refs.iter().next().unwrap();
let index = aggregate.input.schema().maybe_index_of_column(ref_col);
let Some(index) = index else {
continue;
};
let field = aggregate.input.schema().field(index);
let is_time_index = field.metadata().get(TIME_INDEX_KEY) == Some(&"true".to_string());
if is_time_index {
let rewrite_column = group_expr.clone();
let rewritten = rewrite_column
.rewrite(&mut RewriteColumn {
table_name: table_name.to_string(),
})
.with_context(|_| DatafusionSnafu {
context: format!("Rewrite expr failed, expr={:?}", group_expr),
})?
.data;
struct RewriteColumn {
table_name: String,
}
impl TreeNodeRewriter for RewriteColumn {
type Node = Expr;
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
let Expr::Column(mut column) = node else {
return Ok(Transformed::no(node));
};
column.relation = Some(TableReference::bare(self.table_name.clone()));
Ok(Transformed::yes(Expr::Column(column)))
}
}
time_window_expr = Some(rewritten);
break;
}
}
Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
} else {
// can't found time window expr, return None
Ok((ts_col_name, None, expected_time_unit, df_schema))
}
}
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
/// return `Some("2021-07-01 00:00:00.000")`
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
///
/// Time window expr is a expr that:
/// 1. ref only to a time index column
/// 2. is monotonic increasing
/// 3. show up in GROUP BY clause
///
/// note this plan should only contain one TableScan
pub async fn find_plan_time_window_bound(
plan: &LogicalPlan,
current: Timestamp,
query_ctx: QueryContextRef,
engine: QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
// TODO(discord9): find the expr that do time window
let catalog_man = engine.engine_state().catalog_manager();
let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;
// cast current to ts_index's type
let new_current = current
.convert_to(expected_time_unit)
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
})?;
// if no time_window_expr is found, return None
if let Some(time_window_expr) = time_window_expr {
let lower_bound =
find_expr_time_window_lower_bound(&time_window_expr, &df_schema, new_current)?;
let upper_bound =
find_expr_time_window_upper_bound(&time_window_expr, &df_schema, new_current)?;
Ok((ts_col_name, lower_bound, upper_bound))
} else {
Ok((ts_col_name, None, None))
}
}
/// Find the lower bound of time window in given `expr` and `current` timestamp.
///
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// of current time window given the current timestamp
///
/// if return None, meaning this time window have no lower bound
fn find_expr_time_window_lower_bound(
expr: &Expr,
df_schema: &DFSchema,
current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
.with_context(|_e| DatafusionSnafu {
context: format!(
"Failed to create physical expression from {expr:?} using {df_schema:?}"
),
})?;
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
let input_time_unit = cur_time_window.unit();
Ok(cur_time_window.convert_to(input_time_unit))
}
/// Find the upper bound for time window expression
fn find_expr_time_window_upper_bound(
expr: &Expr,
df_schema: &DFSchema,
current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
use std::cmp::Ordering;
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
.with_context(|_e| DatafusionSnafu {
context: format!(
"Failed to create physical expression from {expr:?} using {df_schema:?}"
),
})?;
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
// search to find the lower bound
let mut offset: i64 = 1;
let mut lower_bound = Some(current);
let upper_bound;
// first expontial probe to found a range for binary search
loop {
let Some(next_val) = current.value().checked_add(offset) else {
// no upper bound if overflow
return Ok(None);
};
let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
let next_time_window = eval_ts_to_ts(&phy_expr, df_schema, next_time_probe)?;
match next_time_window.cmp(&cur_time_window) {
Ordering::Less => {UnexpectedSnafu {
reason: format!(
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
),
}
.fail()?
}
Ordering::Equal => {
lower_bound = Some(next_time_probe);
}
Ordering::Greater => {
upper_bound = Some(next_time_probe);
break
}
}
let Some(new_offset) = offset.checked_mul(2) else {
// no upper bound if overflow
return Ok(None);
};
offset = new_offset;
}
// binary search for the exact upper bound
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
});
let output_unit = upper_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.unit();
let mut low = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.value();
let mut high = upper_bound
.context(UnexpectedSnafu {
reason: "should have upper bound",
})?
.value();
while low < high {
let mid = (low + high) / 2;
let mid_probe = common_time::Timestamp::new(mid, output_unit);
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
match mid_time_window.cmp(&cur_time_window) {
Ordering::Less => UnexpectedSnafu {
reason: format!("Binary search failed for time window expression {expr:?}"),
}
.fail()?,
Ordering::Equal => low = mid + 1,
Ordering::Greater => high = mid,
}
}
let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
Ok(Some(final_upper_bound_for_time_window))
}
fn eval_ts_to_ts(
phy: &PhysicalExprRef,
df_schema: &DFSchema,
input_value: Timestamp,
) -> Result<Timestamp, Error> {
let schema_ty = df_schema.field(0).data_type();
let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
ts.unit()
} else {
return UnexpectedSnafu {
reason: format!("Expect Timestamp, found {:?}", schema_cdt),
}
.fail();
};
let input_value = input_value
.convert_to(schema_unit)
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
})?;
let ts_vector = match schema_unit {
TimeUnit::Second => {
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Millisecond => {
TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
};
let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
.with_context(|_| ArrowSnafu {
context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
})?;
let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
})?;
if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
Ok(*ts)
} else {
UnexpectedSnafu {
reason: format!(
"Expected timestamp in expression {phy:?} but got {:?}",
eval_res
),
}
.fail()?
}
}
// TODO(discord9): a method to found out the precise time window
/// Find out the `Filter` Node corresponding to outermost `WHERE` and add a new filter expr to it
#[derive(Debug)]
pub struct AddFilterRewriter {
extra_filter: Expr,
is_rewritten: bool,
}
impl AddFilterRewriter {
fn new(filter: Expr) -> Self {
Self {
extra_filter: filter,
is_rewritten: false,
}
}
}
impl TreeNodeRewriter for AddFilterRewriter {
type Node = LogicalPlan;
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
return Ok(Transformed::no(node));
}
match node {
LogicalPlan::Filter(mut filter) if !filter.having => {
filter.predicate = filter.predicate.and(self.extra_filter.clone());
self.is_rewritten = true;
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
}
LogicalPlan::TableScan(_) => {
// add a new filter
let filter =
datafusion_expr::Filter::try_new(self.extra_filter.clone(), Arc::new(node))?;
self.is_rewritten = true;
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
}
_ => Ok(Transformed::no(node)),
}
}
}
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
/// A dialect that forces all identifiers to be quoted
struct ForceQuoteIdentifiers;
impl datafusion::sql::unparser::dialect::Dialect for ForceQuoteIdentifiers {
fn identifier_quote_style(&self, identifier: &str) -> Option<char> {
if identifier.to_lowercase() != identifier {
Some('"')
} else {
None
}
}
}
let unparser = Unparser::new(&ForceQuoteIdentifiers);
// first make all column qualified
let sql = unparser
.plan_to_sql(plan)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to unparse logical plan {plan:?}"),
})?;
Ok(sql.to_string())
}
#[cfg(test)]
mod test {
use datafusion_common::tree_node::TreeNode;
use pretty_assertions::assert_eq;
use session::context::QueryContext;
use super::{sql_to_df_plan, *};
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_sql_plan_convert() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let old = r#"SELECT "NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#;
let new = sql_to_df_plan(ctx.clone(), query_engine.clone(), old, false)
.await
.unwrap();
let new_sql = df_plan_to_sql(&new).unwrap();
assert_eq!(
r#"SELECT "UPPERCASE_NUMBERS_WITH_TS"."NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#,
new_sql
);
}
#[tokio::test]
async fn test_add_filter() {
let testcases = vec![
(
"SELECT number FROM numbers_with_ts GROUP BY number","SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (number > 4) GROUP BY numbers_with_ts.number"
),
(
"SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10",
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE ((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4)"
),
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window",
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE (number > 4) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
)
];
use datafusion_expr::{col, lit};
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
for (before, after) in testcases {
let sql = before;
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let mut add_filter = AddFilterRewriter::new(col("number").gt(lit(4u32)));
let plan = plan.rewrite(&mut add_filter).unwrap().data;
let new_sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(after, new_sql);
}
}
#[tokio::test]
async fn test_plan_time_window_lower_bound() {
use datafusion_expr::{col, lit};
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let testcases = [
// same alias is not same column
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
),
// complex time window index
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394080, TimeUnit::Second)),
Some(Timestamp::new(1740394140, TimeUnit::Second)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// no time index
(
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
Timestamp::new(23, TimeUnit::Millisecond),
("ts".to_string(), None, None),
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
),
// time index
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// on spot
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(0, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// different time unit
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23_000_000, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// time index with other fields
(
"SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// time index with other pks
(
"SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
),
// subquery
(
"SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
),
// cte
(
"with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
),
// complex subquery without alias
(
"SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
),
// complex subquery alias
(
"SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
),
];
for (sql, current, expected, expected_unparsed) in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
.await
.unwrap();
let real =
find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
.await
.unwrap();
assert_eq!(expected, real);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let (col_name, lower, upper) = real;
let new_sql = if lower.is_some() {
let to_df_literal = |value| {
let value = Value::from(value);
value.try_to_scalar_value(&value.data_type()).unwrap()
};
let lower = to_df_literal(lower.unwrap());
let upper = to_df_literal(upper.unwrap());
let expr = col(&col_name)
.gt_eq(lit(lower))
.and(col(&col_name).lt_eq(lit(upper)));
let mut add_filter = AddFilterRewriter::new(expr);
let plan = plan.rewrite(&mut add_filter).unwrap().data;
df_plan_to_sql(&plan).unwrap()
} else {
sql.to_string()
};
assert_eq!(expected_unparsed, new_sql);
}
}
}

View File

@@ -0,0 +1,815 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use api::v1::flow::FlowResponse;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::TreeNode;
use datatypes::value::Value;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{oneshot, RwLock};
use tokio::time::Instant;
use super::frontend_client::FrontendClient;
use super::{df_plan_to_sql, AddFilterRewriter, TimeWindowExpr};
use crate::adapter::{CreateFlowArgs, FlowId, TableName};
use crate::error::{
DatafusionSnafu, DatatypesSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu,
TimeSnafu, UnexpectedSnafu,
};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_time_window_expr, sql_to_df_plan};
use crate::Error;
/// TODO(discord9): make those constants configurable
/// The default rule engine query timeout is 10 minutes
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// TODO(discord9): determine how to configure refresh rate
pub struct RecordingRuleEngine {
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
engine: QueryEngineRef,
}
impl RecordingRuleEngine {
pub fn new(
frontend_client: Arc<FrontendClient>,
engine: QueryEngineRef,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
) -> Self {
Self {
tasks: Default::default(),
shutdown_txs: Default::default(),
frontend_client,
flow_metadata_manager,
table_meta,
engine,
}
}
pub async fn handle_inserts(
&self,
request: api::v1::region::InsertRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_name: HashMap<TableName, Vec<api::v1::Rows>> = HashMap::new();
for r in request.requests {
let tid = RegionId::from(r.region_id).table_id();
let name = get_table_name(table_info_mgr, &tid).await?;
let entry = group_by_table_name.entry(name).or_default();
if let Some(rows) = r.rows {
entry.push(rows);
}
}
for (_flow_id, task) in self.tasks.read().await.iter() {
let src_table_names = &task.source_table_names;
for src_table_name in src_table_names {
if let Some(entry) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.time_window_expr else {
continue;
};
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
let mut state = task.state.write().await;
state
.dirty_time_windows
.add_lower_bounds(involved_time_windows.into_iter());
}
}
}
Ok(Default::default())
}
}
async fn get_table_name(zelf: &TableInfoManager, table_id: &TableId) -> Result<TableName, Error> {
zelf.get(*table_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", table_id),
})
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
impl RecordingRuleEngine {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: _,
sql,
flow_options,
query_ctx,
} = args;
// or replace logic
{
let is_exist = self.tasks.read().await.contains_key(&flow_id);
match (create_if_not_exists, or_replace, is_exist) {
// if replace, ignore that old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
}
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
ensure!(
flow_type == Some(&FlowType::RecordingRule.to_string()) || flow_type.is_none(),
UnexpectedSnafu {
reason: format!("Flow type is not RecordingRule nor None, got {flow_type:?}")
}
);
let Some(query_ctx) = query_ctx else {
UnexpectedSnafu {
reason: "Query context is None".to_string(),
}
.fail()?
};
let query_ctx = Arc::new(query_ctx);
let mut source_table_names = Vec::new();
for src_id in source_table_ids {
let table_name = self
.table_meta
.table_info_manager()
.get(src_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", src_id),
})
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])?;
source_table_names.push(table_name);
}
let (tx, rx) = oneshot::channel();
let plan = sql_to_df_plan(query_ctx.clone(), self.engine.clone(), &sql, true).await?;
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
self.engine.engine_state().catalog_manager().clone(),
query_ctx.clone(),
)
.await?;
let phy_expr = time_window_expr
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
.transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
let task = RecordingRuleTask::new(
flow_id,
&sql,
phy_expr,
expire_after,
sink_table_name,
source_table_names,
query_ctx,
rx,
);
let task_inner = task.clone();
let engine = self.engine.clone();
let frontend = self.frontend_client.clone();
// TODO(discord9): also save handle & use time wheel or what for better
let _handle = common_runtime::spawn_global(async move {
match task_inner.start_executing(engine, frontend).await {
Ok(()) => info!("Flow {} shutdown", task_inner.flow_id),
Err(err) => common_telemetry::error!(
"Flow {} encounter unrecoverable error: {err:?}",
task_inner.flow_id
),
}
});
// TODO(discord9): deal with replace logic
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
drop(replaced_old_task_opt);
self.shutdown_txs.write().await.insert(flow_id, tx);
Ok(Some(flow_id))
}
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
reason: format!("Can't found shutdown tx for flow {flow_id}"),
}
.fail()?
};
if tx.send(()).is_err() {
warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
pub flow_id: FlowId,
query: String,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: HashSet<[String; 3]>,
state: Arc<RwLock<RecordingRuleState>>,
}
impl RecordingRuleTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
flow_id: FlowId,
query: &str,
time_window_expr: Option<TimeWindowExpr>,
expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: Vec<[String; 3]>,
query_ctx: QueryContextRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
flow_id,
query: query.to_string(),
time_window_expr,
expire_after,
sink_table_name,
source_table_names: source_table_names.into_iter().collect(),
state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
}
}
}
impl RecordingRuleTask {
/// This should be called in a new tokio task
pub async fn start_executing(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) -> Result<(), Error> {
// only first query don't need upper bound
let mut is_first = true;
loop {
// FIXME(discord9): test if need upper bound also works
let new_query = self.gen_query_with_time_window(engine.clone()).await?;
let insert_into = if let Some(new_query) = new_query {
format!(
"INSERT INTO {}.{}.{} {}",
self.sink_table_name[0],
self.sink_table_name[1],
self.sink_table_name[2],
new_query
)
} else {
tokio::time::sleep(MIN_REFRESH_DURATION).await;
continue;
};
if is_first {
is_first = false;
}
let instant = Instant::now();
let flow_id = self.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
self.expire_after, peer_addr, &insert_into
);
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
let res = db_client.database.sql(&insert_into).await;
drop(timer);
let elapsed = instant.elapsed();
if let Ok(res1) = &res {
debug!(
"Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
elapsed
);
} else if let Err(res) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &insert_into
);
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &insert_into
);
METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
.observe(elapsed.as_secs_f64());
}
self.state
.write()
.await
.after_query_exec(elapsed, res.is_ok());
// drop the result to free client-related resources
drop(res);
let sleep_until = {
let mut state = self.state.write().await;
match state.shutdown_rx.try_recv() {
Ok(()) => break Ok(()),
Err(TryRecvError::Closed) => {
warn!("Unexpected shutdown flow {flow_id}, shutdown anyway");
break Ok(());
}
Err(TryRecvError::Empty) => (),
}
state.get_next_start_query_time(None)
};
tokio::time::sleep_until(sleep_until).await;
}
}
/// will merge and use the first ten time window in query
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
) -> Result<Option<String>, Error> {
let query_ctx = self.state.read().await.query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let low_bound = self
.expire_after
.map(|e| since_the_epoch.as_secs() - e as u64)
.unwrap_or(u64::MIN);
let low_bound = Timestamp::new_second(low_bound as i64);
// TODO(discord9): use time window expr to get the precise expire lower bound
let expire_time_window_bound = self
.time_window_expr
.as_ref()
.map(|expr| expr.eval(low_bound))
.transpose()?;
let new_sql = {
let expr = {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.flow_id
),
})?;
self.state
.write()
.await
.dirty_time_windows
.gen_filter_exprs(&col_name, Some(l), window_size, self)?
}
_ => {
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id
);
// since no time window lower/upper bound is found, just return the original query
return Ok(Some(self.query.clone()));
}
}
};
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
);
let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.flow_id);
return Ok(None);
};
let mut add_filter = AddFilterRewriter::new(expr);
// make a not optimized plan for clearer unparse
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, false).await?;
let plan = plan
.clone()
.rewrite(&mut add_filter)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {plan:?}"),
})?
.data;
df_plan_to_sql(&plan)?
};
Ok(Some(new_sql))
}
}
#[derive(Debug)]
pub struct RecordingRuleState {
query_ctx: QueryContextRef,
/// last query complete time
last_update_time: Instant,
/// last time query duration
last_query_duration: Duration,
/// Dirty Time windows need to be updated
/// mapping of `start -> end` and non-overlapping
dirty_time_windows: DirtyTimeWindows,
exec_state: ExecState,
shutdown_rx: oneshot::Receiver<()>,
}
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert to scalar value: {}", value),
})?;
Ok(value)
}
impl DirtyTimeWindows {
/// Time window merge distance
const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
/// # Arguments
///
/// * `lower_bounds` - An iterator of lower bounds to be added.
pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
for lower_bound in lower_bounds {
let entry = self.windows.entry(lower_bound);
entry.or_insert(None);
}
}
/// Generate all filter expressions consuming all time windows
pub fn gen_filter_exprs(
&mut self,
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
task_ctx: &RecordingRuleTask,
) -> Result<Option<datafusion_expr::Expr>, Error> {
debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
window_size
);
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
warn!(
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM,
task_ctx.time_window_expr,
task_ctx.expire_after,
first_time_window,
last_time_window,
task_ctx.query
);
}
// get the first `MAX_FILTER_NUM` time windows
let nth = self
.windows
.iter()
.nth(Self::MAX_FILTER_NUM)
.map(|(key, _)| *key);
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
debug!(
"Time window start: {:?}, end: {:?}",
start.to_iso8601_string(),
end.map(|t| t.to_iso8601_string())
);
use datafusion_expr::{col, lit};
let lower = to_df_literal(start)?;
let upper = end.map(to_df_literal).transpose()?;
let expr = if let Some(upper) = upper {
col(col_name)
.gt_eq(lit(lower))
.and(col(col_name).lt(lit(upper)))
} else {
col(col_name).gt_eq(lit(lower))
};
expr_lst.push(expr);
}
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
Ok(expr)
}
/// Merge time windows that overlaps or get too close
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
expire_lower_bound: Option<Timestamp>,
) -> Result<(), Error> {
let mut new_windows = BTreeMap::new();
let mut prev_tw = None;
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
// filter out expired time window
if let Some(expire_lower_bound) = expire_lower_bound {
if lower_bound <= expire_lower_bound {
continue;
}
}
let Some(prev_tw) = &mut prev_tw else {
prev_tw = Some((lower_bound, upper_bound));
continue;
};
let std_window_size = window_size.to_std().map_err(|e| {
InternalSnafu {
reason: e.to_string(),
}
.build()
})?;
// if cur.lower - prev.upper <= window_size * 2, merge
let prev_upper = prev_tw
.1
.unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
prev_tw.1 = Some(prev_upper);
let cur_upper = upper_bound.unwrap_or(
lower_bound
.add_duration(std_window_size)
.context(TimeSnafu)?,
);
if lower_bound
.sub(&prev_upper)
.map(|dist| dist <= window_size * Self::MERGE_DIST)
.unwrap_or(false)
{
prev_tw.1 = Some(cur_upper);
} else {
new_windows.insert(prev_tw.0, prev_tw.1);
*prev_tw = (lower_bound, Some(cur_upper));
}
}
if let Some(prev_tw) = prev_tw {
new_windows.insert(prev_tw.0, prev_tw.1);
}
self.windows = new_windows;
Ok(())
}
}
impl RecordingRuleState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
Self {
query_ctx,
last_update_time: Instant::now(),
last_query_duration: Duration::from_secs(0),
dirty_time_windows: Default::default(),
exec_state: ExecState::Idle,
shutdown_rx,
}
}
/// called after last query is done
/// `is_succ` indicate whether the last query is successful
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
self.exec_state = ExecState::Idle;
self.last_query_duration = elapsed;
self.last_update_time = Instant::now();
}
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
self.last_update_time + next_duration
}
}
#[derive(Debug, Clone)]
enum ExecState {
Idle,
Executing,
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_merge_dirty_time_windows() {
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
dirty.windows,
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
)])
);
// separate time window
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(5 * 60))
),
(
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Some(Timestamp::new_second(
(3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
)
]),
dirty.windows
);
// overlapping
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
),]),
dirty.windows
);
// expired
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
)),
)
.unwrap();
// just enough to merge
assert_eq!(BTreeMap::from([]), dirty.windows);
}
}

View File

@@ -0,0 +1,163 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
use std::sync::Arc;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
use crate::Error;
fn default_channel_mgr() -> ChannelManager {
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
}
fn client_from_urls(addrs: Vec<String>) -> Client {
Client::with_manager_and_urls(default_channel_mgr(), addrs)
}
/// A simple frontend client able to execute sql using grpc protocol
#[derive(Debug)]
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
channel_mgr: ChannelManager,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
/// TODO(discord9): not use grpc under standalone mode
database_client: DatabaseWithPeer,
},
}
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
pub database: Database,
pub peer: Peer,
}
impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
}
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed {
meta_client,
channel_mgr: default_channel_mgr(),
}
}
pub fn from_static_grpc_addr(addr: String) -> Self {
let peer = Peer {
id: 0,
addr: addr.clone(),
};
let mgr = default_channel_mgr();
let client = Client::with_manager_and_urls(mgr.clone(), vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
}
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
let cluster_client = meta_client
.cluster_client()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cluster_id = meta_client.id().0;
let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
let req = RangeRequest::new().with_prefix(prefix);
let resp = cluster_client
.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut res = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = NodeInfoKey::try_from(kv.key)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let val = NodeInfo::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
res.push((key, val));
}
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
match &self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed {
meta_client: _,
channel_mgr,
} => {
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client =
Client::with_manager_and_urls(channel_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
}
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _, .. } => self.get_last_active_frontend().await,
}
}
}

View File

@@ -57,6 +57,7 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::recording_rules::{FrontendClient, RecordingRuleEngine};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowWorkerManager, FlownodeOptions};
@@ -245,6 +246,7 @@ impl FlownodeInstance {
self.server.shutdown().await.context(ShutdownServerSnafu)?;
if let Some(task) = &self.heartbeat_task {
info!("Close heartbeat task for flownode");
task.shutdown();
}
@@ -271,6 +273,8 @@ pub struct FlownodeBuilder {
heartbeat_task: Option<HeartbeatTask>,
/// receive a oneshot sender to send state size report
state_report_handler: Option<StateReportHandler>,
/// Client to send sql to frontend
frontend_client: Arc<FrontendClient>,
}
impl FlownodeBuilder {
@@ -281,6 +285,7 @@ impl FlownodeBuilder {
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
flow_metadata_manager: FlowMetadataManagerRef,
frontend_client: Arc<FrontendClient>,
) -> Self {
Self {
opts,
@@ -290,6 +295,7 @@ impl FlownodeBuilder {
flow_metadata_manager,
heartbeat_task: None,
state_report_handler: None,
frontend_client,
}
}
@@ -447,7 +453,14 @@ impl FlownodeBuilder {
let node_id = self.opts.node_id.map(|id| id as u32);
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
let rule_engine = RecordingRuleEngine::new(
self.frontend_client.clone(),
query_engine.clone(),
self.flow_metadata_manager.clone(),
table_meta.clone(),
);
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
for worker_id in 0..num_workers {
let (tx, rx) = oneshot::channel();

View File

@@ -86,7 +86,8 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
let schema = vec![
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
@@ -114,6 +115,37 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let schema = vec![
datatypes::schema::ColumnSchema::new("NUMBER", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
columns.push(column);
let ts = (1..=10).collect_vec();
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
ts.into_iter()
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
.count();
let column: VectorRef = builder.to_vector_cloned();
columns.push(column);
let schema = Arc::new(Schema::new(schema));
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table("UPPERCASE_NUMBERS_WITH_TS", recordbatch);
let req_with_ts = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "UPPERCASE_NUMBERS_WITH_TS".to_string(),
table_id: 1025,
table,
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();

View File

@@ -112,6 +112,7 @@ impl MetaClientBuilder {
.enable_store()
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
}
pub fn enable_heartbeat(self) -> Self {

View File

@@ -7,6 +7,7 @@ license.workspace = true
[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
mysql_kvbackend = [] # placeholder features so CI can compile
[lints]
workspace = true

View File

@@ -68,6 +68,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
#[allow(unused)]
table_flownode_set_cache: TableFlownodeSetCacheRef,
}
@@ -338,6 +339,8 @@ impl Inserter {
instant_requests,
} = requests;
// TODO(discord9): mirror some
// Mirror requests for source table to flownode asynchronously
let flow_mirror_task = FlowMirrorTask::new(
&self.table_flownode_set_cache,
@@ -817,12 +820,14 @@ struct CreateAlterTableResult {
table_infos: HashMap<TableId, Arc<TableInfo>>,
}
#[allow(unused)]
struct FlowMirrorTask {
requests: HashMap<Peer, RegionInsertRequests>,
num_rows: usize,
}
impl FlowMirrorTask {
#[allow(unused)]
async fn new(
cache: &TableFlownodeSetCacheRef,
requests: impl Iterator<Item = &RegionInsertRequest>,
@@ -896,6 +901,7 @@ impl FlowMirrorTask {
})
}
#[allow(unused)]
fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
for (peer, inserts) in self.requests {

View File

@@ -40,7 +40,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::FlownodeBuilder;
use flow::{FlownodeBuilder, FrontendClient};
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
@@ -164,12 +164,15 @@ impl GreptimeDbStandaloneBuilder {
Some(procedure_manager.clone()),
);
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
);
let flownode = Arc::new(flow_builder.build().await.unwrap());