feat(flow): batching mode engine (#5807)

* feat: partial impl of rr task/state

* feat: recording rule engine

* chore: rm unused

* chore: per review partially

* test: gen create table

* chore: rm some unused

* test: merge time window

* refactor: rename to batching mode

* refactor: per review

* refactor(partially): per review

* refactor: split engine.rs into three files

* refactor: use plan not sql

* chore: per review

* chore: per review

* refactor: per review

* refactor: per review

* chore: more per review

* refactor: per review

* refactor(partial): per review

* refactor: per review

* chore: clone task cheaper & more comments

* chore: fmt

* chore: typo
Authored by discord9 on 2025-04-09 17:53:32 +08:00, committed by GitHub
parent 2ebe005e3c · commit df362be012
18 changed files with 1655 additions and 32 deletions
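At a high level, this change wires a new `BatchingEngine` into flownode: inserts arriving over gRPC mark time windows dirty, and each `BatchingTask` periodically re-runs its query over just those windows, writing the result into the sink table through a frontend client. Below is a minimal sketch of the engine's public surface as introduced in this diff; the `deps()` helper is a hypothetical stand-in for the real wiring (frontend client, query engine, metadata managers), not part of the PR.

// Sketch only: BatchingEngine lifecycle, using the APIs added in this PR.
// `deps()` is a hypothetical helper standing in for real wiring.
async fn batching_engine_lifecycle(args: CreateFlowArgs) -> Result<(), Error> {
    let (frontend, query_engine, flow_meta, table_meta) = deps();
    let engine = BatchingEngine::new(frontend, query_engine, flow_meta, table_meta);
    // Registers the task, creates the sink table if it doesn't exist,
    // and spawns `start_executing_loop` in the background.
    let flow_id = engine.create_flow(args).await?.expect("newly created flow");
    // Incoming inserts mark matching tasks' time windows dirty:
    // engine.handle_inserts(region_insert_requests).await?;
    // Force one synchronous execution of the flow's query:
    engine.flush_flow(flow_id).await?;
    // Shut down the background loop and drop the task:
    engine.remove_flow(flow_id).await?;
    Ok(())
}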

Cargo.lock (generated)

@@ -4332,6 +4332,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"store-api",
"strfmt",
"substrait 0.14.0",
"table",
"tokio",
"toml 0.8.19",


@@ -164,7 +164,7 @@ impl Database {
from_grpc_response(response)
}
async fn handle(&self, request: Request) -> Result<u32> {
pub async fn handle(&self, request: Request) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let request = self.to_rpc_request(request);
let response = client.handle(request).await?.into_inner();


@@ -307,8 +307,7 @@ impl Procedure for CreateFlowProcedure {
}
pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType {
// TODO(discord9): determine flow type
FlowType::RecordingRule
FlowType::Batching
}
/// The state of [CreateFlowProcedure].
@@ -327,27 +326,28 @@ pub enum CreateFlowState {
/// The type of flow.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum FlowType {
/// The flow is a recording rule task.
RecordingRule,
/// The flow is a batching task.
Batching,
/// The flow is a streaming task.
Streaming,
}
impl FlowType {
pub const RECORDING_RULE: &str = "recording_rule";
pub const BATCHING: &str = "batching";
pub const STREAMING: &str = "streaming";
pub const FLOW_TYPE_KEY: &str = "flow_type";
}
impl Default for FlowType {
fn default() -> Self {
Self::RecordingRule
Self::Batching
}
}
impl fmt::Display for FlowType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FlowType::RecordingRule => write!(f, "{}", FlowType::RECORDING_RULE),
FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
}
}
@@ -390,7 +390,8 @@ impl From<&CreateFlowData> for CreateRequest {
};
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options.insert("flow_type".to_string(), flow_type);
req.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
req
}
}


@@ -75,7 +75,7 @@ use crate::FrontendInvoker;
// `GREPTIME_TIMESTAMP` is not used to distinguish when table is created automatically by flow
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
pub const UPDATE_AT_TS_COL: &str = "update_at";
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
// TODO(discord9): refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
@@ -508,7 +508,7 @@ impl FlowWorkerManager {
})
.unwrap_or_default();
let update_at = ColumnSchema::new(
UPDATE_AT_TS_COL,
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);


@@ -12,14 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Run flow as recording rule which is time-window-aware normal query triggered when new data arrives
//! Run flow in batching mode: a time-window-aware normal query triggered when new data arrives
use std::time::Duration;
mod engine;
mod frontend_client;
mod state;
mod task;
mod time_window;
mod utils;
/// TODO(discord9): make those constants configurable
/// The default rule engine query timeout is 10 minutes
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// The default batching engine query timeout is 10 minutes
pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Outputs a warn log for any query that runs for more than 1 minute, and again every minute while that query is still running
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// The minimum duration between two query executions by a batching mode task
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);


@@ -0,0 +1,342 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::flow::FlowResponse;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{oneshot, RwLock};
use crate::adapter::{CreateFlowArgs, FlowId, TableName};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu};
use crate::Error;
/// Batching mode Engine, responsible for driving all the batching mode tasks
///
/// TODO(discord9): determine how to configure refresh rate
pub struct BatchingEngine {
tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
query_engine: QueryEngineRef,
}
impl BatchingEngine {
pub fn new(
frontend_client: Arc<FrontendClient>,
query_engine: QueryEngineRef,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
) -> Self {
Self {
tasks: Default::default(),
shutdown_txs: Default::default(),
frontend_client,
flow_metadata_manager,
table_meta,
query_engine,
}
}
pub async fn handle_inserts(
&self,
request: api::v1::region::InsertRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();
for r in request.requests {
let tid = RegionId::from(r.region_id).table_id();
let entry = group_by_table_id.entry(tid).or_default();
if let Some(rows) = r.rows {
entry.push(rows);
}
}
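// e.g. inserts for two regions of the same table collapse into one entry,
// {table_id 42 => [rows_from_region_0, rows_from_region_1]} (illustrative values)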
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let missing_tids = tids
.iter()
.filter(|id| !table_infos.contains_key(id))
.collect::<Vec<_>>();
if !missing_tids.is_empty() {
warn!(
"Failed to get all the table info for table ids, expected table ids: {:?}, those table doesn't exist: {:?}",
tids,
missing_tids
);
}
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, rows)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
Some((table_name, rows))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
for src_table_name in src_table_names {
if let Some(entry) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
let mut state = task.state.write().unwrap();
state
.dirty_time_windows
.add_lower_bounds(involved_time_windows.into_iter());
}
}
Ok(())
});
handles.push(handle);
}
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
drop(tasks);
Ok(Default::default())
}
}
async fn get_table_name(
table_info: &TableInfoManager,
table_id: &TableId,
) -> Result<TableName, Error> {
table_info
.get(*table_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", table_id),
})
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
impl BatchingEngine {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: _,
sql,
flow_options,
query_ctx,
} = args;
// or replace logic
{
let is_exist = self.tasks.read().await.contains_key(&flow_id);
match (create_if_not_exists, or_replace, is_exist) {
// if replace, ignore that old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
}
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
ensure!(
match flow_type {
None => true,
Some(ty) if ty == FlowType::BATCHING => true,
_ => false,
},
UnexpectedSnafu {
reason: format!("Flow type is not batching nor None, got {flow_type:?}")
}
);
let Some(query_ctx) = query_ctx else {
UnexpectedSnafu {
reason: "Query context is None".to_string(),
}
.fail()?
};
let query_ctx = Arc::new(query_ctx);
let mut source_table_names = Vec::with_capacity(2);
for src_id in source_table_ids {
let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
source_table_names.push(table_name);
}
let (tx, rx) = oneshot::channel();
let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
self.query_engine.engine_state().catalog_manager().clone(),
query_ctx.clone(),
)
.await?;
let phy_expr = time_window_expr
.map(|expr| {
TimeWindowExpr::from_expr(
&expr,
&column_name,
&df_schema,
&self.query_engine.engine_state().session_state(),
)
})
.transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
let task = BatchingTask::new(
flow_id,
&sql,
plan,
phy_expr,
expire_after,
sink_table_name,
source_table_names,
query_ctx,
self.table_meta.clone(),
rx,
);
let task_inner = task.clone();
let engine = self.query_engine.clone();
let frontend = self.frontend_client.clone();
// run a check execution once first to detect any error early
task.check_execute(&engine, &frontend).await?;
// TODO(discord9): also save the handle & use a time wheel or similar for better scheduling
let _handle = common_runtime::spawn_global(async move {
task_inner.start_executing_loop(engine, frontend).await;
});
// only replace here, not earlier, so the old task stays intact if anything goes wrong before this line
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
drop(replaced_old_task_opt);
self.shutdown_txs.write().await.insert(flow_id, tx);
Ok(Some(flow_id))
}
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
reason: format!("Can't found shutdown tx for flow {flow_id}"),
}
.fail()?
};
if tx.send(()).is_err() {
warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
}
Ok(())
}
pub async fn flush_flow(&self, flow_id: FlowId) -> Result<(), Error> {
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"),
})?;
task.gen_exec_once(&self.query_engine, &self.frontend_client)
.await?;
Ok(())
}
/// Determine whether a batching mode flow task exists with the given flow id
pub async fn flow_exist(&self, flow_id: FlowId) -> bool {
self.tasks.read().await.contains_key(&flow_id)
}
}


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
//! Frontend client to run flow as a batching task: a time-window-aware normal query triggered every tick set by the user
use std::sync::Arc;
@@ -25,12 +25,12 @@ use common_meta::rpc::store::RangeRequest;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::recording_rules::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
use crate::Error;
fn default_channel_mgr() -> ChannelManager {
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
}


@@ -0,0 +1,408 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Batching mode task state, which changes frequently
use std::collections::BTreeMap;
use std::time::Duration;
use common_telemetry::debug;
use common_telemetry::tracing::warn;
use common_time::Timestamp;
use datatypes::value::Value;
use session::context::QueryContextRef;
use snafu::ResultExt;
use tokio::sync::oneshot;
use tokio::time::Instant;
use crate::adapter::FlowId;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu};
use crate::Error;
/// The state of the [`BatchingTask`].
#[derive(Debug)]
pub struct TaskState {
/// Query context
pub(crate) query_ctx: QueryContextRef,
/// last query complete time
last_update_time: Instant,
/// duration of the last query
last_query_duration: Duration,
/// Dirty time windows that need to be updated,
/// a non-overlapping mapping of `start -> end`
pub(crate) dirty_time_windows: DirtyTimeWindows,
exec_state: ExecState,
/// Shutdown receiver
pub(crate) shutdown_rx: oneshot::Receiver<()>,
}
impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
Self {
query_ctx,
last_update_time: Instant::now(),
last_query_duration: Duration::from_secs(0),
dirty_time_windows: Default::default(),
exec_state: ExecState::Idle,
shutdown_rx,
}
}
/// Called after the last query is done.
/// `is_succ` indicates whether the last query succeeded
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
self.exec_state = ExecState::Idle;
self.last_query_duration = elapsed;
self.last_update_time = Instant::now();
}
/// Wait for at least `last_query_duration` (and at most `max_timeout`) before starting the next query.
///
/// If there are more dirty time windows, execute the next query immediately.
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
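// e.g. a query that took 2s waits MIN_REFRESH_DURATION (5s) before the next run,
// while one that took 8m under a 10m max_timeout waits the full 8m (illustrative values)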
// if there are dirty time windows, execute immediately to clean them
if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration
} else {
Instant::now()
}
}
}
/// Keeps a record of dirty time windows, i.e. time windows that have had new data inserted
/// since the last query.
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
/// non-overlapping windows, mapped as `start -> end`;
/// `end` is exclusive (and optional)
windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
impl DirtyTimeWindows {
/// Time window merge distance
///
/// TODO(discord9): make those configurable
const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
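/// An upper bound for each window is derived later, in `merge_dirty_time_windows`, from the window size.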
///
/// # Arguments
///
/// * `lower_bounds` - An iterator of lower bounds to be added.
pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
for lower_bound in lower_bounds {
let entry = self.windows.entry(lower_bound);
entry.or_insert(None);
}
}
/// Generate all filter expressions consuming all time windows
pub fn gen_filter_exprs(
&mut self,
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<datafusion_expr::Expr>, Error> {
debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
window_size
);
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
if let Some(task_ctx) = task_ctx {
warn!(
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.config.flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM,
task_ctx.config.time_window_expr,
task_ctx.config.expire_after,
first_time_window,
last_time_window,
task_ctx.config.query
);
} else {
warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM,
first_time_window,
last_time_window
)
}
}
// get the first `MAX_FILTER_NUM` time windows
let nth = self
.windows
.iter()
.nth(Self::MAX_FILTER_NUM)
.map(|(key, _)| *key);
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
debug!(
"Time window start: {:?}, end: {:?}",
start.to_iso8601_string(),
end.map(|t| t.to_iso8601_string())
);
use datafusion_expr::{col, lit};
let lower = to_df_literal(start)?;
let upper = end.map(to_df_literal).transpose()?;
let expr = if let Some(upper) = upper {
col(col_name)
.gt_eq(lit(lower))
.and(col(col_name).lt(lit(upper)))
} else {
col(col_name).gt_eq(lit(lower))
};
expr_lst.push(expr);
}
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
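// The windows end up OR-ed together as half-open range predicates, e.g.
// ((ts >= '...00:00:00' AND ts < '...00:05:00') OR (ts >= '...00:25:00' AND ts < '...00:30:00')),
// as asserted via the unparser in the tests below.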
Ok(expr)
}
/// Merge time windows that overlap or are too close together
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
expire_lower_bound: Option<Timestamp>,
) -> Result<(), Error> {
if self.windows.is_empty() {
return Ok(());
}
let mut new_windows = BTreeMap::new();
let std_window_size = window_size.to_std().map_err(|e| {
InternalSnafu {
reason: e.to_string(),
}
.build()
})?;
// previous time window
let mut prev_tw = None;
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
// filter out expired time window
if let Some(expire_lower_bound) = expire_lower_bound {
if lower_bound < expire_lower_bound {
continue;
}
}
let Some(prev_tw) = &mut prev_tw else {
prev_tw = Some((lower_bound, upper_bound));
continue;
};
// if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
// this also deals with overlapping windows because cur.lower > prev.lower always holds
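// e.g. with window_size = 5m and MERGE_DIST = 3, [00:00, 00:05) absorbs a window
// starting at 00:20 (00:20 - 00:05 = 15m <= 5m * 3), while one starting at 00:25
// stays separate; see the testcases below.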
let prev_upper = prev_tw
.1
.unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
prev_tw.1 = Some(prev_upper);
let cur_upper = upper_bound.unwrap_or(
lower_bound
.add_duration(std_window_size)
.context(TimeSnafu)?,
);
if lower_bound
.sub(&prev_upper)
.map(|dist| dist <= window_size * Self::MERGE_DIST)
.unwrap_or(false)
{
prev_tw.1 = Some(cur_upper);
} else {
new_windows.insert(prev_tw.0, prev_tw.1);
*prev_tw = (lower_bound, Some(cur_upper));
}
}
if let Some(prev_tw) = prev_tw {
new_windows.insert(prev_tw.0, prev_tw.1);
}
self.windows = new_windows;
Ok(())
}
}
fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert to scalar value: {}", value),
})?;
Ok(value)
}
#[derive(Debug, Clone)]
enum ExecState {
Idle,
Executing,
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_merge_dirty_time_windows() {
let testcases = vec![
// just enough to merge
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
)
),
// separate time window
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(5 * 60)),
),
(
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Some(Timestamp::new_second(
(3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
)),
),
]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
)
),
// overlapping
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
)
),
// complex overlapping
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
],
(chrono::Duration::seconds(3), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 7
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
)
),
// expired
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
],
(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
)),
),
BTreeMap::from([]),
None
),
];
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases
{
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(lower_bounds.into_iter());
dirty
.merge_dirty_time_windows(window_size, expire_lower_bound)
.unwrap();
assert_eq!(expected, dirty.windows);
let filter_expr = dirty
.gen_filter_exprs("ts", expire_lower_bound, window_size, 0, None)
.unwrap();
let unparser = datafusion::sql::unparser::Unparser::default();
let to_sql = filter_expr
.as_ref()
.map(|e| unparser.expr_to_sql(e).unwrap().to_string());
assert_eq!(expected_filter_expr, to_sql.as_deref());
}
}
}


@@ -0,0 +1,788 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use common_error::ext::BoxedError;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::NOW_FN;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::value::Value;
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::RawTableMeta;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;
use crate::adapter::{FlowId, AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, FindGroupByFinalName,
};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, InvalidRequestSnafu,
SubstraitEncodeLogicalPlanSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::Error;
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
pub flow_id: FlowId,
pub query: String,
plan: Arc<LogicalPlan>,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
pub source_table_names: HashSet<[String; 3]>,
table_meta: TableMetadataManagerRef,
}
#[derive(Clone)]
pub struct BatchingTask {
pub config: Arc<TaskConfig>,
pub state: Arc<RwLock<TaskState>>,
}
impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
flow_id: FlowId,
query: &str,
plan: LogicalPlan,
time_window_expr: Option<TimeWindowExpr>,
expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: Vec<[String; 3]>,
query_ctx: QueryContextRef,
table_meta: TableMetadataManagerRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
config: Arc::new(TaskConfig {
flow_id,
query: query.to_string(),
plan: Arc::new(plan),
time_window_expr,
expire_after,
sink_table_name,
source_table_names: source_table_names.into_iter().collect(),
table_meta,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
}
}
/// Test execution, for checking syntax and the like
pub async fn check_execute(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
// use the current time as a test dirty time window lower bound, which should be safe
let start = SystemTime::now();
let ts = Timestamp::new_second(
start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as _,
);
self.state
.write()
.unwrap()
.dirty_time_windows
.add_lower_bounds(vec![ts].into_iter());
if !self.is_table_exist(&self.config.sink_table_name).await? {
let create_table = self.gen_create_table_expr(engine.clone()).await?;
info!(
"Try creating sink table(if not exists) with expr: {:?}",
create_table
);
self.create_table(frontend_client, create_table).await?;
info!(
"Sink table {}(if not exists) created",
self.config.sink_table_name.join(".")
);
}
self.gen_exec_once(engine, frontend_client).await
}
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
self.config
.table_meta
.table_name_manager()
.exists(TableNameKey {
catalog: &table_name[0],
schema: &table_name[1],
table: &table_name[2],
})
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
pub async fn gen_exec_once(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
self.execute_logical_plan(frontend_client, &new_query).await
} else {
Ok(None)
}
}
pub async fn gen_insert_plan(
&self,
engine: &QueryEngineRef,
) -> Result<Option<LogicalPlan>, Error> {
let full_table_name = self.config.sink_table_name.clone().join(".");
let table_id = self
.config
.table_meta
.table_name_manager()
.get(common_meta::key::table_name::TableNameKey::new(
&self.config.sink_table_name[0],
&self.config.sink_table_name[1],
&self.config.sink_table_name[2],
))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: full_table_name.clone(),
})?
.map(|t| t.table_id())
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?;
let table = self
.config
.table_meta
.table_info_manager()
.get(table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: full_table_name.clone(),
})?
.with_context(|| TableNotFoundSnafu {
name: full_table_name.clone(),
})?
.into_inner();
let schema: datatypes::schema::Schema = table
.table_info
.meta
.schema
.clone()
.try_into()
.with_context(|_| DatatypesSnafu {
extra: format!(
"Failed to convert schema from raw schema, raw_schema={:?}",
table.table_info.meta.schema
),
})?;
let df_schema = Arc::new(schema.arrow_schema().clone().try_into().with_context(|_| {
DatafusionSnafu {
context: format!(
"Failed to convert arrow schema to datafusion schema, arrow_schema={:?}",
schema.arrow_schema()
),
}
})?);
let new_query = self
.gen_query_with_time_window(engine.clone(), &table.table_info.meta)
.await?;
let insert_into = if let Some((new_query, _column_cnt)) = new_query {
// update_at & time index placeholder (if present) should have default values
LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::Full {
catalog: self.config.sink_table_name[0].clone().into(),
schema: self.config.sink_table_name[1].clone().into(),
table: self.config.sink_table_name[2].clone().into(),
},
df_schema,
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
Arc::new(new_query),
))
} else {
return Ok(None);
};
Ok(Some(insert_into))
}
pub async fn create_table(
&self,
frontend_client: &Arc<FrontendClient>,
expr: CreateTableExpr,
) -> Result<(), Error> {
let db_client = frontend_client.get_database_client().await?;
db_client
.database
.create(expr.clone())
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to create table with expr: {:?}", expr),
})?;
Ok(())
}
pub async fn execute_logical_plan(
&self,
frontend_client: &Arc<FrontendClient>,
plan: &LogicalPlan,
) -> Result<Option<(u32, Duration)>, Error> {
let instant = Instant::now();
let flow_id = self.config.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
self.config.expire_after, peer_addr, &plan
);
let timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
let message = DFLogicalSubstraitConvertor {}
.encode(plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
let req = api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
});
let res = db_client.database.handle(req).await;
drop(timer);
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed
);
} else if let Err(err) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {}, result: {err:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &plan
);
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &plan
);
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &plan.to_string(), &peer_addr])
.observe(elapsed.as_secs_f64());
}
self.state
.write()
.unwrap()
.after_query_exec(elapsed, res.is_ok());
let res = res.context(InvalidRequestSnafu {
context: format!(
"Failed to execute query for flow={}: \'{}\'",
self.config.flow_id, &plan
),
})?;
Ok(Some((res, elapsed)))
}
/// Start executing the query in a loop, breaking when a shutdown signal is received.
///
/// Any error raised while executing the query is logged.
pub async fn start_executing_loop(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
loop {
let mut new_query = None;
let mut gen_and_exec = async || {
new_query = self.gen_insert_plan(&engine).await?;
if let Some(new_query) = &new_query {
self.execute_logical_plan(&frontend_client, new_query).await
} else {
Ok(None)
}
};
match gen_and_exec().await {
// normal execution; sleep for some time before the next query
Ok(Some(_)) => {
let sleep_until = {
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
state.get_next_start_query_time(Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT))
};
tokio::time::sleep_until(sleep_until).await;
}
// no new data, sleep for some time before checking for new data
Ok(None) => {
debug!(
"Flow id = {:?} found no new data, sleep for {:?} then continue",
self.config.flow_id, MIN_REFRESH_DURATION
);
tokio::time::sleep(MIN_REFRESH_DURATION).await;
continue;
}
// TODO(discord9): this error should have a better place to go, but for now just print it; more context is also needed
Err(err) => match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
}
None => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
}
},
}
}
}
/// Generate the create table expr
///
/// The auto-created table automatically gets an `update_at TimestampMillisecond DEFAULT now()` column appended at the end
/// (for compatibility with flow streaming mode),
///
/// and it uses the first timestamp column as the time index; all other columns are added as nullable normal columns
async fn gen_create_table_expr(
&self,
engine: QueryEngineRef,
) -> Result<CreateTableExpr, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
create_table_with_expr(&plan, &self.config.sink_table_name)
}
/// merges the dirty windows and uses the first `MAX_FILTER_NUM` time windows in the query
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
sink_table_meta: &RawTableMeta,
) -> Result<Option<(LogicalPlan, usize)>, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let low_bound = self
.config
.expire_after
.map(|e| since_the_epoch.as_secs() - e as u64)
.unwrap_or(u64::MIN);
let low_bound = Timestamp::new_second(low_bound as i64);
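// e.g. expire_after = Some(3600) puts the lower bound one hour before now;
// without expire_after it falls back to the epoch (u64::MIN)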
let schema_len = self.config.plan.schema().fields().len();
let expire_time_window_bound = self
.config
.time_window_expr
.as_ref()
.map(|expr| expr.eval(low_bound))
.transpose()?;
let new_plan = {
let expr = {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.config
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.config.flow_id
),
})?;
self.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
&col_name,
Some(l),
window_size,
self.config.flow_id,
Some(self),
)?
}
_ => {
// use sink_table_meta to also add the `update_at` and `__ts_placeholder` columns' values to the query, for compatibility reasons
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
let mut add_auto_column =
AddAutoColumnRewriter::new(sink_table_meta.schema.clone());
let plan = self
.config
.plan
.deref()
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {:?}", self.config.plan),
})?
.data;
let schema_len = plan.schema().fields().len();
// since no time window lower/upper bound is found, just return the original query (with auto columns)
return Ok(Some((plan, schema_len)));
}
}
};
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
);
let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
};
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_meta.schema.clone());
// build an unoptimized plan for clearer unparsing
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
plan.clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {plan:?}"),
})?
.data
};
Ok(Some((new_plan, schema_len)))
}
}
// the auto-created table has an auto-added `update_at` column, and optionally an `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time index placeholder if no timestamp column is specified
// TODO(discord9): unit test
fn create_table_with_expr(
plan: &LogicalPlan,
sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
let fields = plan.schema().fields();
let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;
let mut column_schemas = Vec::new();
for field in fields {
let name = field.name();
let ty = ConcreteDataType::from_arrow_type(field.data_type());
let col_schema = if first_time_stamp == Some(name.clone()) {
ColumnSchema::new(name, ty, false).with_time_index(true)
} else {
ColumnSchema::new(name, ty, true)
};
column_schemas.push(col_schema);
}
let update_at_schema = ColumnSchema::new(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
.context(DatatypesSnafu {
extra: "Failed to build column `update_at TimestampMillisecond default now()`",
})?;
column_schemas.push(update_at_schema);
let time_index = if let Some(time_index) = first_time_stamp {
time_index
} else {
column_schemas.push(
ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.context(DatatypesSnafu {
extra: format!(
"Failed to build column `{} TimestampMillisecond TIME INDEX default 0`",
AUTO_CREATED_PLACEHOLDER_TS_COL
),
})?,
);
AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
};
let column_defs =
column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
Ok(CreateTableExpr {
catalog_name: sink_table_name[0].clone(),
schema_name: sink_table_name[1].clone(),
table_name: sink_table_name[2].clone(),
desc: "Auto created table by flow engine".to_string(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id: None,
engine: "mito".to_string(),
})
}
/// Return the first timestamp column in the GROUP BY clause and the other columns that are also in the GROUP BY clause
///
/// # Returns
///
/// * `Option<String>` - the first timestamp column in the GROUP BY clause
/// * `Vec<String>` - the other columns that are also in the GROUP BY clause
fn build_primary_key_constraint(
plan: &LogicalPlan,
schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
let mut pk_names = FindGroupByFinalName::default();
plan.visit(&mut pk_names)
.with_context(|_| DatafusionSnafu {
context: format!("Can't find aggr expr in plan {plan:?}"),
})?;
// if no group by clause, return empty
let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
if pk_final_names.is_empty() {
return Ok((None, Vec::new()));
}
let all_pk_cols: Vec<_> = schema
.iter()
.filter(|f| pk_final_names.contains(f.name()))
.map(|f| f.name().clone())
.collect();
// the auto-created table uses the first timestamp column in the GROUP BY clause as the time index
let first_time_stamp = schema
.iter()
.find(|f| {
all_pk_cols.contains(&f.name().clone())
&& ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
})
.map(|f| f.name().clone());
let all_pk_cols: Vec<_> = all_pk_cols
.into_iter()
.filter(|col| first_time_stamp != Some(col.to_string()))
.collect();
Ok((first_time_stamp, all_pk_cols))
}
#[cfg(test)]
mod test {
use api::v1::column_def::try_as_column_schema;
use pretty_assertions::assert_eq;
use session::context::QueryContext;
use super::*;
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_gen_create_table_sql() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
struct TestCase {
sql: String,
sink_table_name: String,
column_schemas: Vec<ColumnSchema>,
primary_keys: Vec<String>,
time_index: String,
}
let update_at_schema = ColumnSchema::new(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
.unwrap();
let ts_placeholder_schema = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
Timestamp::new_millisecond(0),
))))
.unwrap();
let testcases = vec![
TestCase {
sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
sink_table_name: "new_table".to_string(),
column_schemas: vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
update_at_schema.clone(),
ts_placeholder_schema.clone(),
],
primary_keys: vec![],
time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
},
TestCase {
sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
sink_table_name: "new_table".to_string(),
column_schemas: vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"max(numbers_with_ts.ts)",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
update_at_schema.clone(),
ts_placeholder_schema.clone(),
],
primary_keys: vec!["number".to_string()],
time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
},
TestCase {
sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
sink_table_name: "new_table".to_string(),
column_schemas: vec![
ColumnSchema::new(
"max(numbers_with_ts.number)",
ConcreteDataType::uint32_datatype(),
true,
),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
update_at_schema.clone(),
],
primary_keys: vec![],
time_index: "ts".to_string(),
},
TestCase {
sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
sink_table_name: "new_table".to_string(),
column_schemas: vec![
ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
update_at_schema.clone(),
],
primary_keys: vec!["number".to_string()],
time_index: "ts".to_string(),
},
];
for tc in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
.await
.unwrap();
let expr = create_table_with_expr(
&plan,
&[
"greptime".to_string(),
"public".to_string(),
tc.sink_table_name.clone(),
],
)
.unwrap();
// TODO(discord9): assert expr
let column_schemas = expr
.column_defs
.iter()
.map(|c| try_as_column_schema(c).unwrap())
.collect::<Vec<_>>();
assert_eq!(tc.column_schemas, column_schemas);
assert_eq!(tc.primary_keys, expr.primary_keys);
assert_eq!(tc.time_index, expr.time_index);
}
}
}


@@ -67,7 +67,7 @@ use crate::Error;
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
phy_expr: PhysicalExprRef,
column_name: String,
pub column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
}
@@ -262,7 +262,7 @@ fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestam
/// to be monotonic increasing and appears in the innermost GROUP BY clause
///
/// note this plan should only contain one TableScan
async fn find_time_window_expr(
pub async fn find_time_window_expr(
plan: &LogicalPlan,
catalog_man: CatalogManagerRef,
query_ctx: QueryContextRef,
@@ -661,7 +661,7 @@ mod test {
use session::context::QueryContext;
use super::*;
use crate::recording_rules::utils::{df_plan_to_sql, sql_to_df_plan, AddFilterRewriter};
use crate::batching_mode::utils::{df_plan_to_sql, sql_to_df_plan, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! some utils for helping with recording rule
//! some utils for helping with batching mode
use std::collections::HashSet;
use std::sync::Arc;
@@ -160,7 +160,10 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
}
}
/// Add to the final select columns like `update_at`(which doesn't necessary need to have exact name just need to be a extra timestamp column) and `__ts_placeholder`(this column need to have exact this name and be a timestamp) with values like `now()` and `0`
/// Add to the final select columns like `update_at`
/// (which doesn't necessarily need that exact name, it just needs to be an extra timestamp column)
/// and `__ts_placeholder` (this column needs to have exactly this name and be a timestamp),
/// with values like `now()` and `0`
#[derive(Debug)]
pub struct AddAutoColumnRewriter {
pub schema: RawSchema,


@@ -54,6 +54,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Time error"))]
Time {
source: common_time::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -216,6 +223,28 @@ pub enum Error {
location: Location,
name: String,
},
#[snafu(display("Invalid request: {context}"))]
InvalidRequest {
context: String,
source: client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode logical plan in substrait"))]
SubstraitEncodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
},
#[snafu(display("Failed to convert column schema to proto column def"))]
ConvertColumnSchema {
#[snafu(implicit)]
location: Location,
source: operator::error::Error,
},
}
/// the outer message is the full error stack, and the inner message in the header is the last error message, which can be shown directly to the user
@@ -248,7 +277,7 @@ impl ErrorExt for Error {
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Arrow { .. } => {
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery
}
Self::Unexpected { .. } => StatusCode::Unexpected,
@@ -261,7 +290,14 @@ impl ErrorExt for Error {
source.status_code()
}
Self::MetaClientInit { source, .. } => source.status_code(),
Self::ParseAddr { .. } => StatusCode::InvalidArguments,
Self::InvalidQuery { .. } | Self::InvalidRequest { .. } | Self::ParseAddr { .. } => {
StatusCode::InvalidArguments
}
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
Error::ConvertColumnSchema { source, .. } => source.status_code(),
}
}


@@ -26,6 +26,7 @@
// allow unused for now because it should be used later
mod adapter;
mod batching_mode;
mod compute;
mod df_optimizer;
pub mod error;
@@ -33,7 +34,6 @@ mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod recording_rules;
mod repr;
mod server;
mod transform;


@@ -28,6 +28,20 @@ lazy_static! {
&["table_id"]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
&["flow_id", "sql", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(


@@ -61,6 +61,7 @@ snafu.workspace = true
sql.workspace = true
sqlparser.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true


@@ -347,6 +347,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode logical plan from substrait"))]
SubstraitDecodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -380,6 +387,8 @@ impl ErrorExt for Error {
| Error::PrometheusMetricNamesQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),
Error::SubstraitDecodeLogicalPlan { source, .. } => source.status_code(),
Error::PrometheusLabelValuesQueryPlan { source, .. } => source.status_code(),
Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,


@@ -20,20 +20,24 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_query::Output;
use common_telemetry::tracing::{self};
use datafusion::execution::SessionStateBuilder;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table_name::TableName;
use crate::error::{
Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
PermissionSnafu, Result, TableOperationSnafu,
PermissionSnafu, Result, SubstraitDecodeLogicalPlanSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED};
use crate::metrics::{
GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};
#[async_trait]
impl GrpcQueryHandler for Instance {
@@ -82,11 +86,16 @@ impl GrpcQueryHandler for Instance {
let output = result.remove(0)?;
attach_timer(output, timer)
}
Query::LogicalPlan(_) => {
return NotSupportedSnafu {
feat: "Execute LogicalPlan in Frontend",
}
.fail();
Query::LogicalPlan(plan) => {
// this path is used internally when the flownode needs to execute a logical plan through the gRPC interface
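// The plan was encoded on the flownode side via DFLogicalSubstraitConvertor::encode
// (see BatchingTask::execute_logical_plan); decode it back and execute it here.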
let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
let plan = DFLogicalSubstraitConvertor {}
.decode(&*plan, SessionStateBuilder::default().build())
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, plan, ctx.clone()).await?;
attach_timer(output, timer)
}
Query::PromRangeQuery(promql) => {
let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();


@@ -26,6 +26,8 @@ lazy_static! {
.unwrap();
pub static ref GRPC_HANDLE_SQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["sql"]);
pub static ref GRPC_HANDLE_PLAN_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["plan"]);
pub static ref GRPC_HANDLE_PROMQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["promql"]);