mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
8 Commits
feat/bulk-
...
wyze_recor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1540d9b815 | ||
|
|
ff0828230e | ||
|
|
d088dc2798 | ||
|
|
901488f6b1 | ||
|
|
ab8bac8b2d | ||
|
|
d117153742 | ||
|
|
e6e6d2a7b9 | ||
|
|
687afa1f89 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4165,6 +4165,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"cache",
|
||||
"catalog",
|
||||
"chrono",
|
||||
"client",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
mod client;
|
||||
pub mod client_manager;
|
||||
#[cfg(feature = "testing")]
|
||||
mod database;
|
||||
pub mod error;
|
||||
pub mod flow;
|
||||
@@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
|
||||
use snafu::OptionExt;
|
||||
|
||||
pub use self::client::Client;
|
||||
#[cfg(feature = "testing")]
|
||||
pub use self::database::Database;
|
||||
pub use self::error::{Error, Result};
|
||||
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};
|
||||
|
||||
@@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_version::{short_version, version};
|
||||
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
|
||||
use flow::{FlownodeBuilder, FlownodeInstance, FrontendClient, FrontendInvoker};
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::Mode;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -317,6 +317,8 @@ impl StartCommand {
|
||||
Arc::new(executor),
|
||||
);
|
||||
|
||||
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
|
||||
let flownode_builder = FlownodeBuilder::new(
|
||||
opts,
|
||||
@@ -324,6 +326,7 @@ impl StartCommand {
|
||||
table_metadata_manager,
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager,
|
||||
Arc::new(frontend_client),
|
||||
)
|
||||
.with_heartbeat_task(heartbeat_task);
|
||||
|
||||
|
||||
@@ -54,7 +54,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
|
||||
use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use datanode::region_server::RegionServer;
|
||||
use file_engine::config::EngineConfig as FileEngineConfig;
|
||||
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
|
||||
use flow::{
|
||||
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendClient,
|
||||
FrontendInvoker,
|
||||
};
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
|
||||
@@ -533,12 +536,16 @@ impl StartCommand {
|
||||
flow: opts.flow.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
|
||||
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
flownode_options,
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
Arc::new(frontend_client),
|
||||
);
|
||||
let flownode = Arc::new(
|
||||
flow_builder
|
||||
|
||||
@@ -343,6 +343,7 @@ pub enum FlowType {
|
||||
impl FlowType {
|
||||
pub const RECORDING_RULE: &str = "recording_rule";
|
||||
pub const STREAMING: &str = "streaming";
|
||||
pub const FLOW_TYPE_KEY: &str = "flow_type";
|
||||
}
|
||||
|
||||
impl Default for FlowType {
|
||||
@@ -398,7 +399,8 @@ impl From<&CreateFlowData> for CreateRequest {
|
||||
};
|
||||
|
||||
let flow_type = value.flow_type.unwrap_or_default().to_string();
|
||||
req.flow_options.insert("flow_type".to_string(), flow_type);
|
||||
req.flow_options
|
||||
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
|
||||
req
|
||||
}
|
||||
}
|
||||
@@ -430,7 +432,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let flow_type = value.flow_type.unwrap_or_default().to_string();
|
||||
options.insert("flow_type".to_string(), flow_type);
|
||||
options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
|
||||
|
||||
let flow_info = FlowInfoValue {
|
||||
source_table_ids: value.source_table_ids.clone(),
|
||||
|
||||
@@ -16,6 +16,7 @@ async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
cache.workspace = true
|
||||
catalog.workspace = true
|
||||
chrono.workspace = true
|
||||
client.workspace = true
|
||||
common-base.workspace = true
|
||||
common-config.workspace = true
|
||||
|
||||
@@ -49,12 +49,13 @@ pub(crate) use crate::adapter::node_context::FlownodeContext;
|
||||
use crate::adapter::refill::RefillTask;
|
||||
use crate::adapter::table_source::ManagedTableSource;
|
||||
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
|
||||
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
|
||||
pub(crate) use crate::adapter::worker::{create_worker, WorkerHandle};
|
||||
use crate::compute::ErrCollector;
|
||||
use crate::df_optimizer::sql_to_flow_plan;
|
||||
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
|
||||
use crate::expr::Batch;
|
||||
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
|
||||
use crate::recording_rules::RecordingRuleEngine;
|
||||
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
|
||||
|
||||
mod flownode_impl;
|
||||
@@ -63,7 +64,7 @@ pub(crate) mod refill;
|
||||
mod stat;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod util;
|
||||
pub(crate) mod util;
|
||||
mod worker;
|
||||
|
||||
pub(crate) mod node_context;
|
||||
@@ -171,6 +172,8 @@ pub struct FlowWorkerManager {
|
||||
flush_lock: RwLock<()>,
|
||||
/// receive a oneshot sender to send state size report
|
||||
state_report_handler: RwLock<Option<StateReportHandler>>,
|
||||
/// engine for recording rule
|
||||
rule_engine: RecordingRuleEngine,
|
||||
}
|
||||
|
||||
/// Building FlownodeManager
|
||||
@@ -185,6 +188,7 @@ impl FlowWorkerManager {
|
||||
node_id: Option<u32>,
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
rule_engine: RecordingRuleEngine,
|
||||
) -> Self {
|
||||
let srv_map = ManagedTableSource::new(
|
||||
table_meta.table_info_manager().clone(),
|
||||
@@ -207,6 +211,7 @@ impl FlowWorkerManager {
|
||||
node_id,
|
||||
flush_lock: RwLock::new(()),
|
||||
state_report_handler: RwLock::new(None),
|
||||
rule_engine,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,25 +220,6 @@ impl FlowWorkerManager {
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a flownode manager with one worker
|
||||
pub fn new_with_workers<'s>(
|
||||
node_id: Option<u32>,
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
num_workers: usize,
|
||||
) -> (Self, Vec<Worker<'s>>) {
|
||||
let mut zelf = Self::new(node_id, query_engine, table_meta);
|
||||
|
||||
let workers: Vec<_> = (0..num_workers)
|
||||
.map(|_| {
|
||||
let (handle, worker) = create_worker();
|
||||
zelf.add_worker_handle(handle);
|
||||
worker
|
||||
})
|
||||
.collect();
|
||||
(zelf, workers)
|
||||
}
|
||||
|
||||
/// add a worker handler to manager, meaning this corresponding worker is under it's manage
|
||||
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
|
||||
self.worker_handles.push(handle);
|
||||
@@ -751,7 +737,11 @@ pub struct CreateFlowArgs {
|
||||
/// Create&Remove flow
|
||||
impl FlowWorkerManager {
|
||||
/// remove a flow by it's id
|
||||
#[allow(unreachable_code)]
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
// TODO(discord9): reroute some back to streaming engine later
|
||||
return self.rule_engine.remove_flow(flow_id).await;
|
||||
|
||||
for handle in self.worker_handles.iter() {
|
||||
if handle.contains_flow(flow_id).await? {
|
||||
handle.remove_flow(flow_id).await?;
|
||||
@@ -767,8 +757,10 @@ impl FlowWorkerManager {
|
||||
/// steps to create task:
|
||||
/// 1. parse query into typed plan(and optional parse expire_after expr)
|
||||
/// 2. render source/sink with output table id and used input table id
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[allow(clippy::too_many_arguments, unreachable_code)]
|
||||
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
// TODO(discord9): reroute some back to streaming engine later
|
||||
return self.rule_engine.create_flow(args).await;
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
|
||||
@@ -153,7 +153,13 @@ impl Flownode for FlowWorkerManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unreachable_code, unused)]
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
|
||||
return self
|
||||
.rule_engine
|
||||
.handle_inserts(request)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()));
|
||||
// using try_read to ensure two things:
|
||||
// 1. flush wouldn't happen until inserts before it is inserted
|
||||
// 2. inserts happening concurrently with flush wouldn't be block by flush
|
||||
@@ -206,15 +212,15 @@ impl Flownode for FlowWorkerManager {
|
||||
.collect_vec();
|
||||
let table_col_names = table_schema.relation_desc.names;
|
||||
let table_col_names = table_col_names
|
||||
.iter().enumerate()
|
||||
.map(|(idx,name)| match name {
|
||||
Some(name) => Ok(name.clone()),
|
||||
None => InternalSnafu {
|
||||
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
|
||||
}
|
||||
.fail().map_err(BoxedError::new).context(ExternalSnafu),
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
.iter().enumerate()
|
||||
.map(|(idx,name)| match name {
|
||||
Some(name) => Ok(name.clone()),
|
||||
None => InternalSnafu {
|
||||
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
|
||||
}
|
||||
.fail().map_err(BoxedError::new).context(ExternalSnafu),
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let name_to_col = HashMap::<_, _>::from_iter(
|
||||
insert_schema
|
||||
.iter()
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Some utility functions
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use arrow_schema::ArrowError;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
|
||||
use common_macro::stack_trace_debug;
|
||||
@@ -53,6 +54,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Time error"))]
|
||||
Time {
|
||||
source: common_time::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("External error"))]
|
||||
External {
|
||||
source: BoxedError,
|
||||
@@ -156,6 +164,15 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
|
||||
Arrow {
|
||||
#[snafu(source)]
|
||||
raw: ArrowError,
|
||||
context: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
|
||||
Datafusion {
|
||||
#[snafu(source)]
|
||||
@@ -230,6 +247,7 @@ impl ErrorExt for Error {
|
||||
match self {
|
||||
Self::Eval { .. }
|
||||
| Self::JoinTask { .. }
|
||||
| Self::Arrow { .. }
|
||||
| Self::Datafusion { .. }
|
||||
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
|
||||
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
|
||||
@@ -238,7 +256,9 @@ impl ErrorExt for Error {
|
||||
| Self::FlowNotFound { .. }
|
||||
| Self::ListFlows { .. } => StatusCode::TableNotFound,
|
||||
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
|
||||
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
|
||||
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Time { .. } => {
|
||||
StatusCode::EngineExecuteQuery
|
||||
}
|
||||
Self::Unexpected { .. } => StatusCode::Unexpected,
|
||||
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
|
||||
StatusCode::Unsupported
|
||||
|
||||
@@ -238,6 +238,7 @@ mod test {
|
||||
|
||||
for (sql, current, expected) in &testcases {
|
||||
let plan = sql_to_substrait(engine.clone(), sql).await;
|
||||
|
||||
let mut ctx = create_test_ctx();
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
|
||||
.await
|
||||
|
||||
@@ -130,13 +130,6 @@ impl HeartbeatTask {
|
||||
|
||||
pub fn shutdown(&self) {
|
||||
info!("Close heartbeat task for flownode");
|
||||
if self
|
||||
.running
|
||||
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_err()
|
||||
{
|
||||
warn!("Call close heartbeat task multiple times");
|
||||
}
|
||||
}
|
||||
|
||||
fn new_heartbeat_request(
|
||||
|
||||
@@ -33,6 +33,7 @@ mod expr;
|
||||
pub mod heartbeat;
|
||||
mod metrics;
|
||||
mod plan;
|
||||
mod recording_rules;
|
||||
mod repr;
|
||||
mod server;
|
||||
mod transform;
|
||||
@@ -43,4 +44,5 @@ mod test_utils;
|
||||
|
||||
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
|
||||
pub use error::{Error, Result};
|
||||
pub use recording_rules::FrontendClient;
|
||||
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};
|
||||
|
||||
@@ -28,6 +28,32 @@ lazy_static! {
|
||||
&["table_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RULE_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_rule_engine_query_time",
|
||||
"flow rule engine query time",
|
||||
&["flow_id"],
|
||||
vec![
|
||||
0.0,
|
||||
1.,
|
||||
3.,
|
||||
5.,
|
||||
10.,
|
||||
20.,
|
||||
30.,
|
||||
60.,
|
||||
2. * 60.,
|
||||
5. * 60.,
|
||||
10. * 60.
|
||||
]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_rule_engine_slow_query",
|
||||
"flow rule engine slow query",
|
||||
&["flow_id", "sql", "peer"],
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||
|
||||
882
src/flow/src/recording_rules.rs
Normal file
882
src/flow/src/recording_rules.rs
Normal file
@@ -0,0 +1,882 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Run flow as recording rule which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
mod engine;
|
||||
mod frontend_client;
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::pb_value_to_value_ref;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::DfRecordBatch;
|
||||
use common_telemetry::warn;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion::logical_expr::Expr;
|
||||
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion::sql::unparser::Unparser;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
|
||||
use datafusion_common::{DFSchema, TableReference};
|
||||
use datafusion_expr::{ColumnarValue, LogicalPlan};
|
||||
use datafusion_physical_expr::PhysicalExprRef;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::schema::TIME_INDEX_KEY;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
|
||||
TimestampSecondVector, Vector,
|
||||
};
|
||||
pub use engine::RecordingRuleEngine;
|
||||
pub use frontend_client::FrontendClient;
|
||||
use itertools::Itertools;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::adapter::util::from_proto_to_data_type;
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::expr::error::DataTypeSnafu;
|
||||
use crate::Error;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimeWindowExpr {
|
||||
expr: PhysicalExprRef,
|
||||
column_name: String,
|
||||
}
|
||||
|
||||
impl TimeWindowExpr {
|
||||
pub fn new(expr: PhysicalExprRef, column_name: String) -> Self {
|
||||
Self { expr, column_name }
|
||||
}
|
||||
|
||||
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
let phy_expr: PhysicalExprRef = phy_planner
|
||||
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to create physical expression from {expr:?} using {df_schema:?}"
|
||||
),
|
||||
})?;
|
||||
Ok(Self {
|
||||
expr: phy_expr,
|
||||
column_name: column_name.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Find timestamps from rows using time window expr
|
||||
pub async fn handle_rows(
|
||||
&self,
|
||||
rows_list: Vec<api::v1::Rows>,
|
||||
) -> Result<BTreeSet<Timestamp>, Error> {
|
||||
let mut time_windows = BTreeSet::new();
|
||||
|
||||
for rows in rows_list {
|
||||
// pick the time index column and use it to eval on `self.expr`
|
||||
let ts_col_index = rows
|
||||
.schema
|
||||
.iter()
|
||||
.map(|col| col.column_name.clone())
|
||||
.position(|name| name == self.column_name);
|
||||
let Some(ts_col_index) = ts_col_index else {
|
||||
warn!("can't found time index column in schema: {:?}", rows.schema);
|
||||
continue;
|
||||
};
|
||||
let col_schema = &rows.schema[ts_col_index];
|
||||
let cdt = from_proto_to_data_type(col_schema)?;
|
||||
|
||||
let column_values = rows
|
||||
.rows
|
||||
.iter()
|
||||
.map(|row| &row.values[ts_col_index])
|
||||
.collect_vec();
|
||||
|
||||
let mut vector = cdt.create_mutable_vector(column_values.len());
|
||||
for value in column_values {
|
||||
let value = pb_value_to_value_ref(value, &None);
|
||||
vector.try_push_value_ref(value).context(DataTypeSnafu {
|
||||
msg: "Failed to convert rows to columns",
|
||||
})?;
|
||||
}
|
||||
let vector = vector.to_vector();
|
||||
|
||||
let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
|
||||
|
||||
let rb =
|
||||
DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
|
||||
.with_context(|_e| ArrowSnafu {
|
||||
context: format!(
|
||||
"Failed to create record batch from {df_schema:?} and {vector:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to evaluate physical expression {:?} on {rb:?}",
|
||||
self.expr
|
||||
),
|
||||
})?;
|
||||
|
||||
let res = columnar_to_ts_vector(&eval_res)?;
|
||||
|
||||
for ts in res.into_iter().flatten() {
|
||||
time_windows.insert(ts);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(time_windows)
|
||||
}
|
||||
}
|
||||
|
||||
fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
|
||||
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
|
||||
name,
|
||||
cdt.as_arrow_type(),
|
||||
false,
|
||||
)]));
|
||||
|
||||
let df_schema = DFSchema::from_field_specific_qualified_schema(
|
||||
vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
|
||||
&arrow_schema,
|
||||
)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
|
||||
})?;
|
||||
|
||||
Ok(df_schema)
|
||||
}
|
||||
|
||||
/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
|
||||
fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
|
||||
let val = match columnar {
|
||||
datafusion_expr::ColumnarValue::Array(array) => {
|
||||
let ty = array.data_type();
|
||||
let ty = ConcreteDataType::from_arrow_type(ty);
|
||||
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
|
||||
ty.unit()
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!("Non-timestamp type: {ty:?}"),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
match time_unit {
|
||||
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec(),
|
||||
TimeUnit::Millisecond => {
|
||||
TimestampMillisecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec()
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec()
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
TimestampNanosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec()
|
||||
}
|
||||
}
|
||||
}
|
||||
datafusion_expr::ColumnarValue::Scalar(scalar) => {
|
||||
let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert scalar {scalar:?} to value"),
|
||||
})?;
|
||||
let ts = value.as_timestamp().context(UnexpectedSnafu {
|
||||
reason: format!("Expect Timestamp, found {:?}", value),
|
||||
})?;
|
||||
vec![Some(ts)]
|
||||
}
|
||||
};
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
/// Convert sql to datafusion logical plan
|
||||
pub async fn sql_to_df_plan(
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
sql: &str,
|
||||
optimize: bool,
|
||||
) -> Result<LogicalPlan, Error> {
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(&stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = if optimize {
|
||||
apply_df_optimizer(plan).await?
|
||||
} else {
|
||||
plan
|
||||
};
|
||||
Ok(plan)
|
||||
}
|
||||
|
||||
/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window)
|
||||
async fn find_time_window_expr(
|
||||
plan: &LogicalPlan,
|
||||
catalog_man: CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
|
||||
// TODO(discord9): find the expr that do time window
|
||||
|
||||
let mut table_name = None;
|
||||
|
||||
// first find the table source in the logical plan
|
||||
plan.apply(|plan| {
|
||||
let LogicalPlan::TableScan(table_scan) = plan else {
|
||||
return Ok(TreeNodeRecursion::Continue);
|
||||
};
|
||||
table_name = Some(table_scan.table_name.clone());
|
||||
Ok(TreeNodeRecursion::Stop)
|
||||
})
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Can't find table source in plan {plan:?}"),
|
||||
})?;
|
||||
let Some(table_name) = table_name else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Can't find table source in plan {plan:?}"),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
|
||||
let current_schema = query_ctx.current_schema();
|
||||
|
||||
let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
|
||||
let schema_name = table_name.schema().unwrap_or(¤t_schema);
|
||||
let table_name = table_name.table();
|
||||
|
||||
let Some(table_ref) = catalog_man
|
||||
.table(catalog_name, schema_name, table_name, Some(&query_ctx))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
|
||||
let schema = &table_ref.table_info().meta.schema;
|
||||
|
||||
let ts_index = schema.timestamp_column().context(UnexpectedSnafu {
|
||||
reason: format!("Can't find timestamp column in table {table_name:?}"),
|
||||
})?;
|
||||
|
||||
let ts_col_name = ts_index.name.clone();
|
||||
|
||||
let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
|
||||
),
|
||||
})?.unit();
|
||||
|
||||
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
|
||||
ts_col_name.clone(),
|
||||
ts_index.data_type.as_arrow_type(),
|
||||
false,
|
||||
)]));
|
||||
|
||||
let df_schema = DFSchema::from_field_specific_qualified_schema(
|
||||
vec![Some(TableReference::bare(table_name))],
|
||||
&arrow_schema,
|
||||
)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
|
||||
})?;
|
||||
|
||||
// find the time window expr which refers to the time index column
|
||||
let mut aggr_expr = None;
|
||||
let mut time_window_expr: Option<Expr> = None;
|
||||
|
||||
let find_inner_aggr_expr = |plan: &LogicalPlan| {
|
||||
if let LogicalPlan::Aggregate(aggregate) = plan {
|
||||
aggr_expr = Some(aggregate.clone());
|
||||
};
|
||||
|
||||
Ok(TreeNodeRecursion::Continue)
|
||||
};
|
||||
plan.apply(find_inner_aggr_expr)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Can't find aggr expr in plan {plan:?}"),
|
||||
})?;
|
||||
|
||||
if let Some(aggregate) = aggr_expr {
|
||||
for group_expr in &aggregate.group_expr {
|
||||
let refs = group_expr.column_refs();
|
||||
if refs.len() != 1 {
|
||||
continue;
|
||||
}
|
||||
let ref_col = refs.iter().next().unwrap();
|
||||
|
||||
let index = aggregate.input.schema().maybe_index_of_column(ref_col);
|
||||
let Some(index) = index else {
|
||||
continue;
|
||||
};
|
||||
let field = aggregate.input.schema().field(index);
|
||||
|
||||
let is_time_index = field.metadata().get(TIME_INDEX_KEY) == Some(&"true".to_string());
|
||||
|
||||
if is_time_index {
|
||||
let rewrite_column = group_expr.clone();
|
||||
let rewritten = rewrite_column
|
||||
.rewrite(&mut RewriteColumn {
|
||||
table_name: table_name.to_string(),
|
||||
})
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Rewrite expr failed, expr={:?}", group_expr),
|
||||
})?
|
||||
.data;
|
||||
struct RewriteColumn {
|
||||
table_name: String,
|
||||
}
|
||||
|
||||
impl TreeNodeRewriter for RewriteColumn {
|
||||
type Node = Expr;
|
||||
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
|
||||
let Expr::Column(mut column) = node else {
|
||||
return Ok(Transformed::no(node));
|
||||
};
|
||||
|
||||
column.relation = Some(TableReference::bare(self.table_name.clone()));
|
||||
|
||||
Ok(Transformed::yes(Expr::Column(column)))
|
||||
}
|
||||
}
|
||||
|
||||
time_window_expr = Some(rewritten);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
|
||||
} else {
|
||||
// can't found time window expr, return None
|
||||
Ok((ts_col_name, None, expected_time_unit, df_schema))
|
||||
}
|
||||
}
|
||||
|
||||
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
|
||||
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")`
|
||||
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
|
||||
///
|
||||
/// Time window expr is a expr that:
|
||||
/// 1. ref only to a time index column
|
||||
/// 2. is monotonic increasing
|
||||
/// 3. show up in GROUP BY clause
|
||||
///
|
||||
/// note this plan should only contain one TableScan
|
||||
pub async fn find_plan_time_window_bound(
|
||||
plan: &LogicalPlan,
|
||||
current: Timestamp,
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
|
||||
// TODO(discord9): find the expr that do time window
|
||||
let catalog_man = engine.engine_state().catalog_manager();
|
||||
|
||||
let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
|
||||
find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;
|
||||
// cast current to ts_index's type
|
||||
let new_current = current
|
||||
.convert_to(expected_time_unit)
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
|
||||
})?;
|
||||
|
||||
// if no time_window_expr is found, return None
|
||||
if let Some(time_window_expr) = time_window_expr {
|
||||
let lower_bound =
|
||||
find_expr_time_window_lower_bound(&time_window_expr, &df_schema, new_current)?;
|
||||
let upper_bound =
|
||||
find_expr_time_window_upper_bound(&time_window_expr, &df_schema, new_current)?;
|
||||
Ok((ts_col_name, lower_bound, upper_bound))
|
||||
} else {
|
||||
Ok((ts_col_name, None, None))
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the lower bound of time window in given `expr` and `current` timestamp.
|
||||
///
|
||||
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
|
||||
/// of current time window given the current timestamp
|
||||
///
|
||||
/// if return None, meaning this time window have no lower bound
|
||||
fn find_expr_time_window_lower_bound(
|
||||
expr: &Expr,
|
||||
df_schema: &DFSchema,
|
||||
current: Timestamp,
|
||||
) -> Result<Option<Timestamp>, Error> {
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
let phy_expr: PhysicalExprRef = phy_planner
|
||||
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to create physical expression from {expr:?} using {df_schema:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
|
||||
let input_time_unit = cur_time_window.unit();
|
||||
Ok(cur_time_window.convert_to(input_time_unit))
|
||||
}
|
||||
|
||||
/// Find the upper bound for time window expression
|
||||
fn find_expr_time_window_upper_bound(
|
||||
expr: &Expr,
|
||||
df_schema: &DFSchema,
|
||||
current: Timestamp,
|
||||
) -> Result<Option<Timestamp>, Error> {
|
||||
use std::cmp::Ordering;
|
||||
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
let phy_expr: PhysicalExprRef = phy_planner
|
||||
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to create physical expression from {expr:?} using {df_schema:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
|
||||
|
||||
// search to find the lower bound
|
||||
let mut offset: i64 = 1;
|
||||
let mut lower_bound = Some(current);
|
||||
let upper_bound;
|
||||
// first expontial probe to found a range for binary search
|
||||
loop {
|
||||
let Some(next_val) = current.value().checked_add(offset) else {
|
||||
// no upper bound if overflow
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
|
||||
|
||||
let next_time_window = eval_ts_to_ts(&phy_expr, df_schema, next_time_probe)?;
|
||||
|
||||
match next_time_window.cmp(&cur_time_window) {
|
||||
Ordering::Less => {UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
Ordering::Equal => {
|
||||
lower_bound = Some(next_time_probe);
|
||||
}
|
||||
Ordering::Greater => {
|
||||
upper_bound = Some(next_time_probe);
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
let Some(new_offset) = offset.checked_mul(2) else {
|
||||
// no upper bound if overflow
|
||||
return Ok(None);
|
||||
};
|
||||
offset = new_offset;
|
||||
}
|
||||
|
||||
// binary search for the exact upper bound
|
||||
|
||||
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
|
||||
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
|
||||
});
|
||||
|
||||
let output_unit = upper_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have lower bound",
|
||||
})?
|
||||
.unit();
|
||||
|
||||
let mut low = lower_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have lower bound",
|
||||
})?
|
||||
.value();
|
||||
let mut high = upper_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have upper bound",
|
||||
})?
|
||||
.value();
|
||||
while low < high {
|
||||
let mid = (low + high) / 2;
|
||||
let mid_probe = common_time::Timestamp::new(mid, output_unit);
|
||||
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
|
||||
|
||||
match mid_time_window.cmp(&cur_time_window) {
|
||||
Ordering::Less => UnexpectedSnafu {
|
||||
reason: format!("Binary search failed for time window expression {expr:?}"),
|
||||
}
|
||||
.fail()?,
|
||||
Ordering::Equal => low = mid + 1,
|
||||
Ordering::Greater => high = mid,
|
||||
}
|
||||
}
|
||||
|
||||
let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
|
||||
|
||||
Ok(Some(final_upper_bound_for_time_window))
|
||||
}
|
||||
|
||||
fn eval_ts_to_ts(
|
||||
phy: &PhysicalExprRef,
|
||||
df_schema: &DFSchema,
|
||||
input_value: Timestamp,
|
||||
) -> Result<Timestamp, Error> {
|
||||
let ts_vector = match input_value.unit() {
|
||||
TimeUnit::Second => {
|
||||
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
TimeUnit::Millisecond => {
|
||||
TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
};
|
||||
|
||||
let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
|
||||
.with_context(|_| ArrowSnafu {
|
||||
context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
|
||||
})?;
|
||||
|
||||
let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
|
||||
})?;
|
||||
|
||||
if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
|
||||
Ok(*ts)
|
||||
} else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Expected timestamp in expression {phy:?} but got {:?}",
|
||||
eval_res
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(discord9): a method to found out the precise time window
|
||||
|
||||
/// Find out the `Filter` Node corresponding to outermost `WHERE` and add a new filter expr to it
|
||||
#[derive(Debug)]
|
||||
pub struct AddFilterRewriter {
|
||||
extra_filter: Expr,
|
||||
is_rewritten: bool,
|
||||
}
|
||||
|
||||
impl AddFilterRewriter {
|
||||
fn new(filter: Expr) -> Self {
|
||||
Self {
|
||||
extra_filter: filter,
|
||||
is_rewritten: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TreeNodeRewriter for AddFilterRewriter {
|
||||
type Node = LogicalPlan;
|
||||
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
|
||||
if self.is_rewritten {
|
||||
return Ok(Transformed::no(node));
|
||||
}
|
||||
match node {
|
||||
LogicalPlan::Filter(mut filter) if !filter.having => {
|
||||
filter.predicate = filter.predicate.and(self.extra_filter.clone());
|
||||
self.is_rewritten = true;
|
||||
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
|
||||
}
|
||||
LogicalPlan::TableScan(_) => {
|
||||
// add a new filter
|
||||
let filter =
|
||||
datafusion_expr::Filter::try_new(self.extra_filter.clone(), Arc::new(node))?;
|
||||
self.is_rewritten = true;
|
||||
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
|
||||
}
|
||||
_ => Ok(Transformed::no(node)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
|
||||
let unparser = Unparser::default();
|
||||
let sql = unparser
|
||||
.plan_to_sql(plan)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to unparse logical plan {plan:?}"),
|
||||
})?;
|
||||
Ok(sql.to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use session::context::QueryContext;
|
||||
|
||||
use super::{sql_to_df_plan, *};
|
||||
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
|
||||
use crate::test_utils::create_test_query_engine;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_filter() {
|
||||
let testcases = vec![
|
||||
(
|
||||
"SELECT number FROM numbers_with_ts GROUP BY number","SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (number > 4) GROUP BY numbers_with_ts.number"
|
||||
),
|
||||
(
|
||||
"SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10",
|
||||
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE ((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4)"
|
||||
),
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window",
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE (number > 4) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
)
|
||||
];
|
||||
use datafusion_expr::{col, lit};
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
|
||||
for (before, after) in testcases {
|
||||
let sql = before;
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(col("number").gt(lit(4u32)));
|
||||
let plan = plan.rewrite(&mut add_filter).unwrap().data;
|
||||
let new_sql = df_plan_to_sql(&plan).unwrap();
|
||||
assert_eq!(after, new_sql);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_plan_time_window_lower_bound() {
|
||||
use datafusion_expr::{col, lit};
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
|
||||
let testcases = [
|
||||
// same alias is not same column
|
||||
(
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
|
||||
Timestamp::new(1740394109, TimeUnit::Second),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
|
||||
),
|
||||
// complex time window index
|
||||
(
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(1740394109, TimeUnit::Second),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(1740394080, TimeUnit::Second)),
|
||||
Some(Timestamp::new(1740394140, TimeUnit::Second)),
|
||||
),
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
|
||||
),
|
||||
// no time index
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
("ts".to_string(), None, None),
|
||||
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
|
||||
),
|
||||
// time index
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(23, TimeUnit::Nanosecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// on spot
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(0, TimeUnit::Nanosecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// different time unit
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(23_000_000, TimeUnit::Nanosecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// time index with other fields
|
||||
(
|
||||
"SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// time index with other pks
|
||||
(
|
||||
"SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
|
||||
),
|
||||
// subquery
|
||||
(
|
||||
"SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
|
||||
),
|
||||
// cte
|
||||
(
|
||||
"with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
|
||||
),
|
||||
// complex subquery without alias
|
||||
(
|
||||
"SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
|
||||
),
|
||||
// complex subquery alias
|
||||
(
|
||||
"SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
|
||||
),
|
||||
];
|
||||
|
||||
for (sql, current, expected, expected_unparsed) in testcases {
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let real =
|
||||
find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(expected, real);
|
||||
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
|
||||
.await
|
||||
.unwrap();
|
||||
let (col_name, lower, upper) = real;
|
||||
let new_sql = if lower.is_some() {
|
||||
let to_df_literal = |value| {
|
||||
let value = Value::from(value);
|
||||
|
||||
value.try_to_scalar_value(&value.data_type()).unwrap()
|
||||
};
|
||||
let lower = to_df_literal(lower.unwrap());
|
||||
let upper = to_df_literal(upper.unwrap());
|
||||
let expr = col(&col_name)
|
||||
.gt_eq(lit(lower))
|
||||
.and(col(&col_name).lt_eq(lit(upper)));
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
let plan = plan.rewrite(&mut add_filter).unwrap().data;
|
||||
df_plan_to_sql(&plan).unwrap()
|
||||
} else {
|
||||
sql.to_string()
|
||||
};
|
||||
assert_eq!(expected_unparsed, new_sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
773
src/flow/src/recording_rules/engine.rs
Normal file
773
src/flow/src/recording_rules/engine.rs
Normal file
@@ -0,0 +1,773 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use api::v1::flow::FlowResponse;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
use common_meta::key::flow::FlowMetadataManagerRef;
|
||||
use common_meta::key::table_info::TableInfoManager;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_telemetry::tracing::warn;
|
||||
use common_telemetry::{debug, info};
|
||||
use common_time::Timestamp;
|
||||
use datafusion::sql::unparser::expr_to_sql;
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use datatypes::value::Value;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use super::frontend_client::FrontendClient;
|
||||
use super::{df_plan_to_sql, AddFilterRewriter, TimeWindowExpr};
|
||||
use crate::adapter::{CreateFlowArgs, FlowId, TableName};
|
||||
use crate::error::{
|
||||
DatafusionSnafu, DatatypesSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu,
|
||||
TimeSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
|
||||
use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan};
|
||||
use crate::Error;
|
||||
|
||||
/// TODO(discord9): make those constants configurable
|
||||
/// The default rule engine query timeout is 10 minutes
|
||||
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running
|
||||
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
|
||||
|
||||
/// TODO(discord9): determine how to configure refresh rate
|
||||
pub struct RecordingRuleEngine {
|
||||
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
|
||||
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
engine: QueryEngineRef,
|
||||
}
|
||||
|
||||
impl RecordingRuleEngine {
|
||||
pub fn new(
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
engine: QueryEngineRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
tasks: Default::default(),
|
||||
shutdown_txs: Default::default(),
|
||||
frontend_client,
|
||||
flow_metadata_manager,
|
||||
table_meta,
|
||||
engine,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_inserts(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<FlowResponse, Error> {
|
||||
let table_info_mgr = self.table_meta.table_info_manager();
|
||||
let mut group_by_table_name: HashMap<TableName, Vec<api::v1::Rows>> = HashMap::new();
|
||||
for r in request.requests {
|
||||
let tid = RegionId::from(r.region_id).table_id();
|
||||
let name = get_table_name(table_info_mgr, &tid).await?;
|
||||
let entry = group_by_table_name.entry(name).or_default();
|
||||
if let Some(rows) = r.rows {
|
||||
entry.push(rows);
|
||||
}
|
||||
}
|
||||
|
||||
for (_flow_id, task) in self.tasks.read().await.iter() {
|
||||
let src_table_names = &task.source_table_names;
|
||||
|
||||
for src_table_name in src_table_names {
|
||||
if let Some(entry) = group_by_table_name.get(src_table_name) {
|
||||
let Some(expr) = &task.time_window_expr else {
|
||||
continue;
|
||||
};
|
||||
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
|
||||
let mut state = task.state.write().await;
|
||||
state
|
||||
.dirty_time_windows
|
||||
.add_lower_bounds(involved_time_windows.into_iter());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_table_name(zelf: &TableInfoManager, table_id: &TableId) -> Result<TableName, Error> {
|
||||
zelf.get(*table_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Table id = {:?}, couldn't found table name", table_id),
|
||||
})
|
||||
.map(|name| name.table_name())
|
||||
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
|
||||
}
|
||||
|
||||
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
|
||||
|
||||
impl RecordingRuleEngine {
|
||||
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: _,
|
||||
sql,
|
||||
flow_options,
|
||||
query_ctx,
|
||||
} = args;
|
||||
|
||||
// or replace logic
|
||||
{
|
||||
let is_exist = self.tasks.read().await.contains_key(&flow_id);
|
||||
match (create_if_not_exists, or_replace, is_exist) {
|
||||
// if replace, ignore that old flow exists
|
||||
(_, true, true) => {
|
||||
info!("Replacing flow with id={}", flow_id);
|
||||
}
|
||||
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
|
||||
// already exists, and not replace, return None
|
||||
(true, false, true) => {
|
||||
info!("Flow with id={} already exists, do nothing", flow_id);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// continue as normal
|
||||
(_, _, false) => (),
|
||||
}
|
||||
}
|
||||
|
||||
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
|
||||
|
||||
ensure!(
|
||||
flow_type == Some(&FlowType::RecordingRule.to_string()) || flow_type.is_none(),
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Flow type is not RecordingRule nor None, got {flow_type:?}")
|
||||
}
|
||||
);
|
||||
|
||||
let Some(query_ctx) = query_ctx else {
|
||||
UnexpectedSnafu {
|
||||
reason: "Query context is None".to_string(),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
let mut source_table_names = Vec::new();
|
||||
for src_id in source_table_ids {
|
||||
let table_name = self
|
||||
.table_meta
|
||||
.table_info_manager()
|
||||
.get(src_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Table id = {:?}, couldn't found table name", src_id),
|
||||
})
|
||||
.map(|name| name.table_name())
|
||||
.map(|name| [name.catalog_name, name.schema_name, name.table_name])?;
|
||||
source_table_names.push(table_name);
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), self.engine.clone(), &sql, true).await?;
|
||||
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
|
||||
&plan,
|
||||
self.engine.engine_state().catalog_manager().clone(),
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let phy_expr = time_window_expr
|
||||
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
|
||||
.transpose()?;
|
||||
|
||||
let task = RecordingRuleTask::new(
|
||||
flow_id,
|
||||
&sql,
|
||||
phy_expr,
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
source_table_names,
|
||||
query_ctx,
|
||||
rx,
|
||||
);
|
||||
|
||||
let task_inner = task.clone();
|
||||
let engine = self.engine.clone();
|
||||
let frontend = self.frontend_client.clone();
|
||||
|
||||
// TODO(discord9): also save handle & use time wheel or what for better
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
match task_inner.start_executing(engine, frontend).await {
|
||||
Ok(()) => info!("Flow {} shutdown", task_inner.flow_id),
|
||||
Err(err) => common_telemetry::error!(
|
||||
"Flow {} encounter unrecoverable error: {err:?}",
|
||||
task_inner.flow_id
|
||||
),
|
||||
}
|
||||
});
|
||||
|
||||
// TODO(discord9): deal with replace logic
|
||||
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
|
||||
drop(replaced_old_task_opt);
|
||||
|
||||
self.shutdown_txs.write().await.insert(flow_id, tx);
|
||||
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
if self.tasks.write().await.remove(&flow_id).is_none() {
|
||||
warn!("Flow {flow_id} not found in tasks")
|
||||
}
|
||||
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Can't found shutdown tx for flow {flow_id}"),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
if tx.send(()).is_err() {
|
||||
warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RecordingRuleTask {
|
||||
flow_id: FlowId,
|
||||
query: String,
|
||||
time_window_expr: Option<TimeWindowExpr>,
|
||||
/// in seconds
|
||||
expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
source_table_names: HashSet<[String; 3]>,
|
||||
state: Arc<RwLock<RecordingRuleState>>,
|
||||
}
|
||||
|
||||
impl RecordingRuleTask {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
flow_id: FlowId,
|
||||
query: &str,
|
||||
time_window_expr: Option<TimeWindowExpr>,
|
||||
expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
source_table_names: Vec<[String; 3]>,
|
||||
query_ctx: QueryContextRef,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
flow_id,
|
||||
query: query.to_string(),
|
||||
time_window_expr,
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
source_table_names: source_table_names.into_iter().collect(),
|
||||
state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl RecordingRuleTask {
|
||||
/// This should be called in a new tokio task
|
||||
pub async fn start_executing(
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) -> Result<(), Error> {
|
||||
// only first query don't need upper bound
|
||||
let mut is_first = true;
|
||||
|
||||
loop {
|
||||
// FIXME(discord9): test if need upper bound also works
|
||||
let new_query = self.gen_query_with_time_window(engine.clone()).await?;
|
||||
|
||||
let insert_into = if let Some(new_query) = new_query {
|
||||
format!(
|
||||
"INSERT INTO {}.{}.{} {}",
|
||||
self.sink_table_name[0],
|
||||
self.sink_table_name[1],
|
||||
self.sink_table_name[2],
|
||||
new_query
|
||||
)
|
||||
} else {
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
if is_first {
|
||||
is_first = false;
|
||||
}
|
||||
|
||||
let instant = Instant::now();
|
||||
let flow_id = self.flow_id;
|
||||
let db_client = frontend_client.get_database_client().await?;
|
||||
let peer_addr = db_client.peer.addr;
|
||||
debug!(
|
||||
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
|
||||
self.expire_after, peer_addr, &insert_into
|
||||
);
|
||||
|
||||
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.start_timer();
|
||||
|
||||
let res = db_client.database.sql(&insert_into).await;
|
||||
drop(timer);
|
||||
|
||||
let elapsed = instant.elapsed();
|
||||
if let Ok(res1) = &res {
|
||||
debug!(
|
||||
"Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
|
||||
elapsed
|
||||
);
|
||||
} else if let Err(res) = &res {
|
||||
warn!(
|
||||
"Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
|
||||
peer_addr, elapsed, &insert_into
|
||||
);
|
||||
}
|
||||
|
||||
// record slow query
|
||||
if elapsed >= SLOW_QUERY_THRESHOLD {
|
||||
warn!(
|
||||
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
|
||||
peer_addr, elapsed, &insert_into
|
||||
);
|
||||
METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
|
||||
.with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
self.state
|
||||
.write()
|
||||
.await
|
||||
.after_query_exec(elapsed, res.is_ok());
|
||||
|
||||
let sleep_until = {
|
||||
let mut state = self.state.write().await;
|
||||
match state.shutdown_rx.try_recv() {
|
||||
Ok(()) => break Ok(()),
|
||||
Err(TryRecvError::Closed) => {
|
||||
warn!("Unexpected shutdown flow {flow_id}, shutdown anyway");
|
||||
break Ok(());
|
||||
}
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
state.get_next_start_query_time(None)
|
||||
};
|
||||
tokio::time::sleep_until(sleep_until).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// will merge and use the first ten time window in query
|
||||
async fn gen_query_with_time_window(
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
) -> Result<Option<String>, Error> {
|
||||
let query_ctx = self.state.read().await.query_ctx.clone();
|
||||
let start = SystemTime::now();
|
||||
let since_the_epoch = start
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards");
|
||||
let low_bound = self
|
||||
.expire_after
|
||||
.map(|e| since_the_epoch.as_secs() - e as u64)
|
||||
.unwrap_or(u64::MIN);
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
|
||||
|
||||
// TODO(discord9): find more time window!
|
||||
let (col_name, lower, upper) =
|
||||
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
|
||||
.await?;
|
||||
|
||||
let new_sql = {
|
||||
let expr = {
|
||||
match (lower, upper) {
|
||||
(Some(l), Some(u)) => {
|
||||
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't get window size from {u:?} - {l:?}"),
|
||||
})?;
|
||||
|
||||
self.state
|
||||
.write()
|
||||
.await
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(&col_name, lower, window_size)?
|
||||
}
|
||||
_ => {
|
||||
warn!(
|
||||
"Flow id = {:?}, can't get window size: lower={lower:?}, upper={upper:?}, using the same query", self.flow_id
|
||||
);
|
||||
// since no time window lower/upper bound is found, just return the original query
|
||||
return Ok(Some(self.query.clone()));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.flow_id,
|
||||
expr.as_ref()
|
||||
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
}))
|
||||
.transpose()?
|
||||
.map(|s| s.to_string())
|
||||
);
|
||||
|
||||
let Some(expr) = expr else {
|
||||
// no new data, hence no need to update
|
||||
debug!("Flow id={:?}, no new data, not update", self.flow_id);
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
// make a not optimized plan for clearer unparse
|
||||
let plan =
|
||||
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, false).await?;
|
||||
let plan = plan
|
||||
.clone()
|
||||
.rewrite(&mut add_filter)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to rewrite plan {plan:?}"),
|
||||
})?
|
||||
.data;
|
||||
df_plan_to_sql(&plan)?
|
||||
};
|
||||
|
||||
Ok(Some(new_sql))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RecordingRuleState {
|
||||
query_ctx: QueryContextRef,
|
||||
/// last query complete time
|
||||
last_update_time: Instant,
|
||||
/// last time query duration
|
||||
last_query_duration: Duration,
|
||||
/// Dirty Time windows need to be updated
|
||||
/// mapping of `start -> end` and non-overlapping
|
||||
dirty_time_windows: DirtyTimeWindows,
|
||||
exec_state: ExecState,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct DirtyTimeWindows {
|
||||
windows: BTreeMap<Timestamp, Option<Timestamp>>,
|
||||
}
|
||||
|
||||
fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
|
||||
let value = Value::from(value);
|
||||
let value = value
|
||||
.try_to_scalar_value(&value.data_type())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert to scalar value: {}", value),
|
||||
})?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
impl DirtyTimeWindows {
|
||||
/// Time window merge distance
|
||||
const MERGE_DIST: i32 = 3;
|
||||
|
||||
/// Maximum number of filters allowed in a single query
|
||||
const MAX_FILTER_NUM: usize = 20;
|
||||
|
||||
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `lower_bounds` - An iterator of lower bounds to be added.
|
||||
pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
|
||||
for lower_bound in lower_bounds {
|
||||
let entry = self.windows.entry(lower_bound);
|
||||
entry.or_insert(None);
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate all filter expressions consuming all time windows
|
||||
pub fn gen_filter_exprs(
|
||||
&mut self,
|
||||
col_name: &str,
|
||||
expire_lower_bound: Option<Timestamp>,
|
||||
window_size: chrono::Duration,
|
||||
) -> Result<Option<datafusion_expr::Expr>, Error> {
|
||||
debug!(
|
||||
"expire_lower_bound: {:?}, window_size: {:?}",
|
||||
expire_lower_bound.map(|t| t.to_iso8601_string()),
|
||||
window_size
|
||||
);
|
||||
self.merge_dirty_time_windows(window_size)?;
|
||||
|
||||
if self.windows.len() > Self::MAX_FILTER_NUM {
|
||||
warn!(
|
||||
"Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong",
|
||||
self.windows.len(),
|
||||
Self::MAX_FILTER_NUM
|
||||
);
|
||||
}
|
||||
|
||||
// get the first `MAX_FILTER_NUM` time windows
|
||||
let nth = self
|
||||
.windows
|
||||
.iter()
|
||||
.nth(Self::MAX_FILTER_NUM)
|
||||
.map(|(key, _)| *key);
|
||||
let first_nth = {
|
||||
if let Some(nth) = nth {
|
||||
let mut after = self.windows.split_off(&nth);
|
||||
std::mem::swap(&mut self.windows, &mut after);
|
||||
|
||||
after
|
||||
} else {
|
||||
std::mem::take(&mut self.windows)
|
||||
}
|
||||
};
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in first_nth.into_iter() {
|
||||
debug!(
|
||||
"Time window start: {:?}, end: {:?}",
|
||||
start.to_iso8601_string(),
|
||||
end.map(|t| t.to_iso8601_string())
|
||||
);
|
||||
let start = if let Some(expire) = expire_lower_bound {
|
||||
start.max(expire)
|
||||
} else {
|
||||
start
|
||||
};
|
||||
|
||||
// filter out expired time window
|
||||
if let Some(end) = end {
|
||||
if end <= start {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
use datafusion_expr::{col, lit};
|
||||
let lower = to_df_literal(start)?;
|
||||
let upper = end.map(to_df_literal).transpose()?;
|
||||
let expr = if let Some(upper) = upper {
|
||||
col(col_name)
|
||||
.gt_eq(lit(lower))
|
||||
.and(col(col_name).lt(lit(upper)))
|
||||
} else {
|
||||
col(col_name).gt_eq(lit(lower))
|
||||
};
|
||||
expr_lst.push(expr);
|
||||
}
|
||||
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
|
||||
Ok(expr)
|
||||
}
|
||||
|
||||
/// Merge time windows that overlaps or get too close
|
||||
pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> {
|
||||
let mut new_windows = BTreeMap::new();
|
||||
|
||||
let mut prev_tw = None;
|
||||
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
|
||||
let Some(prev_tw) = &mut prev_tw else {
|
||||
prev_tw = Some((lower_bound, upper_bound));
|
||||
continue;
|
||||
};
|
||||
|
||||
let std_window_size = window_size.to_std().map_err(|e| {
|
||||
InternalSnafu {
|
||||
reason: e.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
// if cur.lower - prev.upper <= window_size * 2, merge
|
||||
let prev_upper = prev_tw
|
||||
.1
|
||||
.unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
|
||||
prev_tw.1 = Some(prev_upper);
|
||||
|
||||
let cur_upper = upper_bound.unwrap_or(
|
||||
lower_bound
|
||||
.add_duration(std_window_size)
|
||||
.context(TimeSnafu)?,
|
||||
);
|
||||
|
||||
if lower_bound
|
||||
.sub(&prev_upper)
|
||||
.map(|dist| dist <= window_size * Self::MERGE_DIST)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
prev_tw.1 = Some(cur_upper);
|
||||
} else {
|
||||
new_windows.insert(prev_tw.0, prev_tw.1);
|
||||
*prev_tw = (lower_bound, Some(cur_upper));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prev_tw) = prev_tw {
|
||||
new_windows.insert(prev_tw.0, prev_tw.1);
|
||||
}
|
||||
|
||||
self.windows = new_windows;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordingRuleState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
Self {
|
||||
query_ctx,
|
||||
last_update_time: Instant::now(),
|
||||
last_query_duration: Duration::from_secs(0),
|
||||
dirty_time_windows: Default::default(),
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
}
|
||||
}
|
||||
|
||||
/// called after last query is done
|
||||
/// `is_succ` indicate whether the last query is successful
|
||||
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
|
||||
self.exec_state = ExecState::Idle;
|
||||
self.last_query_duration = elapsed;
|
||||
self.last_update_time = Instant::now();
|
||||
}
|
||||
|
||||
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
|
||||
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration);
|
||||
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
|
||||
|
||||
self.last_update_time + next_duration
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum ExecState {
|
||||
Idle,
|
||||
Executing,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_merge_dirty_time_windows() {
|
||||
let mut dirty = DirtyTimeWindows::default();
|
||||
dirty.add_lower_bounds(
|
||||
vec![
|
||||
Timestamp::new_second(0),
|
||||
Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
|
||||
]
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(
|
||||
dirty.windows,
|
||||
BTreeMap::from([(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
(2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
|
||||
))
|
||||
)])
|
||||
);
|
||||
|
||||
// separate time window
|
||||
let mut dirty = DirtyTimeWindows::default();
|
||||
dirty.add_lower_bounds(
|
||||
vec![
|
||||
Timestamp::new_second(0),
|
||||
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
|
||||
]
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(5 * 60))
|
||||
),
|
||||
(
|
||||
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
|
||||
Some(Timestamp::new_second(
|
||||
(3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
|
||||
))
|
||||
)
|
||||
]),
|
||||
dirty.windows
|
||||
);
|
||||
|
||||
// overlapping
|
||||
let mut dirty = DirtyTimeWindows::default();
|
||||
dirty.add_lower_bounds(
|
||||
vec![
|
||||
Timestamp::new_second(0),
|
||||
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
|
||||
]
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(
|
||||
BTreeMap::from([(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
(1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
|
||||
))
|
||||
),]),
|
||||
dirty.windows
|
||||
);
|
||||
}
|
||||
}
|
||||
150
src/flow/src/recording_rules/frontend_client.rs
Normal file
150
src/flow/src/recording_rules/frontend_client.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use meta_client::client::MetaClient;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
|
||||
use crate::Error;
|
||||
|
||||
fn default_channel_mgr() -> ChannelManager {
|
||||
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
|
||||
ChannelManager::with_config(cfg)
|
||||
}
|
||||
|
||||
fn client_from_urls(addrs: Vec<String>) -> Client {
|
||||
Client::with_manager_and_urls(default_channel_mgr(), addrs)
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
#[derive(Debug)]
|
||||
pub enum FrontendClient {
|
||||
Distributed {
|
||||
meta_client: Arc<MetaClient>,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
|
||||
/// TODO(discord9): not use grpc under standalone mode
|
||||
database_client: DatabaseWithPeer,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseWithPeer {
|
||||
pub database: Database,
|
||||
pub peer: Peer,
|
||||
}
|
||||
|
||||
impl DatabaseWithPeer {
|
||||
fn new(database: Database, peer: Peer) -> Self {
|
||||
Self { database, peer }
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
|
||||
Self::Distributed { meta_client }
|
||||
}
|
||||
|
||||
pub fn from_static_grpc_addr(addr: String) -> Self {
|
||||
let peer = Peer {
|
||||
id: 0,
|
||||
addr: addr.clone(),
|
||||
};
|
||||
|
||||
let client = client_from_urls(vec![addr]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Self::Standalone {
|
||||
database_client: DatabaseWithPeer::new(database, peer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
|
||||
let Self::Distributed { meta_client, .. } = self else {
|
||||
return Ok(vec![]);
|
||||
};
|
||||
let cluster_client = meta_client
|
||||
.cluster_client()
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let cluster_id = meta_client.id().0;
|
||||
let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
|
||||
let req = RangeRequest::new().with_prefix(prefix);
|
||||
let resp = cluster_client
|
||||
.range(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let mut res = Vec::with_capacity(resp.kvs.len());
|
||||
for kv in resp.kvs {
|
||||
let key = NodeInfoKey::try_from(kv.key)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let val = NodeInfo::try_from(kv.value)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
res.push((key, val));
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Get the database with max `last_activity_ts`
|
||||
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
if let Self::Standalone { database_client } = self {
|
||||
return Ok(database_client.clone());
|
||||
}
|
||||
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut last_activity_ts = i64::MIN;
|
||||
let mut peer = None;
|
||||
for (_key, val) in frontends.iter() {
|
||||
if val.last_activity_ts > last_activity_ts {
|
||||
last_activity_ts = val.last_activity_ts;
|
||||
peer = Some(val.peer.clone());
|
||||
}
|
||||
}
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client = client_from_urls(vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
|
||||
/// Get a database client, and possibly update it before returning.
|
||||
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
match self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
|
||||
use crate::recording_rules::{FrontendClient, RecordingRuleEngine};
|
||||
use crate::transform::register_function_to_query_engine;
|
||||
use crate::utils::{SizeReportSender, StateReportHandler};
|
||||
use crate::{Error, FlowWorkerManager, FlownodeOptions};
|
||||
@@ -245,6 +246,7 @@ impl FlownodeInstance {
|
||||
self.server.shutdown().await.context(ShutdownServerSnafu)?;
|
||||
|
||||
if let Some(task) = &self.heartbeat_task {
|
||||
info!("Close heartbeat task for flownode");
|
||||
task.shutdown();
|
||||
}
|
||||
|
||||
@@ -271,6 +273,8 @@ pub struct FlownodeBuilder {
|
||||
heartbeat_task: Option<HeartbeatTask>,
|
||||
/// receive a oneshot sender to send state size report
|
||||
state_report_handler: Option<StateReportHandler>,
|
||||
/// Client to send sql to frontend
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
}
|
||||
|
||||
impl FlownodeBuilder {
|
||||
@@ -281,6 +285,7 @@ impl FlownodeBuilder {
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
@@ -290,6 +295,7 @@ impl FlownodeBuilder {
|
||||
flow_metadata_manager,
|
||||
heartbeat_task: None,
|
||||
state_report_handler: None,
|
||||
frontend_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -447,7 +453,14 @@ impl FlownodeBuilder {
|
||||
|
||||
let node_id = self.opts.node_id.map(|id| id as u32);
|
||||
|
||||
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
|
||||
let rule_engine = RecordingRuleEngine::new(
|
||||
self.frontend_client.clone(),
|
||||
query_engine.clone(),
|
||||
self.flow_metadata_manager.clone(),
|
||||
table_meta.clone(),
|
||||
);
|
||||
|
||||
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
|
||||
for worker_id in 0..num_workers {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
|
||||
@@ -86,7 +86,8 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
|
||||
|
||||
let schema = vec![
|
||||
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
|
||||
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false),
|
||||
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
|
||||
.with_time_index(true),
|
||||
];
|
||||
let mut columns = vec![];
|
||||
let numbers = (1..=10).collect_vec();
|
||||
|
||||
@@ -112,6 +112,7 @@ impl MetaClientBuilder {
|
||||
.enable_store()
|
||||
.enable_heartbeat()
|
||||
.enable_procedure()
|
||||
.enable_access_cluster_info()
|
||||
}
|
||||
|
||||
pub fn enable_heartbeat(self) -> Self {
|
||||
|
||||
@@ -68,6 +68,7 @@ pub struct Inserter {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
#[allow(unused)]
|
||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
}
|
||||
|
||||
@@ -338,6 +339,8 @@ impl Inserter {
|
||||
instant_requests,
|
||||
} = requests;
|
||||
|
||||
// TODO(discord9): mirror some
|
||||
|
||||
// Mirror requests for source table to flownode asynchronously
|
||||
let flow_mirror_task = FlowMirrorTask::new(
|
||||
&self.table_flownode_set_cache,
|
||||
@@ -817,12 +820,14 @@ struct CreateAlterTableResult {
|
||||
table_infos: HashMap<TableId, Arc<TableInfo>>,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
struct FlowMirrorTask {
|
||||
requests: HashMap<Peer, RegionInsertRequests>,
|
||||
num_rows: usize,
|
||||
}
|
||||
|
||||
impl FlowMirrorTask {
|
||||
#[allow(unused)]
|
||||
async fn new(
|
||||
cache: &TableFlownodeSetCacheRef,
|
||||
requests: impl Iterator<Item = &RegionInsertRequest>,
|
||||
@@ -896,6 +901,7 @@ impl FlowMirrorTask {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
|
||||
crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
|
||||
for (peer, inserts) in self.requests {
|
||||
|
||||
@@ -40,7 +40,7 @@ use common_procedure::options::ProcedureConfig;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use flow::FlownodeBuilder;
|
||||
use flow::{FlownodeBuilder, FrontendClient};
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
@@ -164,12 +164,15 @@ impl GreptimeDbStandaloneBuilder {
|
||||
Some(procedure_manager.clone()),
|
||||
);
|
||||
|
||||
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
|
||||
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
Default::default(),
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
Arc::new(frontend_client),
|
||||
);
|
||||
let flownode = Arc::new(flow_builder.build().await.unwrap());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user