mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
4 Commits
feature/df
...
recording_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08161812ae | ||
|
|
33d1ba242f | ||
|
|
40ce94f3bf | ||
|
|
4628119a42 |
@@ -16,7 +16,6 @@
|
||||
|
||||
mod client;
|
||||
pub mod client_manager;
|
||||
#[cfg(feature = "testing")]
|
||||
mod database;
|
||||
pub mod error;
|
||||
pub mod flow;
|
||||
@@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
|
||||
use snafu::OptionExt;
|
||||
|
||||
pub use self::client::Client;
|
||||
#[cfg(feature = "testing")]
|
||||
pub use self::database::Database;
|
||||
pub use self::error::{Error, Result};
|
||||
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};
|
||||
|
||||
@@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_version::{short_version, version};
|
||||
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
|
||||
use flow::{FlownodeBuilder, FlownodeInstance, FrontendClient, FrontendInvoker};
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::Mode;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -317,6 +317,8 @@ impl StartCommand {
|
||||
Arc::new(executor),
|
||||
);
|
||||
|
||||
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
|
||||
let flownode_builder = FlownodeBuilder::new(
|
||||
opts,
|
||||
@@ -324,6 +326,7 @@ impl StartCommand {
|
||||
table_metadata_manager,
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager,
|
||||
Arc::new(frontend_client),
|
||||
)
|
||||
.with_heartbeat_task(heartbeat_task);
|
||||
|
||||
|
||||
@@ -54,7 +54,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
|
||||
use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use datanode::region_server::RegionServer;
|
||||
use file_engine::config::EngineConfig as FileEngineConfig;
|
||||
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
|
||||
use flow::{
|
||||
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendClient,
|
||||
FrontendInvoker,
|
||||
};
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
|
||||
@@ -533,12 +536,16 @@ impl StartCommand {
|
||||
flow: opts.flow.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
|
||||
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
flownode_options,
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
Arc::new(frontend_client),
|
||||
);
|
||||
let flownode = Arc::new(
|
||||
flow_builder
|
||||
|
||||
@@ -343,6 +343,7 @@ pub enum FlowType {
|
||||
impl FlowType {
|
||||
pub const RECORDING_RULE: &str = "recording_rule";
|
||||
pub const STREAMING: &str = "streaming";
|
||||
pub const FLOW_TYPE_KEY: &str = "flow_type";
|
||||
}
|
||||
|
||||
impl Default for FlowType {
|
||||
@@ -398,7 +399,8 @@ impl From<&CreateFlowData> for CreateRequest {
|
||||
};
|
||||
|
||||
let flow_type = value.flow_type.unwrap_or_default().to_string();
|
||||
req.flow_options.insert("flow_type".to_string(), flow_type);
|
||||
req.flow_options
|
||||
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
|
||||
req
|
||||
}
|
||||
}
|
||||
@@ -430,7 +432,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let flow_type = value.flow_type.unwrap_or_default().to_string();
|
||||
options.insert("flow_type".to_string(), flow_type);
|
||||
options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
|
||||
|
||||
let flow_info = FlowInfoValue {
|
||||
source_table_ids: value.source_table_ids.clone(),
|
||||
|
||||
@@ -49,12 +49,13 @@ pub(crate) use crate::adapter::node_context::FlownodeContext;
|
||||
use crate::adapter::refill::RefillTask;
|
||||
use crate::adapter::table_source::ManagedTableSource;
|
||||
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
|
||||
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
|
||||
pub(crate) use crate::adapter::worker::{create_worker, WorkerHandle};
|
||||
use crate::compute::ErrCollector;
|
||||
use crate::df_optimizer::sql_to_flow_plan;
|
||||
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
|
||||
use crate::expr::Batch;
|
||||
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
|
||||
use crate::recording_rules::RecordingRuleEngine;
|
||||
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
|
||||
|
||||
mod flownode_impl;
|
||||
@@ -171,6 +172,8 @@ pub struct FlowWorkerManager {
|
||||
flush_lock: RwLock<()>,
|
||||
/// receive a oneshot sender to send state size report
|
||||
state_report_handler: RwLock<Option<StateReportHandler>>,
|
||||
/// engine for recording rule
|
||||
rule_engine: RecordingRuleEngine,
|
||||
}
|
||||
|
||||
/// Building FlownodeManager
|
||||
@@ -185,6 +188,7 @@ impl FlowWorkerManager {
|
||||
node_id: Option<u32>,
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
rule_engine: RecordingRuleEngine,
|
||||
) -> Self {
|
||||
let srv_map = ManagedTableSource::new(
|
||||
table_meta.table_info_manager().clone(),
|
||||
@@ -207,6 +211,7 @@ impl FlowWorkerManager {
|
||||
node_id,
|
||||
flush_lock: RwLock::new(()),
|
||||
state_report_handler: RwLock::new(None),
|
||||
rule_engine,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,25 +220,6 @@ impl FlowWorkerManager {
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a flownode manager with one worker
|
||||
pub fn new_with_workers<'s>(
|
||||
node_id: Option<u32>,
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
num_workers: usize,
|
||||
) -> (Self, Vec<Worker<'s>>) {
|
||||
let mut zelf = Self::new(node_id, query_engine, table_meta);
|
||||
|
||||
let workers: Vec<_> = (0..num_workers)
|
||||
.map(|_| {
|
||||
let (handle, worker) = create_worker();
|
||||
zelf.add_worker_handle(handle);
|
||||
worker
|
||||
})
|
||||
.collect();
|
||||
(zelf, workers)
|
||||
}
|
||||
|
||||
/// add a worker handler to manager, meaning this corresponding worker is under it's manage
|
||||
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
|
||||
self.worker_handles.push(handle);
|
||||
@@ -751,7 +737,11 @@ pub struct CreateFlowArgs {
|
||||
/// Create&Remove flow
|
||||
impl FlowWorkerManager {
|
||||
/// remove a flow by it's id
|
||||
#[allow(unreachable_code)]
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
// TODO(discord9): reroute some back to streaming engine later
|
||||
return self.rule_engine.remove_flow(flow_id).await;
|
||||
|
||||
for handle in self.worker_handles.iter() {
|
||||
if handle.contains_flow(flow_id).await? {
|
||||
handle.remove_flow(flow_id).await?;
|
||||
@@ -767,8 +757,10 @@ impl FlowWorkerManager {
|
||||
/// steps to create task:
|
||||
/// 1. parse query into typed plan(and optional parse expire_after expr)
|
||||
/// 2. render source/sink with output table id and used input table id
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[allow(clippy::too_many_arguments, unreachable_code)]
|
||||
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
// TODO(discord9): reroute some back to streaming engine later
|
||||
return self.rule_engine.create_flow(args).await;
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
|
||||
@@ -153,7 +153,10 @@ impl Flownode for FlowWorkerManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unreachable_code, unused)]
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
|
||||
return Ok(Default::default());
|
||||
|
||||
// using try_read to ensure two things:
|
||||
// 1. flush wouldn't happen until inserts before it is inserted
|
||||
// 2. inserts happening concurrently with flush wouldn't be block by flush
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use arrow_schema::ArrowError;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
|
||||
use common_macro::stack_trace_debug;
|
||||
@@ -156,6 +157,15 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
|
||||
Arrow {
|
||||
#[snafu(source)]
|
||||
raw: ArrowError,
|
||||
context: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
|
||||
Datafusion {
|
||||
#[snafu(source)]
|
||||
@@ -230,6 +240,7 @@ impl ErrorExt for Error {
|
||||
match self {
|
||||
Self::Eval { .. }
|
||||
| Self::JoinTask { .. }
|
||||
| Self::Arrow { .. }
|
||||
| Self::Datafusion { .. }
|
||||
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
|
||||
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
|
||||
|
||||
@@ -238,6 +238,7 @@ mod test {
|
||||
|
||||
for (sql, current, expected) in &testcases {
|
||||
let plan = sql_to_substrait(engine.clone(), sql).await;
|
||||
|
||||
let mut ctx = create_test_ctx();
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
|
||||
.await
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
//! Send heartbeat from flownode to metasrv
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Peer};
|
||||
@@ -25,7 +24,7 @@ use common_meta::heartbeat::handler::{
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use common_telemetry::{debug, error, info};
|
||||
use greptime_proto::v1::meta::NodeInfo;
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
@@ -66,7 +65,6 @@ pub struct HeartbeatTask {
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
running: Arc<AtomicBool>,
|
||||
query_stat_size: Option<SizeReportSender>,
|
||||
}
|
||||
|
||||
@@ -89,20 +87,11 @@ impl HeartbeatTask {
|
||||
report_interval: heartbeat_opts.interval,
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
query_stat_size: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<(), Error> {
|
||||
if self
|
||||
.running
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_err()
|
||||
{
|
||||
warn!("Heartbeat task started multiple times");
|
||||
return Ok(());
|
||||
}
|
||||
info!("Start to establish the heartbeat connection to metasrv.");
|
||||
let (req_sender, resp_stream) = self
|
||||
.meta_client
|
||||
@@ -125,13 +114,6 @@ impl HeartbeatTask {
|
||||
|
||||
pub fn shutdown(&self) {
|
||||
info!("Close heartbeat task for flownode");
|
||||
if self
|
||||
.running
|
||||
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_err()
|
||||
{
|
||||
warn!("Call close heartbeat task multiple times");
|
||||
}
|
||||
}
|
||||
|
||||
fn new_heartbeat_request(
|
||||
@@ -231,6 +213,8 @@ impl HeartbeatTask {
|
||||
// set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
|
||||
latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
|
||||
}
|
||||
|
||||
info!("flownode heartbeat task stopped.");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ mod expr;
|
||||
pub mod heartbeat;
|
||||
mod metrics;
|
||||
mod plan;
|
||||
mod recording_rules;
|
||||
mod repr;
|
||||
mod server;
|
||||
mod transform;
|
||||
@@ -43,4 +44,5 @@ mod test_utils;
|
||||
|
||||
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
|
||||
pub use error::{Error, Result};
|
||||
pub use recording_rules::FrontendClient;
|
||||
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};
|
||||
|
||||
@@ -28,6 +28,32 @@ lazy_static! {
|
||||
&["table_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RULE_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_rule_engine_query_time",
|
||||
"flow rule engine query time",
|
||||
&["flow_id"],
|
||||
vec![
|
||||
0.0,
|
||||
1.,
|
||||
3.,
|
||||
5.,
|
||||
10.,
|
||||
20.,
|
||||
30.,
|
||||
60.,
|
||||
2. * 60.,
|
||||
5. * 60.,
|
||||
10. * 60.
|
||||
]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_rule_engine_slow_query",
|
||||
"flow rule engine slow query",
|
||||
&["flow_id", "sql", "peer"],
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||
|
||||
744
src/flow/src/recording_rules.rs
Normal file
744
src/flow/src/recording_rules.rs
Normal file
@@ -0,0 +1,744 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Run flow as recording rule which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
mod engine;
|
||||
mod frontend_client;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::DfRecordBatch;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion::logical_expr::Expr;
|
||||
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion::sql::unparser::Unparser;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
|
||||
use datafusion_common::{Column, DFSchema, TableReference};
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datafusion_physical_expr::PhysicalExprRef;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
|
||||
TimestampSecondVector, Vector,
|
||||
};
|
||||
pub use engine::RecordingRuleEngine;
|
||||
pub use frontend_client::FrontendClient;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::Error;
|
||||
|
||||
/// Convert sql to datafusion logical plan
|
||||
pub async fn sql_to_df_plan(
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
sql: &str,
|
||||
optimize: bool,
|
||||
) -> Result<LogicalPlan, Error> {
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(&stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = if optimize {
|
||||
apply_df_optimizer(plan).await?
|
||||
} else {
|
||||
plan
|
||||
};
|
||||
Ok(plan)
|
||||
}
|
||||
|
||||
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
|
||||
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")`
|
||||
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
|
||||
///
|
||||
/// Time window expr is a expr that:
|
||||
/// 1. ref only to a time index column
|
||||
/// 2. is monotonic increasing
|
||||
/// 3. show up in GROUP BY clause
|
||||
///
|
||||
/// note this plan should only contain one TableScan
|
||||
pub async fn find_plan_time_window_bound(
|
||||
plan: &LogicalPlan,
|
||||
current: Timestamp,
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
|
||||
// TODO(discord9): find the expr that do time window
|
||||
let catalog_man = engine.engine_state().catalog_manager();
|
||||
|
||||
let mut table_name = None;
|
||||
// first find the table source in the logical plan
|
||||
plan.apply(|plan| {
|
||||
let LogicalPlan::TableScan(table_scan) = plan else {
|
||||
return Ok(TreeNodeRecursion::Continue);
|
||||
};
|
||||
table_name = Some(table_scan.table_name.clone());
|
||||
Ok(TreeNodeRecursion::Stop)
|
||||
})
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Can't find table source in plan {plan:?}"),
|
||||
})?;
|
||||
let Some(table_name) = table_name else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Can't find table source in plan {plan:?}"),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
|
||||
let current_schema = query_ctx.current_schema();
|
||||
|
||||
let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
|
||||
let schema_name = table_name.schema().unwrap_or(¤t_schema);
|
||||
let table_name = table_name.table();
|
||||
|
||||
let Some(table_ref) = catalog_man
|
||||
.table(catalog_name, schema_name, table_name, Some(&query_ctx))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
|
||||
let schema = &table_ref.table_info().meta.schema;
|
||||
|
||||
let ts_index = schema.timestamp_column().context(UnexpectedSnafu {
|
||||
reason: format!("Can't find timestamp column in table {table_name:?}"),
|
||||
})?;
|
||||
|
||||
let ts_col_name = ts_index.name.clone();
|
||||
|
||||
let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
|
||||
),
|
||||
})?.unit();
|
||||
|
||||
let ts_columns: HashSet<_> = HashSet::from_iter(vec![
|
||||
format!("{catalog_name}.{schema_name}.{table_name}.{ts_col_name}"),
|
||||
format!("{schema_name}.{table_name}.{ts_col_name}"),
|
||||
format!("{table_name}.{ts_col_name}"),
|
||||
format!("{ts_col_name}"),
|
||||
]);
|
||||
let ts_columns: HashSet<_> = ts_columns
|
||||
.into_iter()
|
||||
.map(Column::from_qualified_name)
|
||||
.collect();
|
||||
|
||||
let ts_columns_ref: HashSet<&Column> = ts_columns.iter().collect();
|
||||
|
||||
// find the time window expr which refers to the time index column
|
||||
let mut time_window_expr: Option<Expr> = None;
|
||||
let find_time_window_expr = |plan: &LogicalPlan| {
|
||||
let LogicalPlan::Aggregate(aggregate) = plan else {
|
||||
return Ok(TreeNodeRecursion::Continue);
|
||||
};
|
||||
|
||||
for group_expr in &aggregate.group_expr {
|
||||
let refs = group_expr.column_refs();
|
||||
if refs.len() != 1 {
|
||||
continue;
|
||||
}
|
||||
let ref_col = refs.iter().next().unwrap();
|
||||
if ts_columns_ref.contains(ref_col) {
|
||||
time_window_expr = Some(group_expr.clone());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(TreeNodeRecursion::Stop)
|
||||
};
|
||||
plan.apply(find_time_window_expr)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Can't find time window expr in plan {plan:?}"),
|
||||
})?;
|
||||
|
||||
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
|
||||
ts_col_name.clone(),
|
||||
ts_index.data_type.as_arrow_type(),
|
||||
false,
|
||||
)]));
|
||||
|
||||
let df_schema = DFSchema::from_field_specific_qualified_schema(
|
||||
vec![Some(TableReference::bare(table_name))],
|
||||
&arrow_schema,
|
||||
)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
|
||||
})?;
|
||||
|
||||
// cast current to ts_index's type
|
||||
let new_current = current
|
||||
.convert_to(expected_time_unit)
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
|
||||
})?;
|
||||
|
||||
// if no time_window_expr is found, return None
|
||||
if let Some(time_window_expr) = time_window_expr {
|
||||
let lower_bound =
|
||||
find_expr_time_window_lower_bound(&time_window_expr, &df_schema, new_current)?;
|
||||
let upper_bound =
|
||||
find_expr_time_window_upper_bound(&time_window_expr, &df_schema, new_current)?;
|
||||
Ok((ts_col_name, lower_bound, upper_bound))
|
||||
} else {
|
||||
Ok((ts_col_name, None, None))
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the lower bound of time window in given `expr` and `current` timestamp.
|
||||
///
|
||||
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
|
||||
/// of current time window given the current timestamp
|
||||
///
|
||||
/// if return None, meaning this time window have no lower bound
|
||||
fn find_expr_time_window_lower_bound(
|
||||
expr: &Expr,
|
||||
df_schema: &DFSchema,
|
||||
current: Timestamp,
|
||||
) -> Result<Option<Timestamp>, Error> {
|
||||
use std::cmp::Ordering;
|
||||
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
let phy_expr: PhysicalExprRef = phy_planner
|
||||
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to create physical expression from {expr:?} using {df_schema:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
|
||||
if cur_time_window == current {
|
||||
return Ok(Some(current));
|
||||
}
|
||||
|
||||
// search to find the lower bound
|
||||
let mut offset: i64 = 1;
|
||||
let lower_bound;
|
||||
let mut upper_bound = Some(current);
|
||||
// first expontial probe to found a range for binary search
|
||||
loop {
|
||||
let Some(next_val) = current.value().checked_sub(offset) else {
|
||||
// no lower bound
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
|
||||
|
||||
let prev_time_window = eval_ts_to_ts(&phy_expr, df_schema, prev_time_probe)?;
|
||||
|
||||
match prev_time_window.cmp(&cur_time_window) {
|
||||
Ordering::Less => {
|
||||
lower_bound = Some(prev_time_probe);
|
||||
break;
|
||||
}
|
||||
Ordering::Equal => {
|
||||
upper_bound = Some(prev_time_probe);
|
||||
}
|
||||
Ordering::Greater => {
|
||||
UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
|
||||
let Some(new_offset) = offset.checked_mul(2) else {
|
||||
// no lower bound
|
||||
return Ok(None);
|
||||
};
|
||||
offset = new_offset;
|
||||
}
|
||||
|
||||
// binary search for the exact lower bound
|
||||
|
||||
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
|
||||
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
|
||||
});
|
||||
|
||||
let input_time_unit = lower_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have lower bound",
|
||||
})?
|
||||
.unit();
|
||||
|
||||
let mut low = lower_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have lower bound",
|
||||
})?
|
||||
.value();
|
||||
let mut high = upper_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have upper bound",
|
||||
})?
|
||||
.value();
|
||||
|
||||
while low < high {
|
||||
let mid = (low + high) / 2;
|
||||
let mid_probe = common_time::Timestamp::new(mid, input_time_unit);
|
||||
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
|
||||
|
||||
match mid_time_window.cmp(&cur_time_window) {
|
||||
Ordering::Less => low = mid + 1,
|
||||
Ordering::Equal => high = mid,
|
||||
Ordering::Greater => UnexpectedSnafu {
|
||||
reason: format!("Binary search failed for time window expression {expr:?}"),
|
||||
}
|
||||
.fail()?,
|
||||
}
|
||||
}
|
||||
|
||||
let final_lower_bound_for_time_window = common_time::Timestamp::new(low, input_time_unit);
|
||||
|
||||
Ok(Some(final_lower_bound_for_time_window))
|
||||
}
|
||||
|
||||
/// Find the upper bound for time window expression
|
||||
fn find_expr_time_window_upper_bound(
|
||||
expr: &Expr,
|
||||
df_schema: &DFSchema,
|
||||
current: Timestamp,
|
||||
) -> Result<Option<Timestamp>, Error> {
|
||||
use std::cmp::Ordering;
|
||||
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
let phy_expr: PhysicalExprRef = phy_planner
|
||||
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to create physical expression from {expr:?} using {df_schema:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
|
||||
|
||||
// search to find the lower bound
|
||||
let mut offset: i64 = 1;
|
||||
let mut lower_bound = Some(current);
|
||||
let upper_bound;
|
||||
// first expontial probe to found a range for binary search
|
||||
loop {
|
||||
let Some(next_val) = current.value().checked_add(offset) else {
|
||||
// no upper bound if overflow
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
|
||||
|
||||
let next_time_window = eval_ts_to_ts(&phy_expr, df_schema, next_time_probe)?;
|
||||
|
||||
match next_time_window.cmp(&cur_time_window) {
|
||||
Ordering::Less => {UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
Ordering::Equal => {
|
||||
lower_bound = Some(next_time_probe);
|
||||
}
|
||||
Ordering::Greater => {
|
||||
upper_bound = Some(next_time_probe);
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
let Some(new_offset) = offset.checked_mul(2) else {
|
||||
// no upper bound if overflow
|
||||
return Ok(None);
|
||||
};
|
||||
offset = new_offset;
|
||||
}
|
||||
|
||||
// binary search for the exact upper bound
|
||||
|
||||
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
|
||||
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
|
||||
});
|
||||
|
||||
let output_unit = upper_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have lower bound",
|
||||
})?
|
||||
.unit();
|
||||
|
||||
let mut low = lower_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have lower bound",
|
||||
})?
|
||||
.value();
|
||||
let mut high = upper_bound
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "should have upper bound",
|
||||
})?
|
||||
.value();
|
||||
while low < high {
|
||||
let mid = (low + high) / 2;
|
||||
let mid_probe = common_time::Timestamp::new(mid, output_unit);
|
||||
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
|
||||
|
||||
match mid_time_window.cmp(&cur_time_window) {
|
||||
Ordering::Less => UnexpectedSnafu {
|
||||
reason: format!("Binary search failed for time window expression {expr:?}"),
|
||||
}
|
||||
.fail()?,
|
||||
Ordering::Equal => low = mid + 1,
|
||||
Ordering::Greater => high = mid,
|
||||
}
|
||||
}
|
||||
|
||||
let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
|
||||
|
||||
Ok(Some(final_upper_bound_for_time_window))
|
||||
}
|
||||
|
||||
fn eval_ts_to_ts(
|
||||
phy: &PhysicalExprRef,
|
||||
df_schema: &DFSchema,
|
||||
input_value: Timestamp,
|
||||
) -> Result<Timestamp, Error> {
|
||||
let ts_vector = match input_value.unit() {
|
||||
TimeUnit::Second => {
|
||||
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
TimeUnit::Millisecond => {
|
||||
TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
};
|
||||
|
||||
let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
|
||||
.with_context(|_| ArrowSnafu {
|
||||
context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
|
||||
})?;
|
||||
|
||||
let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
|
||||
})?;
|
||||
|
||||
let val = match eval_res {
|
||||
datafusion_expr::ColumnarValue::Array(array) => {
|
||||
let ty = array.data_type();
|
||||
let ty = ConcreteDataType::from_arrow_type(ty);
|
||||
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
|
||||
ty.unit()
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
match time_unit {
|
||||
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0),
|
||||
TimeUnit::Millisecond => {
|
||||
TimestampMillisecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0)
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0)
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
TimestampNanosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert scalar {scalar:?} to value"),
|
||||
})?,
|
||||
};
|
||||
|
||||
if let Value::Timestamp(ts) = val {
|
||||
Ok(ts)
|
||||
} else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(discord9): a method to found out the precise time window
|
||||
|
||||
/// Find out the `Filter` Node corresponding to outermost `WHERE` and add a new filter expr to it
|
||||
#[derive(Debug)]
|
||||
pub struct AddFilterRewriter {
|
||||
extra_filter: Expr,
|
||||
is_rewritten: bool,
|
||||
}
|
||||
|
||||
impl AddFilterRewriter {
|
||||
fn new(filter: Expr) -> Self {
|
||||
Self {
|
||||
extra_filter: filter,
|
||||
is_rewritten: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TreeNodeRewriter for AddFilterRewriter {
|
||||
type Node = LogicalPlan;
|
||||
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
|
||||
if self.is_rewritten {
|
||||
return Ok(Transformed::no(node));
|
||||
}
|
||||
match node {
|
||||
LogicalPlan::Filter(mut filter) if !filter.having => {
|
||||
filter.predicate = filter.predicate.and(self.extra_filter.clone());
|
||||
self.is_rewritten = true;
|
||||
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
|
||||
}
|
||||
LogicalPlan::TableScan(_) => {
|
||||
// add a new filter
|
||||
let filter =
|
||||
datafusion_expr::Filter::try_new(self.extra_filter.clone(), Arc::new(node))?;
|
||||
self.is_rewritten = true;
|
||||
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
|
||||
}
|
||||
_ => Ok(Transformed::no(node)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
|
||||
let unparser = Unparser::default();
|
||||
let sql = unparser
|
||||
.plan_to_sql(plan)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to unparse logical plan {plan:?}"),
|
||||
})?;
|
||||
Ok(sql.to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use session::context::QueryContext;
|
||||
|
||||
use super::{sql_to_df_plan, *};
|
||||
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
|
||||
use crate::test_utils::create_test_query_engine;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_filter() {
|
||||
let testcases = vec![
|
||||
(
|
||||
"SELECT number FROM numbers_with_ts GROUP BY number","SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (number > 4) GROUP BY numbers_with_ts.number"
|
||||
),
|
||||
(
|
||||
"SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10",
|
||||
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4))"
|
||||
),
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window",
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE (number > 4) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
)
|
||||
];
|
||||
use datafusion_expr::{col, lit};
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
|
||||
for (before, after) in testcases {
|
||||
let sql = before;
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(col("number").gt(lit(4u32)));
|
||||
let plan = plan.rewrite(&mut add_filter).unwrap().data;
|
||||
let new_sql = df_plan_to_sql(&plan).unwrap();
|
||||
assert_eq!(after, new_sql);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_plan_time_window_lower_bound() {
|
||||
use datafusion_expr::{col, lit};
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
|
||||
let testcases = [
|
||||
// same alias is not same column
|
||||
(
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
|
||||
Timestamp::new(1740394109, TimeUnit::Second),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
|
||||
),
|
||||
// complex time window index
|
||||
(
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(1740394109, TimeUnit::Second),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(1740394080, TimeUnit::Second)),
|
||||
Some(Timestamp::new(1740394140, TimeUnit::Second)),
|
||||
),
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
|
||||
),
|
||||
// no time index
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
("ts".to_string(), None, None),
|
||||
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
|
||||
),
|
||||
// time index
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(23, TimeUnit::Nanosecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// on spot
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(0, TimeUnit::Nanosecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// different time unit
|
||||
(
|
||||
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(23_000_000, TimeUnit::Nanosecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// time index with other fields
|
||||
(
|
||||
"SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
|
||||
),
|
||||
// time index with other pks
|
||||
(
|
||||
"SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
|
||||
Timestamp::new(23, TimeUnit::Millisecond),
|
||||
(
|
||||
"ts".to_string(),
|
||||
Some(Timestamp::new(0, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
|
||||
),
|
||||
];
|
||||
|
||||
for (sql, current, expected, unparsed) in testcases {
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let real =
|
||||
find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(expected, real);
|
||||
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
|
||||
.await
|
||||
.unwrap();
|
||||
let (col_name, lower, upper) = real;
|
||||
let new_sql = if lower.is_some() {
|
||||
let to_df_literal = |value| {
|
||||
let value = Value::from(value);
|
||||
|
||||
value.try_to_scalar_value(&value.data_type()).unwrap()
|
||||
};
|
||||
let lower = to_df_literal(lower.unwrap());
|
||||
let upper = to_df_literal(upper.unwrap());
|
||||
let expr = col(&col_name)
|
||||
.gt_eq(lit(lower))
|
||||
.and(col(&col_name).lt_eq(lit(upper)));
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
let plan = plan.rewrite(&mut add_filter).unwrap().data;
|
||||
df_plan_to_sql(&plan).unwrap()
|
||||
} else {
|
||||
sql.to_string()
|
||||
};
|
||||
assert_eq!(unparsed, new_sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
407
src/flow/src/recording_rules/engine.rs
Normal file
407
src/flow/src/recording_rules/engine.rs
Normal file
@@ -0,0 +1,407 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
use common_telemetry::tracing::warn;
|
||||
use common_telemetry::{debug, info};
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use datatypes::value::Value;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use super::frontend_client::FrontendClient;
|
||||
use super::{df_plan_to_sql, AddFilterRewriter};
|
||||
use crate::adapter::{CreateFlowArgs, FlowId};
|
||||
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
|
||||
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
|
||||
use crate::Error;
|
||||
|
||||
/// TODO(discord9): make those constants configurable
|
||||
/// The default rule engine query timeout is 10 minutes
|
||||
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running
|
||||
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
|
||||
|
||||
/// TODO(discord9): determine how to configure refresh rate
|
||||
pub struct RecordingRuleEngine {
|
||||
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
|
||||
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
engine: QueryEngineRef,
|
||||
}
|
||||
|
||||
impl RecordingRuleEngine {
|
||||
pub fn new(frontend_client: Arc<FrontendClient>, engine: QueryEngineRef) -> Self {
|
||||
Self {
|
||||
tasks: Default::default(),
|
||||
shutdown_txs: Default::default(),
|
||||
frontend_client,
|
||||
engine,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
|
||||
|
||||
impl RecordingRuleEngine {
|
||||
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
source_table_ids: _,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: _,
|
||||
sql,
|
||||
flow_options,
|
||||
query_ctx,
|
||||
} = args;
|
||||
|
||||
// or replace logic
|
||||
{
|
||||
let is_exist = self.tasks.read().await.contains_key(&flow_id);
|
||||
match (create_if_not_exists, or_replace, is_exist) {
|
||||
// if replace, ignore that old flow exists
|
||||
(_, true, true) => {
|
||||
info!("Replacing flow with id={}", flow_id);
|
||||
}
|
||||
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
|
||||
// already exists, and not replace, return None
|
||||
(true, false, true) => {
|
||||
info!("Flow with id={} already exists, do nothing", flow_id);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// continue as normal
|
||||
(_, _, false) => (),
|
||||
}
|
||||
}
|
||||
|
||||
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
|
||||
|
||||
ensure!(
|
||||
flow_type == Some(&FlowType::RecordingRule.to_string()) || flow_type.is_none(),
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Flow type is not RecordingRule nor None, got {flow_type:?}")
|
||||
}
|
||||
);
|
||||
|
||||
let Some(query_ctx) = query_ctx else {
|
||||
UnexpectedSnafu {
|
||||
reason: "Query context is None".to_string(),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let task = RecordingRuleTask::new(
|
||||
flow_id,
|
||||
&sql,
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
Arc::new(query_ctx),
|
||||
rx,
|
||||
);
|
||||
|
||||
let task_inner = task.clone();
|
||||
let engine = self.engine.clone();
|
||||
let frontend = self.frontend_client.clone();
|
||||
|
||||
// TODO(discord9): also save handle & use time wheel or what for better
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
match task_inner.start_executing(engine, frontend).await {
|
||||
Ok(()) => info!("Flow {} shutdown", task_inner.flow_id),
|
||||
Err(err) => common_telemetry::error!(
|
||||
"Flow {} encounter unrecoverable error: {err:?}",
|
||||
task_inner.flow_id
|
||||
),
|
||||
}
|
||||
});
|
||||
|
||||
// TODO(discord9): deal with replace logic
|
||||
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
|
||||
drop(replaced_old_task_opt);
|
||||
|
||||
self.shutdown_txs.write().await.insert(flow_id, tx);
|
||||
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
if self.tasks.write().await.remove(&flow_id).is_none() {
|
||||
warn!("Flow {flow_id} not found in tasks")
|
||||
}
|
||||
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Can't found shutdown tx for flow {flow_id}"),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
if tx.send(()).is_err() {
|
||||
warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RecordingRuleTask {
|
||||
flow_id: FlowId,
|
||||
query: String,
|
||||
/// in seconds
|
||||
expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
state: Arc<RwLock<RecordingRuleState>>,
|
||||
}
|
||||
|
||||
impl RecordingRuleTask {
|
||||
pub fn new(
|
||||
flow_id: FlowId,
|
||||
query: &str,
|
||||
expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
query_ctx: QueryContextRef,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
flow_id,
|
||||
query: query.to_string(),
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl RecordingRuleTask {
|
||||
/// This should be called in a new tokio task
|
||||
pub async fn start_executing(
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) -> Result<(), Error> {
|
||||
// only first query don't need upper bound
|
||||
let mut is_first = true;
|
||||
|
||||
loop {
|
||||
// FIXME(discord9): test if need upper bound also works
|
||||
let new_query = self
|
||||
.gen_query_with_time_window(engine.clone(), false)
|
||||
.await?;
|
||||
|
||||
let insert_into = format!(
|
||||
"INSERT INTO {}.{}.{} {}",
|
||||
self.sink_table_name[0],
|
||||
self.sink_table_name[1],
|
||||
self.sink_table_name[2],
|
||||
new_query
|
||||
);
|
||||
|
||||
if is_first {
|
||||
is_first = false;
|
||||
}
|
||||
|
||||
let instant = Instant::now();
|
||||
let flow_id = self.flow_id;
|
||||
let db_client = frontend_client.get_database_client().await?;
|
||||
let peer_addr = db_client.peer.addr;
|
||||
debug!(
|
||||
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
|
||||
self.expire_after, peer_addr, &insert_into
|
||||
);
|
||||
|
||||
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.start_timer();
|
||||
|
||||
let res = db_client.database.sql(&insert_into).await;
|
||||
drop(timer);
|
||||
|
||||
let elapsed = instant.elapsed();
|
||||
if let Ok(res1) = &res {
|
||||
debug!(
|
||||
"Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
|
||||
elapsed
|
||||
);
|
||||
} else if let Err(res) = &res {
|
||||
warn!(
|
||||
"Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
|
||||
peer_addr, elapsed, &insert_into
|
||||
);
|
||||
}
|
||||
|
||||
// record slow query
|
||||
if elapsed >= SLOW_QUERY_THRESHOLD {
|
||||
warn!(
|
||||
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
|
||||
peer_addr, elapsed, &insert_into
|
||||
);
|
||||
METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
|
||||
.with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
self.state
|
||||
.write()
|
||||
.await
|
||||
.after_query_exec(elapsed, res.is_ok());
|
||||
|
||||
let sleep_until = {
|
||||
let mut state = self.state.write().await;
|
||||
match state.shutdown_rx.try_recv() {
|
||||
Ok(()) => break Ok(()),
|
||||
Err(TryRecvError::Closed) => {
|
||||
warn!("Unexpected shutdown flow {flow_id}, shutdown anyway");
|
||||
break Ok(());
|
||||
}
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
state.get_next_start_query_time(None)
|
||||
};
|
||||
tokio::time::sleep_until(sleep_until).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn gen_query_with_time_window(
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
need_upper_bound: bool,
|
||||
) -> Result<String, Error> {
|
||||
let query_ctx = self.state.read().await.query_ctx.clone();
|
||||
let start = SystemTime::now();
|
||||
let since_the_epoch = start
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards");
|
||||
let low_bound = self
|
||||
.expire_after
|
||||
.map(|e| since_the_epoch.as_secs() - e as u64);
|
||||
|
||||
let Some(low_bound) = low_bound else {
|
||||
return Ok(self.query.clone());
|
||||
};
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
|
||||
|
||||
let (col_name, lower, upper) =
|
||||
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
|
||||
.await?;
|
||||
|
||||
let new_sql = {
|
||||
let to_df_literal = |value| -> Result<_, Error> {
|
||||
let value = Value::from(value);
|
||||
let value = value
|
||||
.try_to_scalar_value(&value.data_type())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert to scalar value: {}", value),
|
||||
})?;
|
||||
Ok(value)
|
||||
};
|
||||
let lower = lower.map(to_df_literal).transpose()?;
|
||||
let upper = upper.map(to_df_literal).transpose()?.and_then(|u| {
|
||||
if need_upper_bound {
|
||||
Some(u)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
let expr = {
|
||||
use datafusion_expr::{col, lit};
|
||||
match (lower, upper) {
|
||||
(Some(l), Some(u)) => col(&col_name)
|
||||
.gt_eq(lit(l))
|
||||
.and(col(&col_name).lt_eq(lit(u))),
|
||||
(Some(l), None) => col(&col_name).gt_eq(lit(l)),
|
||||
(None, Some(u)) => col(&col_name).lt(lit(u)),
|
||||
// no time window, direct return
|
||||
(None, None) => return Ok(self.query.clone()),
|
||||
}
|
||||
};
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
// make a not optimized plan for clearer unparse
|
||||
let plan =
|
||||
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, false).await?;
|
||||
let plan = plan
|
||||
.clone()
|
||||
.rewrite(&mut add_filter)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to rewrite plan {plan:?}"),
|
||||
})?
|
||||
.data;
|
||||
df_plan_to_sql(&plan)?
|
||||
};
|
||||
|
||||
Ok(new_sql)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RecordingRuleState {
|
||||
query_ctx: QueryContextRef,
|
||||
/// last query complete time
|
||||
last_update_time: Instant,
|
||||
/// last time query duration
|
||||
last_query_duration: Duration,
|
||||
exec_state: ExecState,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
impl RecordingRuleState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
Self {
|
||||
query_ctx,
|
||||
last_update_time: Instant::now(),
|
||||
last_query_duration: Duration::from_secs(0),
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
}
|
||||
}
|
||||
|
||||
/// called after last query is done
|
||||
/// `is_succ` indicate whether the last query is successful
|
||||
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
|
||||
self.exec_state = ExecState::Idle;
|
||||
self.last_query_duration = elapsed;
|
||||
self.last_update_time = Instant::now();
|
||||
}
|
||||
|
||||
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
|
||||
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration);
|
||||
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
|
||||
|
||||
self.last_update_time + next_duration
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum ExecState {
|
||||
Idle,
|
||||
Executing,
|
||||
}
|
||||
150
src/flow/src/recording_rules/frontend_client.rs
Normal file
150
src/flow/src/recording_rules/frontend_client.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use meta_client::client::MetaClient;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
|
||||
use crate::Error;
|
||||
|
||||
fn default_channel_mgr() -> ChannelManager {
|
||||
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
|
||||
ChannelManager::with_config(cfg)
|
||||
}
|
||||
|
||||
fn client_from_urls(addrs: Vec<String>) -> Client {
|
||||
Client::with_manager_and_urls(default_channel_mgr(), addrs)
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
#[derive(Debug)]
|
||||
pub enum FrontendClient {
|
||||
Distributed {
|
||||
meta_client: Arc<MetaClient>,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
|
||||
/// TODO(discord9): not use grpc under standalone mode
|
||||
database_client: DatabaseWithPeer,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseWithPeer {
|
||||
pub database: Database,
|
||||
pub peer: Peer,
|
||||
}
|
||||
|
||||
impl DatabaseWithPeer {
|
||||
fn new(database: Database, peer: Peer) -> Self {
|
||||
Self { database, peer }
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
|
||||
Self::Distributed { meta_client }
|
||||
}
|
||||
|
||||
pub fn from_static_grpc_addr(addr: String) -> Self {
|
||||
let peer = Peer {
|
||||
id: 0,
|
||||
addr: addr.clone(),
|
||||
};
|
||||
|
||||
let client = client_from_urls(vec![addr]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Self::Standalone {
|
||||
database_client: DatabaseWithPeer::new(database, peer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
|
||||
let Self::Distributed { meta_client, .. } = self else {
|
||||
return Ok(vec![]);
|
||||
};
|
||||
let cluster_client = meta_client
|
||||
.cluster_client()
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let cluster_id = meta_client.id().0;
|
||||
let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
|
||||
let req = RangeRequest::new().with_prefix(prefix);
|
||||
let resp = cluster_client
|
||||
.range(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let mut res = Vec::with_capacity(resp.kvs.len());
|
||||
for kv in resp.kvs {
|
||||
let key = NodeInfoKey::try_from(kv.key)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let val = NodeInfo::try_from(kv.value)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
res.push((key, val));
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Get the database with max `last_activity_ts`
|
||||
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
if let Self::Standalone { database_client } = self {
|
||||
return Ok(database_client.clone());
|
||||
}
|
||||
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut last_activity_ts = i64::MIN;
|
||||
let mut peer = None;
|
||||
for (_key, val) in frontends.iter() {
|
||||
if val.last_activity_ts > last_activity_ts {
|
||||
last_activity_ts = val.last_activity_ts;
|
||||
peer = Some(val.peer.clone());
|
||||
}
|
||||
}
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client = client_from_urls(vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
|
||||
/// Get a database client, and possibly update it before returning.
|
||||
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
match self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
|
||||
use crate::recording_rules::{FrontendClient, RecordingRuleEngine};
|
||||
use crate::transform::register_function_to_query_engine;
|
||||
use crate::utils::{SizeReportSender, StateReportHandler};
|
||||
use crate::{Error, FlowWorkerManager, FlownodeOptions};
|
||||
@@ -245,6 +246,7 @@ impl FlownodeInstance {
|
||||
self.server.shutdown().await.context(ShutdownServerSnafu)?;
|
||||
|
||||
if let Some(task) = &self.heartbeat_task {
|
||||
info!("Close heartbeat task for flownode");
|
||||
task.shutdown();
|
||||
}
|
||||
|
||||
@@ -271,6 +273,8 @@ pub struct FlownodeBuilder {
|
||||
heartbeat_task: Option<HeartbeatTask>,
|
||||
/// receive a oneshot sender to send state size report
|
||||
state_report_handler: Option<StateReportHandler>,
|
||||
/// Client to send sql to frontend
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
}
|
||||
|
||||
impl FlownodeBuilder {
|
||||
@@ -281,6 +285,7 @@ impl FlownodeBuilder {
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
@@ -290,6 +295,7 @@ impl FlownodeBuilder {
|
||||
flow_metadata_manager,
|
||||
heartbeat_task: None,
|
||||
state_report_handler: None,
|
||||
frontend_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -447,7 +453,10 @@ impl FlownodeBuilder {
|
||||
|
||||
let node_id = self.opts.node_id.map(|id| id as u32);
|
||||
|
||||
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
|
||||
let rule_engine =
|
||||
RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone());
|
||||
|
||||
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
|
||||
for worker_id in 0..num_workers {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
|
||||
@@ -86,7 +86,8 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
|
||||
|
||||
let schema = vec![
|
||||
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
|
||||
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false),
|
||||
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
|
||||
.with_time_index(true),
|
||||
];
|
||||
let mut columns = vec![];
|
||||
let numbers = (1..=10).collect_vec();
|
||||
|
||||
@@ -112,6 +112,7 @@ impl MetaClientBuilder {
|
||||
.enable_store()
|
||||
.enable_heartbeat()
|
||||
.enable_procedure()
|
||||
.enable_access_cluster_info()
|
||||
}
|
||||
|
||||
pub fn enable_heartbeat(self) -> Self {
|
||||
|
||||
@@ -68,6 +68,7 @@ pub struct Inserter {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
#[allow(unused)]
|
||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
}
|
||||
|
||||
@@ -335,9 +336,11 @@ impl Inserter {
|
||||
|
||||
let InstantAndNormalInsertRequests {
|
||||
normal_requests,
|
||||
instant_requests,
|
||||
instant_requests: _,
|
||||
} = requests;
|
||||
|
||||
// TODO(discord9): mirror some
|
||||
/*
|
||||
// Mirror requests for source table to flownode asynchronously
|
||||
let flow_mirror_task = FlowMirrorTask::new(
|
||||
&self.table_flownode_set_cache,
|
||||
@@ -347,7 +350,7 @@ impl Inserter {
|
||||
.chain(instant_requests.requests.iter()),
|
||||
)
|
||||
.await?;
|
||||
flow_mirror_task.detach(self.node_manager.clone())?;
|
||||
flow_mirror_task.detach(self.node_manager.clone())?;*/
|
||||
|
||||
// Write requests to datanode and wait for response
|
||||
let write_tasks = self
|
||||
@@ -817,12 +820,14 @@ struct CreateAlterTableResult {
|
||||
table_infos: HashMap<TableId, Arc<TableInfo>>,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
struct FlowMirrorTask {
|
||||
requests: HashMap<Peer, RegionInsertRequests>,
|
||||
num_rows: usize,
|
||||
}
|
||||
|
||||
impl FlowMirrorTask {
|
||||
#[allow(unused)]
|
||||
async fn new(
|
||||
cache: &TableFlownodeSetCacheRef,
|
||||
requests: impl Iterator<Item = &RegionInsertRequest>,
|
||||
@@ -896,6 +901,7 @@ impl FlowMirrorTask {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
|
||||
crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
|
||||
for (peer, inserts) in self.requests {
|
||||
|
||||
@@ -40,7 +40,7 @@ use common_procedure::options::ProcedureConfig;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use flow::FlownodeBuilder;
|
||||
use flow::{FlownodeBuilder, FrontendClient};
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
@@ -164,12 +164,15 @@ impl GreptimeDbStandaloneBuilder {
|
||||
Some(procedure_manager.clone()),
|
||||
);
|
||||
|
||||
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
|
||||
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
Default::default(),
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
Arc::new(frontend_client),
|
||||
);
|
||||
let flownode = Arc::new(flow_builder.build().await.unwrap());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user