mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: basic time window aware
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4165,6 +4165,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"cache",
|
||||
"catalog",
|
||||
"chrono",
|
||||
"client",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
|
||||
@@ -16,6 +16,7 @@ async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
cache.workspace = true
|
||||
catalog.workspace = true
|
||||
chrono.workspace = true
|
||||
client.workspace = true
|
||||
common-base.workspace = true
|
||||
common-config.workspace = true
|
||||
|
||||
@@ -64,7 +64,7 @@ pub(crate) mod refill;
|
||||
mod stat;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod util;
|
||||
pub(crate) mod util;
|
||||
mod worker;
|
||||
|
||||
pub(crate) mod node_context;
|
||||
|
||||
@@ -155,8 +155,11 @@ impl Flownode for FlowWorkerManager {
|
||||
|
||||
#[allow(unreachable_code, unused)]
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
|
||||
return Ok(Default::default());
|
||||
|
||||
return self
|
||||
.rule_engine
|
||||
.handle_inserts(request)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()));
|
||||
// using try_read to ensure two things:
|
||||
// 1. flush wouldn't happen until inserts before it is inserted
|
||||
// 2. inserts happening concurrently with flush wouldn't be block by flush
|
||||
@@ -209,15 +212,15 @@ impl Flownode for FlowWorkerManager {
|
||||
.collect_vec();
|
||||
let table_col_names = table_schema.relation_desc.names;
|
||||
let table_col_names = table_col_names
|
||||
.iter().enumerate()
|
||||
.map(|(idx,name)| match name {
|
||||
Some(name) => Ok(name.clone()),
|
||||
None => InternalSnafu {
|
||||
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
|
||||
}
|
||||
.fail().map_err(BoxedError::new).context(ExternalSnafu),
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
.iter().enumerate()
|
||||
.map(|(idx,name)| match name {
|
||||
Some(name) => Ok(name.clone()),
|
||||
None => InternalSnafu {
|
||||
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
|
||||
}
|
||||
.fail().map_err(BoxedError::new).context(ExternalSnafu),
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let name_to_col = HashMap::<_, _>::from_iter(
|
||||
insert_schema
|
||||
.iter()
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Some utility functions
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
|
||||
@@ -54,6 +54,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Time error"))]
|
||||
Time {
|
||||
source: common_time::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("External error"))]
|
||||
External {
|
||||
source: BoxedError,
|
||||
@@ -249,7 +256,9 @@ impl ErrorExt for Error {
|
||||
| Self::FlowNotFound { .. }
|
||||
| Self::ListFlows { .. } => StatusCode::TableNotFound,
|
||||
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
|
||||
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
|
||||
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Time { .. } => {
|
||||
StatusCode::EngineExecuteQuery
|
||||
}
|
||||
Self::Unexpected { .. } => StatusCode::Unexpected,
|
||||
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
|
||||
StatusCode::Unsupported
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Send heartbeat from flownode to metasrv
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Peer};
|
||||
@@ -24,7 +25,7 @@ use common_meta::heartbeat::handler::{
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use greptime_proto::v1::meta::NodeInfo;
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
@@ -65,6 +66,7 @@ pub struct HeartbeatTask {
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
running: Arc<AtomicBool>,
|
||||
query_stat_size: Option<SizeReportSender>,
|
||||
}
|
||||
|
||||
@@ -87,6 +89,7 @@ impl HeartbeatTask {
|
||||
report_interval: heartbeat_opts.interval,
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
query_stat_size: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,11 +17,14 @@
|
||||
mod engine;
|
||||
mod frontend_client;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::pb_value_to_value_ref;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::DfRecordBatch;
|
||||
use common_telemetry::warn;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion::error::Result as DfResult;
|
||||
@@ -31,9 +34,10 @@ use datafusion::prelude::SessionContext;
|
||||
use datafusion::sql::unparser::Unparser;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
|
||||
use datafusion_common::{Column, DFSchema, TableReference};
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datafusion_expr::{ColumnarValue, LogicalPlan};
|
||||
use datafusion_physical_expr::PhysicalExprRef;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
|
||||
@@ -41,15 +45,192 @@ use datatypes::vectors::{
|
||||
};
|
||||
pub use engine::RecordingRuleEngine;
|
||||
pub use frontend_client::FrontendClient;
|
||||
use itertools::Itertools;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::adapter::util::from_proto_to_data_type;
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::expr::error::DataTypeSnafu;
|
||||
use crate::Error;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimeWindowExpr {
|
||||
expr: PhysicalExprRef,
|
||||
column_name: String,
|
||||
}
|
||||
|
||||
impl TimeWindowExpr {
|
||||
pub fn new(expr: PhysicalExprRef, column_name: String) -> Self {
|
||||
Self { expr, column_name }
|
||||
}
|
||||
|
||||
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
let phy_expr: PhysicalExprRef = phy_planner
|
||||
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to create physical expression from {expr:?} using {df_schema:?}"
|
||||
),
|
||||
})?;
|
||||
Ok(Self {
|
||||
expr: phy_expr,
|
||||
column_name: column_name.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Find timestamps from rows using time window expr
|
||||
pub async fn handle_rows(
|
||||
&self,
|
||||
rows_list: Vec<api::v1::Rows>,
|
||||
) -> Result<BTreeSet<Timestamp>, Error> {
|
||||
let mut time_windows = BTreeSet::new();
|
||||
|
||||
for rows in rows_list {
|
||||
// pick the time index column and use it to eval on `self.expr`
|
||||
let ts_col_index = rows
|
||||
.schema
|
||||
.iter()
|
||||
.map(|col| col.column_name.clone())
|
||||
.position(|name| name == self.column_name);
|
||||
let Some(ts_col_index) = ts_col_index else {
|
||||
warn!("can't found time index column in schema: {:?}", rows.schema);
|
||||
continue;
|
||||
};
|
||||
let col_schema = &rows.schema[ts_col_index];
|
||||
let cdt = from_proto_to_data_type(col_schema)?;
|
||||
|
||||
let column_values = rows
|
||||
.rows
|
||||
.iter()
|
||||
.map(|row| &row.values[ts_col_index])
|
||||
.collect_vec();
|
||||
|
||||
let mut vector = cdt.create_mutable_vector(column_values.len());
|
||||
for value in column_values {
|
||||
let value = pb_value_to_value_ref(value, &None);
|
||||
vector.try_push_value_ref(value).context(DataTypeSnafu {
|
||||
msg: "Failed to convert rows to columns",
|
||||
})?;
|
||||
}
|
||||
let vector = vector.to_vector();
|
||||
|
||||
let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
|
||||
|
||||
let rb =
|
||||
DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
|
||||
.with_context(|_e| ArrowSnafu {
|
||||
context: format!(
|
||||
"Failed to create record batch from {df_schema:?} and {vector:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to evaluate physical expression {:?} on {rb:?}",
|
||||
self.expr
|
||||
),
|
||||
})?;
|
||||
|
||||
let res = columnar_to_ts_vector(&eval_res)?;
|
||||
|
||||
for ts in res.into_iter().flatten() {
|
||||
time_windows.insert(ts);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(time_windows)
|
||||
}
|
||||
}
|
||||
|
||||
fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
|
||||
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
|
||||
name,
|
||||
cdt.as_arrow_type(),
|
||||
false,
|
||||
)]));
|
||||
|
||||
let df_schema = DFSchema::from_field_specific_qualified_schema(
|
||||
vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
|
||||
&arrow_schema,
|
||||
)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
|
||||
})?;
|
||||
|
||||
Ok(df_schema)
|
||||
}
|
||||
|
||||
/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
|
||||
fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
|
||||
let val = match columnar {
|
||||
datafusion_expr::ColumnarValue::Array(array) => {
|
||||
let ty = array.data_type();
|
||||
let ty = ConcreteDataType::from_arrow_type(ty);
|
||||
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
|
||||
ty.unit()
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!("Non-timestamp type: {ty:?}"),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
match time_unit {
|
||||
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec(),
|
||||
TimeUnit::Millisecond => {
|
||||
TimestampMillisecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec()
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec()
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
TimestampNanosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.iter_data()
|
||||
.map(|d| d.map(|d| d.0))
|
||||
.collect_vec()
|
||||
}
|
||||
}
|
||||
}
|
||||
datafusion_expr::ColumnarValue::Scalar(scalar) => {
|
||||
let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert scalar {scalar:?} to value"),
|
||||
})?;
|
||||
let ts = value.as_timestamp().context(UnexpectedSnafu {
|
||||
reason: format!("Expect Timestamp, found {:?}", value),
|
||||
})?;
|
||||
vec![Some(ts)]
|
||||
}
|
||||
};
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
/// Convert sql to datafusion logical plan
|
||||
pub async fn sql_to_df_plan(
|
||||
query_ctx: QueryContextRef,
|
||||
@@ -74,25 +255,13 @@ pub async fn sql_to_df_plan(
|
||||
Ok(plan)
|
||||
}
|
||||
|
||||
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
|
||||
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")`
|
||||
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
|
||||
///
|
||||
/// Time window expr is a expr that:
|
||||
/// 1. ref only to a time index column
|
||||
/// 2. is monotonic increasing
|
||||
/// 3. show up in GROUP BY clause
|
||||
///
|
||||
/// note this plan should only contain one TableScan
|
||||
pub async fn find_plan_time_window_bound(
|
||||
/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window)
|
||||
async fn find_time_window_expr(
|
||||
plan: &LogicalPlan,
|
||||
current: Timestamp,
|
||||
catalog_man: CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
|
||||
) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
|
||||
// TODO(discord9): find the expr that do time window
|
||||
let catalog_man = engine.engine_state().catalog_manager();
|
||||
|
||||
let mut table_name = None;
|
||||
// first find the table source in the logical plan
|
||||
@@ -199,6 +368,31 @@ pub async fn find_plan_time_window_bound(
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
|
||||
})?;
|
||||
Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
|
||||
}
|
||||
|
||||
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
|
||||
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")`
|
||||
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
|
||||
///
|
||||
/// Time window expr is a expr that:
|
||||
/// 1. ref only to a time index column
|
||||
/// 2. is monotonic increasing
|
||||
/// 3. show up in GROUP BY clause
|
||||
///
|
||||
/// note this plan should only contain one TableScan
|
||||
pub async fn find_plan_time_window_bound(
|
||||
plan: &LogicalPlan,
|
||||
current: Timestamp,
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
|
||||
// TODO(discord9): find the expr that do time window
|
||||
let catalog_man = engine.engine_state().catalog_manager();
|
||||
|
||||
let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
|
||||
find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;
|
||||
|
||||
// cast current to ts_index's type
|
||||
let new_current = current
|
||||
@@ -461,59 +655,14 @@ fn eval_ts_to_ts(
|
||||
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
|
||||
})?;
|
||||
|
||||
let val = match eval_res {
|
||||
datafusion_expr::ColumnarValue::Array(array) => {
|
||||
let ty = array.data_type();
|
||||
let ty = ConcreteDataType::from_arrow_type(ty);
|
||||
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
|
||||
ty.unit()
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
match time_unit {
|
||||
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0),
|
||||
TimeUnit::Millisecond => {
|
||||
TimestampMillisecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0)
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0)
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
TimestampNanosecondVector::try_from_arrow_array(array.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to create vector from arrow array {array:?}"),
|
||||
})?
|
||||
.get(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert scalar {scalar:?} to value"),
|
||||
})?,
|
||||
};
|
||||
|
||||
if let Value::Timestamp(ts) = val {
|
||||
Ok(ts)
|
||||
if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
|
||||
Ok(*ts)
|
||||
} else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"),
|
||||
reason: format!(
|
||||
"Expected timestamp in expression {phy:?} but got {:?}",
|
||||
eval_res
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
@@ -12,29 +12,40 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use api::v1::flow::FlowResponse;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
use common_meta::key::flow::FlowMetadataManagerRef;
|
||||
use common_meta::key::table_info::TableInfoManager;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_telemetry::tracing::warn;
|
||||
use common_telemetry::{debug, info};
|
||||
use common_time::Timestamp;
|
||||
use datafusion::sql::unparser::expr_to_sql;
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use datatypes::value::Value;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use super::frontend_client::FrontendClient;
|
||||
use super::{df_plan_to_sql, AddFilterRewriter};
|
||||
use crate::adapter::{CreateFlowArgs, FlowId};
|
||||
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
|
||||
use super::{df_plan_to_sql, AddFilterRewriter, TimeWindowExpr};
|
||||
use crate::adapter::{CreateFlowArgs, FlowId, TableName};
|
||||
use crate::error::{
|
||||
DatafusionSnafu, DatatypesSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu,
|
||||
TimeSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
|
||||
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
|
||||
use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan};
|
||||
use crate::Error;
|
||||
|
||||
/// TODO(discord9): make those constants configurable
|
||||
@@ -49,18 +60,74 @@ pub struct RecordingRuleEngine {
|
||||
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
|
||||
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
engine: QueryEngineRef,
|
||||
}
|
||||
|
||||
impl RecordingRuleEngine {
|
||||
pub fn new(frontend_client: Arc<FrontendClient>, engine: QueryEngineRef) -> Self {
|
||||
pub fn new(
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
engine: QueryEngineRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
tasks: Default::default(),
|
||||
shutdown_txs: Default::default(),
|
||||
frontend_client,
|
||||
flow_metadata_manager,
|
||||
table_meta,
|
||||
engine,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_inserts(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<FlowResponse, Error> {
|
||||
let table_info_mgr = self.table_meta.table_info_manager();
|
||||
let mut group_by_table_name: HashMap<TableName, Vec<api::v1::Rows>> = HashMap::new();
|
||||
for r in request.requests {
|
||||
let tid = RegionId::from(r.region_id).table_id();
|
||||
let name = get_table_name(table_info_mgr, &tid).await?;
|
||||
let entry = group_by_table_name.entry(name).or_default();
|
||||
if let Some(rows) = r.rows {
|
||||
entry.push(rows);
|
||||
}
|
||||
}
|
||||
|
||||
for (_flow_id, task) in self.tasks.read().await.iter() {
|
||||
let src_table_names = &task.source_table_names;
|
||||
|
||||
for src_table_name in src_table_names {
|
||||
if let Some(entry) = group_by_table_name.get(src_table_name) {
|
||||
let Some(expr) = &task.time_window_expr else {
|
||||
continue;
|
||||
};
|
||||
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
|
||||
let mut state = task.state.write().await;
|
||||
state
|
||||
.dirty_time_windows
|
||||
.add_lower_bounds(involved_time_windows.into_iter());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_table_name(zelf: &TableInfoManager, table_id: &TableId) -> Result<TableName, Error> {
|
||||
zelf.get(*table_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Table id = {:?}, couldn't found table name", table_id),
|
||||
})
|
||||
.map(|name| name.table_name())
|
||||
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
|
||||
}
|
||||
|
||||
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
|
||||
@@ -70,7 +137,7 @@ impl RecordingRuleEngine {
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
source_table_ids: _,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
@@ -115,14 +182,46 @@ impl RecordingRuleEngine {
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
let mut source_table_names = Vec::new();
|
||||
for src_id in source_table_ids {
|
||||
let table_name = self
|
||||
.table_meta
|
||||
.table_info_manager()
|
||||
.get(src_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Table id = {:?}, couldn't found table name", src_id),
|
||||
})
|
||||
.map(|name| name.table_name())
|
||||
.map(|name| [name.catalog_name, name.schema_name, name.table_name])?;
|
||||
source_table_names.push(table_name);
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), self.engine.clone(), &sql, true).await?;
|
||||
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
|
||||
&plan,
|
||||
self.engine.engine_state().catalog_manager().clone(),
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let phy_expr = time_window_expr
|
||||
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
|
||||
.transpose()?;
|
||||
|
||||
let task = RecordingRuleTask::new(
|
||||
flow_id,
|
||||
&sql,
|
||||
phy_expr,
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
Arc::new(query_ctx),
|
||||
source_table_names,
|
||||
query_ctx,
|
||||
rx,
|
||||
);
|
||||
|
||||
@@ -171,26 +270,33 @@ impl RecordingRuleEngine {
|
||||
pub struct RecordingRuleTask {
|
||||
flow_id: FlowId,
|
||||
query: String,
|
||||
time_window_expr: Option<TimeWindowExpr>,
|
||||
/// in seconds
|
||||
expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
source_table_names: HashSet<[String; 3]>,
|
||||
state: Arc<RwLock<RecordingRuleState>>,
|
||||
}
|
||||
|
||||
impl RecordingRuleTask {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
flow_id: FlowId,
|
||||
query: &str,
|
||||
time_window_expr: Option<TimeWindowExpr>,
|
||||
expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
source_table_names: Vec<[String; 3]>,
|
||||
query_ctx: QueryContextRef,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
flow_id,
|
||||
query: query.to_string(),
|
||||
time_window_expr,
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
source_table_names: source_table_names.into_iter().collect(),
|
||||
state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
|
||||
}
|
||||
}
|
||||
@@ -207,17 +313,20 @@ impl RecordingRuleTask {
|
||||
|
||||
loop {
|
||||
// FIXME(discord9): test if need upper bound also works
|
||||
let new_query = self
|
||||
.gen_query_with_time_window(engine.clone(), false)
|
||||
.await?;
|
||||
let new_query = self.gen_query_with_time_window(engine.clone()).await?;
|
||||
|
||||
let insert_into = format!(
|
||||
"INSERT INTO {}.{}.{} {}",
|
||||
self.sink_table_name[0],
|
||||
self.sink_table_name[1],
|
||||
self.sink_table_name[2],
|
||||
new_query
|
||||
);
|
||||
let insert_into = if let Some(new_query) = new_query {
|
||||
format!(
|
||||
"INSERT INTO {}.{}.{} {}",
|
||||
self.sink_table_name[0],
|
||||
self.sink_table_name[1],
|
||||
self.sink_table_name[2],
|
||||
new_query
|
||||
)
|
||||
} else {
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
if is_first {
|
||||
is_first = false;
|
||||
@@ -284,11 +393,11 @@ impl RecordingRuleTask {
|
||||
}
|
||||
}
|
||||
|
||||
/// will merge and use the first ten time window in query
|
||||
async fn gen_query_with_time_window(
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
need_upper_bound: bool,
|
||||
) -> Result<String, Error> {
|
||||
) -> Result<Option<String>, Error> {
|
||||
let query_ctx = self.state.read().await.query_ctx.clone();
|
||||
let start = SystemTime::now();
|
||||
let since_the_epoch = start
|
||||
@@ -299,48 +408,55 @@ impl RecordingRuleTask {
|
||||
.map(|e| since_the_epoch.as_secs() - e as u64);
|
||||
|
||||
let Some(low_bound) = low_bound else {
|
||||
return Ok(self.query.clone());
|
||||
return Ok(Some(self.query.clone()));
|
||||
};
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
|
||||
|
||||
// TODO(discord9): find more time window!
|
||||
let (col_name, lower, upper) =
|
||||
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
|
||||
.await?;
|
||||
|
||||
let new_sql = {
|
||||
let to_df_literal = |value| -> Result<_, Error> {
|
||||
let value = Value::from(value);
|
||||
let value = value
|
||||
.try_to_scalar_value(&value.data_type())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert to scalar value: {}", value),
|
||||
})?;
|
||||
Ok(value)
|
||||
};
|
||||
let lower = lower.map(to_df_literal).transpose()?;
|
||||
let upper = upper.map(to_df_literal).transpose()?.and_then(|u| {
|
||||
if need_upper_bound {
|
||||
Some(u)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
let expr = {
|
||||
use datafusion_expr::{col, lit};
|
||||
match (lower, upper) {
|
||||
(Some(l), Some(u)) => col(&col_name)
|
||||
.gt_eq(lit(l))
|
||||
.and(col(&col_name).lt_eq(lit(u))),
|
||||
(Some(l), None) => col(&col_name).gt_eq(lit(l)),
|
||||
(None, Some(u)) => col(&col_name).lt(lit(u)),
|
||||
// no time window, direct return
|
||||
(None, None) => return Ok(self.query.clone()),
|
||||
(Some(l), Some(u)) => {
|
||||
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't get window size from {u:?} - {l:?}"),
|
||||
})?;
|
||||
|
||||
self.state
|
||||
.write()
|
||||
.await
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(&col_name, lower, window_size)?
|
||||
}
|
||||
_ => UnexpectedSnafu {
|
||||
reason: format!("Can't get window size: lower={lower:?}, upper={upper:?}"),
|
||||
}
|
||||
.fail()?,
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.flow_id,
|
||||
expr.as_ref()
|
||||
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
}))
|
||||
.transpose()?
|
||||
.map(|s| s.to_string())
|
||||
);
|
||||
|
||||
let Some(expr) = expr else {
|
||||
// no new data, hence no need to update
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
// make a not optimized plan for clearer unparse
|
||||
let plan =
|
||||
@@ -355,7 +471,7 @@ impl RecordingRuleTask {
|
||||
df_plan_to_sql(&plan)?
|
||||
};
|
||||
|
||||
Ok(new_sql)
|
||||
Ok(Some(new_sql))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -366,16 +482,150 @@ pub struct RecordingRuleState {
|
||||
last_update_time: Instant,
|
||||
/// last time query duration
|
||||
last_query_duration: Duration,
|
||||
/// Dirty Time windows need to be updated
|
||||
/// mapping of `start -> end` and non-overlapping
|
||||
dirty_time_windows: DirtyTimeWindows,
|
||||
exec_state: ExecState,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct DirtyTimeWindows {
|
||||
windows: BTreeMap<Timestamp, Option<Timestamp>>,
|
||||
}
|
||||
|
||||
fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
|
||||
let value = Value::from(value);
|
||||
let value = value
|
||||
.try_to_scalar_value(&value.data_type())
|
||||
.with_context(|_| DatatypesSnafu {
|
||||
extra: format!("Failed to convert to scalar value: {}", value),
|
||||
})?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
impl DirtyTimeWindows {
|
||||
/// Time window merge distance
|
||||
const MERGE_DIST: i32 = 3;
|
||||
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `lower_bounds` - An iterator of lower bounds to be added.
|
||||
pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
|
||||
for lower_bound in lower_bounds {
|
||||
let entry = self.windows.entry(lower_bound);
|
||||
entry.or_insert(None);
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate all filter expressions consuming all time windows
|
||||
pub fn gen_filter_exprs(
|
||||
&mut self,
|
||||
col_name: &str,
|
||||
expire_lower_bound: Option<Timestamp>,
|
||||
window_size: chrono::Duration,
|
||||
) -> Result<Option<datafusion_expr::Expr>, Error> {
|
||||
debug!(
|
||||
"expire_lower_bound: {:?}, window_size: {:?}",
|
||||
expire_lower_bound.map(|t| t.to_iso8601_string()),
|
||||
window_size
|
||||
);
|
||||
self.merge_dirty_time_windows(window_size)?;
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in std::mem::take(&mut self.windows).into_iter() {
|
||||
debug!(
|
||||
"Time window start: {:?}, end: {:?}",
|
||||
start.to_iso8601_string(),
|
||||
end.map(|t| t.to_iso8601_string())
|
||||
);
|
||||
let start = if let Some(expire) = expire_lower_bound {
|
||||
start.max(expire)
|
||||
} else {
|
||||
start
|
||||
};
|
||||
|
||||
// filter out expired time window
|
||||
if let Some(end) = end {
|
||||
if end <= start {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
use datafusion_expr::{col, lit};
|
||||
let lower = to_df_literal(start)?;
|
||||
let upper = end.map(to_df_literal).transpose()?;
|
||||
let expr = if let Some(upper) = upper {
|
||||
col(col_name)
|
||||
.gt_eq(lit(lower))
|
||||
.and(col(col_name).lt(lit(upper)))
|
||||
} else {
|
||||
col(col_name).gt_eq(lit(lower))
|
||||
};
|
||||
expr_lst.push(expr);
|
||||
}
|
||||
let expr = expr_lst.into_iter().reduce(|a, b| a.and(b));
|
||||
Ok(expr)
|
||||
}
|
||||
|
||||
/// Merge time windows that overlaps or get too close
|
||||
pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> {
|
||||
let mut new_windows = BTreeMap::new();
|
||||
|
||||
let mut prev_tw = None;
|
||||
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
|
||||
let Some(prev_tw) = &mut prev_tw else {
|
||||
prev_tw = Some((lower_bound, upper_bound));
|
||||
continue;
|
||||
};
|
||||
|
||||
let std_window_size = window_size.to_std().map_err(|e| {
|
||||
InternalSnafu {
|
||||
reason: e.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
// if cur.lower - prev.upper <= window_size * 2, merge
|
||||
let prev_upper = prev_tw
|
||||
.1
|
||||
.unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
|
||||
|
||||
let cur_upper = upper_bound.unwrap_or(
|
||||
lower_bound
|
||||
.add_duration(std_window_size)
|
||||
.context(TimeSnafu)?,
|
||||
);
|
||||
|
||||
if lower_bound
|
||||
.sub(&prev_upper)
|
||||
.map(|dist| dist <= window_size * Self::MERGE_DIST)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
prev_tw.1 = Some(cur_upper);
|
||||
} else {
|
||||
new_windows.insert(prev_tw.0, prev_tw.1);
|
||||
*prev_tw = (lower_bound, Some(cur_upper));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prev_tw) = prev_tw {
|
||||
new_windows.insert(prev_tw.0, prev_tw.1);
|
||||
}
|
||||
|
||||
self.windows = new_windows;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordingRuleState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
Self {
|
||||
query_ctx,
|
||||
last_update_time: Instant::now(),
|
||||
last_query_duration: Duration::from_secs(0),
|
||||
dirty_time_windows: Default::default(),
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
}
|
||||
|
||||
@@ -453,8 +453,12 @@ impl FlownodeBuilder {
|
||||
|
||||
let node_id = self.opts.node_id.map(|id| id as u32);
|
||||
|
||||
let rule_engine =
|
||||
RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone());
|
||||
let rule_engine = RecordingRuleEngine::new(
|
||||
self.frontend_client.clone(),
|
||||
query_engine.clone(),
|
||||
self.flow_metadata_manager.clone(),
|
||||
table_meta.clone(),
|
||||
);
|
||||
|
||||
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
|
||||
for worker_id in 0..num_workers {
|
||||
|
||||
@@ -336,11 +336,11 @@ impl Inserter {
|
||||
|
||||
let InstantAndNormalInsertRequests {
|
||||
normal_requests,
|
||||
instant_requests: _,
|
||||
instant_requests,
|
||||
} = requests;
|
||||
|
||||
// TODO(discord9): mirror some
|
||||
/*
|
||||
|
||||
// Mirror requests for source table to flownode asynchronously
|
||||
let flow_mirror_task = FlowMirrorTask::new(
|
||||
&self.table_flownode_set_cache,
|
||||
@@ -350,7 +350,7 @@ impl Inserter {
|
||||
.chain(instant_requests.requests.iter()),
|
||||
)
|
||||
.await?;
|
||||
flow_mirror_task.detach(self.node_manager.clone())?;*/
|
||||
flow_mirror_task.detach(self.node_manager.clone())?;
|
||||
|
||||
// Write requests to datanode and wait for response
|
||||
let write_tasks = self
|
||||
|
||||
Reference in New Issue
Block a user