From 44f1804b5ed2b122a57c8422770159947ef2794f Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 26 May 2026 15:24:18 +0800 Subject: [PATCH] feat: add flow query-context plumbing for terminal watermarks (#8154) * feat: add flow checkpoint plumbing Signed-off-by: discord9 * fix: restore when fail Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * chore: clean up some test Signed-off-by: discord9 * clippy Signed-off-by: discord9 * refactor: move more to pr3b Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/catalog/src/table_source/dummy_catalog.rs | 31 +- src/datanode/src/region_server.rs | 17 +- src/flow/src/batching_mode.rs | 4 +- src/flow/src/batching_mode/checkpoint.rs | 23 + src/flow/src/batching_mode/engine.rs | 451 +++++++++++- src/flow/src/batching_mode/frontend_client.rs | 362 +++++++++- src/flow/src/batching_mode/state.rs | 37 +- src/flow/src/batching_mode/table_creator.rs | 381 ++++++++++ src/flow/src/batching_mode/task.rs | 652 ++++++------------ src/flow/src/batching_mode/task/test.rs | 337 +++++++++ src/frontend/src/instance/grpc.rs | 11 +- src/query/src/dummy_catalog.rs | 48 +- 12 files changed, 1847 insertions(+), 507 deletions(-) create mode 100644 src/flow/src/batching_mode/checkpoint.rs create mode 100644 src/flow/src/batching_mode/table_creator.rs create mode 100644 src/flow/src/batching_mode/task/test.rs diff --git a/src/catalog/src/table_source/dummy_catalog.rs b/src/catalog/src/table_source/dummy_catalog.rs index db49db0eed..20637c3a3a 100644 --- a/src/catalog/src/table_source/dummy_catalog.rs +++ b/src/catalog/src/table_source/dummy_catalog.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use common_catalog::format_full_table_name; use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; use datafusion::datasource::TableProvider; +use session::context::QueryContextRef; use 
snafu::OptionExt; use table::table::adapter::DfTableProviderAdapter; @@ -32,12 +33,27 @@ use crate::error::TableNotExistSnafu; #[derive(Clone)] pub struct DummyCatalogList { catalog_manager: CatalogManagerRef, + query_ctx: Option, } impl DummyCatalogList { - /// Creates a new catalog list with the given catalog manager. + /// Creates a new catalog list with the given catalog manager (no query context). pub fn new(catalog_manager: CatalogManagerRef) -> Self { - Self { catalog_manager } + Self { + catalog_manager, + query_ctx: None, + } + } + + /// Creates a new catalog list with the given catalog manager and query context. + pub fn new_with_query_ctx( + catalog_manager: CatalogManagerRef, + query_ctx: QueryContextRef, + ) -> Self { + Self { + catalog_manager, + query_ctx: Some(query_ctx), + } } } @@ -68,6 +84,7 @@ impl CatalogProviderList for DummyCatalogList { Some(Arc::new(DummyCatalogProvider { catalog_name: catalog_name.to_string(), catalog_manager: self.catalog_manager.clone(), + query_ctx: self.query_ctx.clone(), })) } } @@ -77,6 +94,7 @@ impl CatalogProviderList for DummyCatalogList { struct DummyCatalogProvider { catalog_name: String, catalog_manager: CatalogManagerRef, + query_ctx: Option, } impl CatalogProvider for DummyCatalogProvider { @@ -93,6 +111,7 @@ impl CatalogProvider for DummyCatalogProvider { catalog_name: self.catalog_name.clone(), schema_name: schema_name.to_string(), catalog_manager: self.catalog_manager.clone(), + query_ctx: self.query_ctx.clone(), })) } } @@ -111,6 +130,7 @@ struct DummySchemaProvider { catalog_name: String, schema_name: String, catalog_manager: CatalogManagerRef, + query_ctx: Option, } #[async_trait] @@ -126,7 +146,12 @@ impl SchemaProvider for DummySchemaProvider { async fn table(&self, name: &str) -> datafusion::error::Result>> { let table = self .catalog_manager - .table(&self.catalog_name, &self.schema_name, name, None) + .table( + &self.catalog_name, + &self.schema_name, + name, + self.query_ctx.as_deref(), + ) 
.await? .with_context(|| TableNotExistSnafu { table: format_full_table_name(&self.catalog_name, &self.schema_name, name), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index aa2e627ca2..d5711e1761 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -314,6 +314,7 @@ impl RegionServer { let ctx = request.header.as_ref().map(|h| h.into()); let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build())); + let region_id = request.region_id; let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan) .context(DataFusionSnafu)?; let mut injector = injector_builder @@ -326,7 +327,6 @@ impl RegionServer { .context(DataFusionSnafu)? .data; - let region_id = request.region_id; let stream = self .inner .handle_read(QueryRequest { plan, ..request }, query_ctx.clone()) @@ -837,14 +837,13 @@ fn wrap_flow_region_watermark_stream( region_id: RegionId, query_ctx: &QueryContextRef, ) -> SendableRecordBatchStream { - let Some(seq) = should_collect_region_watermark_from_extensions(&query_ctx.extensions()) - .then(|| query_ctx.get_snapshot(region_id.as_u64())) - .flatten() - else { - return stream; - }; - - Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) + if should_collect_region_watermark_from_extensions(&query_ctx.extensions()) + && let Some(seq) = query_ctx.get_snapshot(region_id.as_u64()) + { + Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) as SendableRecordBatchStream + } else { + stream + } } /// Wraps a region read stream so terminal metrics can carry the scan-open watermark. 
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 4162daa20c..47b3054f54 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -20,12 +20,14 @@ use common_grpc::channel_manager::ClientTlsOption; use serde::{Deserialize, Serialize}; use session::ReadPreference; +mod checkpoint; pub(crate) mod engine; pub(crate) mod frontend_client; mod state; +mod table_creator; mod task; mod time_window; -mod utils; +pub(crate) mod utils; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BatchingModeOptions { diff --git a/src/flow/src/batching_mode/checkpoint.rs b/src/flow/src/batching_mode/checkpoint.rs new file mode 100644 index 0000000000..c359360dc5 --- /dev/null +++ b/src/flow/src/batching_mode/checkpoint.rs @@ -0,0 +1,23 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[allow(dead_code)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum CheckpointMode { + /// Full-snapshot reads over the source tables. + FullSnapshot, + /// Incremental reads driven by explicitly emitted incremental scan + /// extensions. 
+ Incremental, +} diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 054f5db9d6..f37e54d80b 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -59,8 +59,7 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName}; /// /// TODO(discord9): determine how to configure refresh rate pub struct BatchingEngine { - tasks: RwLock>, - shutdown_txs: RwLock>>, + runtime: RwLock, /// frontend client for insert request pub(crate) frontend_client: Arc, flow_metadata_manager: FlowMetadataManagerRef, @@ -72,6 +71,51 @@ pub struct BatchingEngine { pub(crate) batch_opts: Arc, } +#[derive(Default)] +struct FlowRuntimeRegistry { + tasks: BTreeMap, + shutdown_txs: BTreeMap>, +} + +impl FlowRuntimeRegistry { + fn insert( + &mut self, + flow_id: FlowId, + task: BatchingTask, + shutdown_tx: oneshot::Sender<()>, + ) -> (Option, Option>) { + ( + self.tasks.insert(flow_id, task), + self.shutdown_txs.insert(flow_id, shutdown_tx), + ) + } + + fn remove(&mut self, flow_id: FlowId) -> Option<(BatchingTask, Option>)> { + let task = self.tasks.remove(&flow_id)?; + let shutdown_tx = self.shutdown_txs.remove(&flow_id); + Some((task, shutdown_tx)) + } + + fn remove_if_current( + &mut self, + flow_id: FlowId, + task: &BatchingTask, + ) -> (Option, Option>) { + if self + .tasks + .get(&flow_id) + .is_some_and(|current| Arc::ptr_eq(¤t.state, &task.state)) + { + let Some((removed_task, removed_shutdown_tx)) = self.remove(flow_id) else { + return (None, None); + }; + (Some(removed_task), removed_shutdown_tx) + } else { + (None, None) + } + } +} + impl BatchingEngine { pub fn new( frontend_client: Arc, @@ -82,8 +126,7 @@ impl BatchingEngine { batch_opts: BatchingModeOptions, ) -> Self { Self { - tasks: Default::default(), - shutdown_txs: Default::default(), + runtime: Default::default(), frontend_client, flow_metadata_manager, table_meta, @@ -95,8 +138,9 @@ impl BatchingEngine { /// Returns last execution timestamps 
(millisecond) for all batching flows. pub async fn get_last_exec_time_map(&self) -> BTreeMap { - let tasks = self.tasks.read().await; - tasks + let runtime = self.runtime.read().await; + runtime + .tasks .iter() .filter_map(|(flow_id, task)| { task.last_execution_time_millis() @@ -151,10 +195,17 @@ impl BatchingEngine { let group_by_table_name = Arc::new(group_by_table_name); + let tasks = self + .runtime + .read() + .await + .tasks + .values() + .cloned() + .collect::>(); let mut handles = Vec::new(); - let tasks = self.tasks.read().await; - for (_flow_id, task) in tasks.iter() { + for task in tasks { let src_table_names = &task.config.source_table_names; if src_table_names @@ -204,7 +255,6 @@ impl BatchingEngine { }); handles.push(handle); } - drop(tasks); for handle in handles { match handle.await { Err(e) => { @@ -274,9 +324,16 @@ impl BatchingEngine { let group_by_table_name = Arc::new(group_by_table_name); + let tasks = self + .runtime + .read() + .await + .tasks + .values() + .cloned() + .collect::>(); let mut handles = Vec::new(); - let tasks = self.tasks.read().await; - for (_flow_id, task) in tasks.iter() { + for task in tasks { let src_table_names = &task.config.source_table_names; if src_table_names @@ -327,8 +384,6 @@ impl BatchingEngine { } } } - drop(tasks); - Ok(()) } } @@ -390,7 +445,7 @@ impl BatchingEngine { // or replace logic { - let is_exist = self.tasks.read().await.contains_key(&flow_id); + let is_exist = self.runtime.read().await.tasks.contains_key(&flow_id); match (create_if_not_exists, or_replace, is_exist) { // if replace, ignore that old flow exists (_, true, true) => { @@ -521,17 +576,60 @@ impl BatchingEngine { // check execute once first to detect any error early task.check_or_create_sink_table(&engine, &frontend).await?; + let (start_tx, start_rx) = oneshot::channel(); + // TODO(discord9): use time wheel or what for better let handle = common_runtime::spawn_global(async move { - task_inner.start_executing_loop(engine, 
frontend).await; + if start_rx.await.is_ok() { + task_inner.start_executing_loop(engine, frontend).await; + } }); task.state.write().unwrap().task_handle = Some(handle); + let task_for_rollback = task.clone(); - // only replace here not earlier because we want the old one intact if something went wrong before this line - let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task); - drop(replaced_old_task_opt); + // Only replace here, not earlier, because we want the old one intact if + // something went wrong before this line. Keep the task and shutdown + // sender in one registry lock so create/remove can't observe one + // without the other. + let (replaced_old_task_opt, replaced_old_shutdown_tx) = { + let mut runtime = self.runtime.write().await; - self.shutdown_txs.write().await.insert(flow_id, tx); + let is_exist = runtime.tasks.contains_key(&flow_id); + match (create_if_not_exists, or_replace, is_exist) { + (_, true, true) => { + info!( + "Replacing flow with id={} after final registry check", + flow_id + ); + } + (false, false, true) => { + abort_flow_task(flow_id, Some(task), "unregistered"); + return FlowAlreadyExistSnafu { id: flow_id }.fail(); + } + (true, false, true) => { + info!( + "Flow with id={} already exists at final registry check, do nothing", + flow_id + ); + abort_flow_task(flow_id, Some(task), "unregistered"); + return Ok(None); + } + (_, _, false) => (), + } + + runtime.insert(flow_id, task, tx) + }; + + notify_flow_shutdown(flow_id, replaced_old_shutdown_tx, "replaced"); + abort_flow_task(flow_id, replaced_old_task_opt, "replaced"); + if start_tx.send(()).is_err() { + self.rollback_flow_runtime_if_current(flow_id, &task_for_rollback) + .await; + UnexpectedSnafu { + reason: format!("Failed to start flow {flow_id} due to task already dropped"), + } + .fail()?; + } Ok(Some(flow_id)) } @@ -662,21 +760,25 @@ impl BatchingEngine { } pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> { - if 
self.tasks.write().await.remove(&flow_id).is_none() { - warn!("Flow {flow_id} not found in tasks"); - FlowNotFoundSnafu { id: flow_id }.fail()?; - } - let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else { + let (task, shutdown_tx) = { + let mut runtime = self.runtime.write().await; + let Some((task, shutdown_tx)) = runtime.remove(flow_id) else { + warn!("Flow {flow_id} not found in tasks"); + FlowNotFoundSnafu { id: flow_id }.fail()? + }; + (task, shutdown_tx) + }; + + let had_shutdown_tx = notify_flow_shutdown(flow_id, shutdown_tx, "removed"); + abort_flow_task(flow_id, Some(task), "removed"); + + if !had_shutdown_tx { UnexpectedSnafu { reason: format!("Can't found shutdown tx for flow {flow_id}"), } .fail()? - }; - if tx.send(()).is_err() { - warn!( - "Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?" - ) } + Ok(()) } @@ -688,7 +790,7 @@ impl BatchingEngine { // this is only useful for the case when we are flushing the flow right after inserting data into it // TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend? 
tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let task = self.tasks.read().await.get(&flow_id).cloned(); + let task = self.runtime.read().await.tasks.get(&flow_id).cloned(); let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?; let time_window_size = task @@ -713,7 +815,7 @@ impl BatchingEngine { ) .await?; - let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize; + let affected_rows = res.map(|(r, _)| r).unwrap_or_default(); debug!( "Successfully flush flow {flow_id}, affected rows={}", affected_rows @@ -723,8 +825,46 @@ impl BatchingEngine { /// Determine if the batching mode flow task exists with given flow id pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool { - self.tasks.read().await.contains_key(&flow_id) + self.runtime.read().await.tasks.contains_key(&flow_id) } + + async fn rollback_flow_runtime_if_current(&self, flow_id: FlowId, task: &BatchingTask) { + let (removed_task, removed_shutdown_tx) = { + let mut runtime = self.runtime.write().await; + runtime.remove_if_current(flow_id, task) + }; + + notify_flow_shutdown(flow_id, removed_shutdown_tx, "rolled back"); + abort_flow_task(flow_id, removed_task, "rolled back"); + } +} + +fn notify_flow_shutdown(flow_id: FlowId, tx: Option>, action: &str) -> bool { + let Some(tx) = tx else { + return false; + }; + + if tx.send(()).is_err() { + warn!( + "Fail to shutdown {action} flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?" 
+ ); + } + + true +} + +fn abort_flow_task(flow_id: FlowId, task: Option, action: &str) -> bool { + let Some(task) = task else { + return false; + }; + + if let Some(handle) = task.state.write().unwrap().task_handle.take() { + handle.abort(); + debug!("Aborted {action} flow task {flow_id}"); + return true; + } + + false } impl FlowEngine for BatchingEngine { @@ -741,7 +881,14 @@ impl FlowEngine for BatchingEngine { Ok(self.flow_exist_inner(flow_id).await) } async fn list_flows(&self) -> Result, Error> { - Ok(self.tasks.read().await.keys().cloned().collect::>()) + Ok(self + .runtime + .read() + .await + .tasks + .keys() + .cloned() + .collect::>()) } async fn handle_flow_inserts( &self, @@ -756,3 +903,241 @@ impl FlowEngine for BatchingEngine { self.handle_mark_dirty_time_window(req).await } } + +#[cfg(test)] +mod tests { + use catalog::memory::new_memory_catalog_manager; + use common_meta::key::TableMetadataManager; + use common_meta::key::flow::FlowMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use query::options::QueryOptions; + use session::context::QueryContext; + + use super::*; + use crate::test_utils::create_test_query_engine; + + struct DropNotify(Option>); + + impl Drop for DropNotify { + fn drop(&mut self) { + if let Some(tx) = self.0.take() { + let _ = tx.send(()); + } + } + } + + async fn new_test_engine() -> BatchingEngine { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone())); + table_meta.init().await.unwrap(); + let flow_meta = Arc::new(FlowMetadataManager::new(kv_backend)); + let catalog_manager = new_memory_catalog_manager().unwrap(); + let query_engine = create_test_query_engine(); + let (frontend_client, _handler) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + + BatchingEngine::new( + Arc::new(frontend_client), + query_engine, + flow_meta, + table_meta, + catalog_manager, + BatchingModeOptions::default(), + ) + } + + 
async fn new_test_task(flow_id: FlowId) -> (BatchingTask, oneshot::Sender<()>) { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan = sql_to_df_plan( + ctx.clone(), + query_engine.clone(), + "SELECT number, ts FROM numbers_with_ts", + true, + ) + .await + .unwrap(); + let (tx, rx) = oneshot::channel(); + + let task = BatchingTask::try_new(TaskArgs { + flow_id, + query: "SELECT number, ts FROM numbers_with_ts", + plan, + time_window_expr: None, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + "sink".to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + (task, tx) + } + + async fn install_abort_observed_handle(task: &BatchingTask) -> oneshot::Receiver<()> { + let (drop_tx, drop_rx) = oneshot::channel(); + let (entered_tx, entered_rx) = oneshot::channel(); + let handle = tokio::spawn(async move { + let _guard = DropNotify(Some(drop_tx)); + let _ = entered_tx.send(()); + std::future::pending::<()>().await; + }); + task.state.write().unwrap().task_handle = Some(handle); + tokio::time::timeout(Duration::from_secs(1), entered_rx) + .await + .expect("test task handle should start") + .expect("test task handle should report start"); + drop_rx + } + + #[tokio::test] + async fn test_notify_flow_shutdown_sends_signal() { + let (tx, rx) = oneshot::channel(); + + assert!(notify_flow_shutdown(42, Some(tx), "test")); + + rx.await.expect("replaced flow should receive shutdown"); + } + + #[test] + fn test_notify_flow_shutdown_accepts_missing_sender() { + assert!(!notify_flow_shutdown(42, None, "test")); + } + + #[tokio::test] + async fn test_abort_flow_task_aborts_handle() { + let (task, _shutdown_tx) = 
new_test_task(42).await; + let drop_rx = install_abort_observed_handle(&task).await; + + assert!(abort_flow_task(42, Some(task), "test")); + + tokio::time::timeout(Duration::from_secs(1), drop_rx) + .await + .expect("aborted task should be dropped") + .expect("drop notifier should fire"); + } + + #[tokio::test] + async fn test_remove_flow_inner_aborts_registered_task() { + let engine = new_test_engine().await; + let (task, shutdown_tx) = new_test_task(42).await; + let drop_rx = install_abort_observed_handle(&task).await; + + engine.runtime.write().await.insert(42, task, shutdown_tx); + + engine.remove_flow_inner(42).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(1), drop_rx) + .await + .expect("removed task should be dropped") + .expect("drop notifier should fire"); + assert!(!engine.flow_exist_inner(42).await); + assert!(!engine.runtime.read().await.shutdown_txs.contains_key(&42)); + } + + #[tokio::test] + async fn test_or_replace_flow_runtime_replaces_old_handles_and_keeps_new_task() { + let engine = new_test_engine().await; + let (old_task, old_shutdown_tx) = new_test_task(42).await; + let old_task_identity = old_task.clone(); + let old_drop_rx = install_abort_observed_handle(&old_task).await; + let (new_task, new_shutdown_tx) = new_test_task(42).await; + let new_task_identity = new_task.clone(); + + engine + .runtime + .write() + .await + .insert(42, old_task, old_shutdown_tx); + let (replaced_old_task, replaced_old_shutdown_tx) = + engine + .runtime + .write() + .await + .insert(42, new_task, new_shutdown_tx); + + let replaced_old_task = replaced_old_task.expect("old task should be returned"); + assert!(Arc::ptr_eq( + &replaced_old_task.state, + &old_task_identity.state + )); + assert!(notify_flow_shutdown( + 42, + replaced_old_shutdown_tx, + "replaced" + )); + old_task_identity + .state + .write() + .unwrap() + .shutdown_rx + .try_recv() + .expect("old shutdown receiver should receive signal"); + assert!(abort_flow_task(42, 
Some(replaced_old_task), "replaced")); + + tokio::time::timeout(Duration::from_secs(1), old_drop_rx) + .await + .expect("replaced task should be dropped") + .expect("drop notifier should fire"); + + let runtime = engine.runtime.read().await; + assert_eq!(1, runtime.tasks.len()); + assert_eq!(1, runtime.shutdown_txs.len()); + let registered_task = runtime.tasks.get(&42).expect("new task should remain"); + assert!(Arc::ptr_eq( + ®istered_task.state, + &new_task_identity.state + )); + assert!(runtime.shutdown_txs.contains_key(&42)); + assert!(matches!( + new_task_identity + .state + .write() + .unwrap() + .shutdown_rx + .try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + } + + #[tokio::test] + async fn test_rollback_flow_runtime_if_current_removes_matching_task_only() { + let engine = new_test_engine().await; + let (old_task, _old_shutdown_tx) = new_test_task(42).await; + let (current_task, current_shutdown_tx) = new_test_task(42).await; + let current_task_identity = current_task.clone(); + + engine + .runtime + .write() + .await + .insert(42, current_task, current_shutdown_tx); + + engine.rollback_flow_runtime_if_current(42, &old_task).await; + + let registered_task = engine.runtime.read().await.tasks.get(&42).cloned().unwrap(); + assert!(Arc::ptr_eq( + ®istered_task.state, + ¤t_task_identity.state + )); + assert!(engine.runtime.read().await.shutdown_txs.contains_key(&42)); + + engine + .rollback_flow_runtime_if_current(42, ¤t_task_identity) + .await; + assert!(!engine.flow_exist_inner(42).await); + assert!(!engine.runtime.read().await.shutdown_txs.contains_key(&42)); + } +} diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 7382f214e5..8fbffc5a38 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -20,15 +20,17 @@ use std::sync::{Arc, Mutex, Weak}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use 
api::v1::{CreateTableExpr, QueryRequest}; -use client::{Client, Database}; +use client::{Client, Database, OutputWithMetrics}; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config}; use common_meta::peer::{Peer, PeerDiscovery}; -use common_query::Output; +use common_query::{Output, OutputData}; +use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use common_telemetry::warn; use meta_client::client::MetaClient; use query::datafusion::QUERY_PARALLELISM_HINT; -use query::options::QueryOptions; +use query::metrics::terminal_recordbatch_metrics_from_plan; +use query::options::{FlowQueryExtensions, QueryOptions}; use rand::rng; use rand::seq::SliceRandom; use servers::query_handler::grpc::GrpcQueryHandler; @@ -196,9 +198,6 @@ impl DatabaseWithPeer { } impl FrontendClient { - // TODO: support more fine-grained load balancing strategies for frontend - // selection, such as AZ (availability zone) awareness, to prefer frontends - // in the same zone as the flownode and reduce cross-AZ latency. /// scan for available frontend from metadata pub(crate) async fn scan_for_frontend(&self) -> Result, Error> { let Self::Distributed { meta_client, .. } = self else { @@ -341,6 +340,78 @@ impl FrontendClient { } } + pub async fn query_with_terminal_metrics( + &self, + catalog: &str, + schema: &str, + request: QueryRequest, + extensions: &[(&str, &str)], + ) -> Result { + let flow_extensions = build_flow_extensions(extensions)?; + match self { + FrontendClient::Distributed { + query, batch_opts, .. 
+ } => { + let query_parallelism = query.parallelism.to_string(); + let hints = vec![ + (QUERY_PARALLELISM_HINT, query_parallelism.as_str()), + (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()), + ]; + let db = self.get_random_active_frontend(catalog, schema).await?; + db.database + .query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + FrontendClient::Standalone { + database_client, + query, + } => { + let mut extensions_map = HashMap::from([( + QUERY_PARALLELISM_HINT.to_string(), + query.parallelism.to_string(), + )]); + for (key, value) in extensions { + extensions_map.insert((*key).to_string(), (*value).to_string()); + } + let ctx = QueryContextBuilder::default() + .current_catalog(catalog.to_string()) + .current_schema(schema.to_string()) + .extensions(extensions_map) + .build(); + let ctx = Arc::new(ctx); + let database_client = { + database_client + .handler + .lock() + .map_err(|e| { + UnexpectedSnafu { + reason: format!("Failed to lock database client: {e}"), + } + .build() + })? + .as_ref() + .context(UnexpectedSnafu { + reason: "Standalone's frontend instance is not set", + })? + .upgrade() + .context(UnexpectedSnafu { + reason: "Failed to upgrade database client", + })? 
+ }; + database_client + .do_query(Request::Query(request), ctx.clone()) + .await + .map(|output| { + wrap_standalone_output_with_terminal_metrics(output, &flow_extensions, &ctx) + }) + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + } + } + /// Handle a request to frontend pub(crate) async fn handle( &self, @@ -426,6 +497,64 @@ impl FrontendClient { } } +fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result { + let flow_extensions = HashMap::from_iter( + extensions + .iter() + .map(|(key, value)| ((*key).to_string(), (*value).to_string())), + ); + FlowQueryExtensions::parse_flow_extensions(&flow_extensions) + .map_err(BoxedError::new) + .context(ExternalSnafu) + .map(|extensions| extensions.unwrap_or_default()) +} + +fn wrap_standalone_output_with_terminal_metrics( + output: Output, + flow_extensions: &FlowQueryExtensions, + query_ctx: &QueryContextRef, +) -> OutputWithMetrics { + let should_collect_region_watermark = flow_extensions.should_collect_region_watermark(); + let terminal_metrics = + if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) { + output + .meta + .plan + .clone() + .and_then(terminal_recordbatch_metrics_from_plan) + .or_else(|| terminal_recordbatch_metrics_from_snapshots(query_ctx)) + } else { + None + }; + let result = OutputWithMetrics::from_output(output); + if let Some(metrics) = terminal_metrics { + result.metrics.update(Some(metrics)); + } + result +} + +fn terminal_recordbatch_metrics_from_snapshots( + query_ctx: &QueryContextRef, +) -> Option { + let mut region_watermarks = query_ctx + .snapshots() + .into_iter() + .map(|(region_id, watermark)| RegionWatermarkEntry { + region_id, + watermark: Some(watermark), + }) + .collect::>(); + if region_watermarks.is_empty() { + return None; + } + + region_watermarks.sort_by_key(|entry| entry.region_id); + Some(RecordBatchMetrics { + region_watermarks, + ..Default::default() + }) +} + /// Describe a peer of frontend #[derive(Debug, Default)] 
pub(crate) enum PeerDesc { @@ -450,9 +579,17 @@ impl std::fmt::Display for PeerDesc { #[cfg(test)] mod tests { + use std::pin::Pin; + use std::task::{Context, Poll}; use std::time::Duration; - use common_query::Output; + use common_query::{Output, OutputData}; + use common_recordbatch::adapter::RecordBatchMetrics; + use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream}; + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::Int32Vector; + use futures::StreamExt; use tokio::time::timeout; use super::*; @@ -460,6 +597,58 @@ mod tests { #[derive(Debug)] struct NoopHandler; + struct MockMetricsStream { + schema: datatypes::schema::SchemaRef, + batch: Option, + metrics: RecordBatchMetrics, + terminal_metrics_only: bool, + } + + impl futures::Stream for MockMetricsStream { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.batch.take().map(Ok)) + } + + fn size_hint(&self) -> (usize, Option) { + ( + usize::from(self.batch.is_some()), + Some(usize::from(self.batch.is_some())), + ) + } + } + + impl RecordBatchStream for MockMetricsStream { + fn name(&self) -> &str { + "MockMetricsStream" + } + + fn schema(&self) -> datatypes::schema::SchemaRef { + self.schema.clone() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + if self.terminal_metrics_only && self.batch.is_some() { + return None; + } + Some(self.metrics.clone()) + } + } + + #[derive(Debug)] + struct MetricsHandler; + + #[derive(Debug)] + struct ExtensionAwareHandler; + + #[derive(Debug)] + struct SnapshotBindingHandler; + #[async_trait::async_trait] impl GrpcQueryHandlerWithBoxedError for NoopHandler { async fn do_query( @@ -471,6 +660,63 @@ mod tests { } } + #[async_trait::async_trait] + impl GrpcQueryHandlerWithBoxedError for MetricsHandler { + async fn do_query( + &self, + 
_query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let batch = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef], + ) + .unwrap(); + Ok(Output::new_with_stream(Box::pin(MockMetricsStream { + schema, + batch: Some(batch), + metrics: RecordBatchMetrics { + region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry { + region_id: 42, + watermark: Some(99), + }], + ..Default::default() + }, + terminal_metrics_only: true, + }))) + } + } + + #[async_trait::async_trait] + impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler { + async fn do_query( + &self, + _query: Request, + ctx: QueryContextRef, + ) -> std::result::Result { + assert_eq!(ctx.extension("flow.return_region_seq"), Some("true")); + Ok(Output::new_with_affected_rows(1)) + } + } + + #[async_trait::async_trait] + impl GrpcQueryHandlerWithBoxedError for SnapshotBindingHandler { + async fn do_query( + &self, + _query: Request, + ctx: QueryContextRef, + ) -> std::result::Result { + assert_eq!(ctx.extension("flow.return_region_seq"), Some("true")); + ctx.set_snapshot(42, 99); + Ok(Output::new_with_affected_rows(1)) + } + } + #[tokio::test] async fn wait_initialized() { let (client, handler_mut) = @@ -516,4 +762,106 @@ mod tests { .is_ok() ); } + + #[tokio::test] + async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() { + let handler: Arc = Arc::new(MetricsHandler); + let client = + FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + + let result = client + .query_with_terminal_metrics( + "greptime", + "public", + QueryRequest { + query: Some(Query::Sql("select 1".to_string())), + }, + &[], + ) + .await + .unwrap(); + + let terminal_metrics = result.metrics.clone(); + assert!(!result.metrics.is_ready()); + 
assert!(terminal_metrics.get().is_none()); + + let OutputData::Stream(mut stream) = result.output.data else { + panic!("expected stream output"); + }; + while stream.next().await.is_some() {} + + assert!(terminal_metrics.is_ready()); + assert_eq!( + terminal_metrics.region_watermark_map(), + Some(HashMap::from([(42_u64, 99_u64)])) + ); + } + + #[tokio::test] + async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() { + let handler: Arc = Arc::new(ExtensionAwareHandler); + let client = + FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + + let result = client + .query_with_terminal_metrics( + "greptime", + "public", + QueryRequest { + query: Some(Query::Sql("insert into t select 1".to_string())), + }, + &[("flow.return_region_seq", "true")], + ) + .await + .unwrap(); + + assert!(result.metrics.is_ready()); + assert!(result.region_watermark_map().is_none()); + } + + #[tokio::test] + async fn test_query_with_terminal_metrics_uses_standalone_snapshot_bounds() { + let handler: Arc = Arc::new(SnapshotBindingHandler); + let client = + FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + + let result = client + .query_with_terminal_metrics( + "greptime", + "public", + QueryRequest { + query: Some(Query::Sql("insert into t select * from src".to_string())), + }, + &[("flow.return_region_seq", "true")], + ) + .await + .unwrap(); + + assert!(result.metrics.is_ready()); + assert_eq!( + result.region_watermark_map(), + Some(HashMap::from([(42, 99)])) + ); + } + + #[tokio::test] + async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() { + let handler: Arc = Arc::new(NoopHandler); + let client = + FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + + let err = client + .query_with_terminal_metrics( + "greptime", + "public", + QueryRequest { + query: Some(Query::Sql("select 1".to_string())), + }, + 
&[("flow.return_region_seq", "not-a-bool")], + ) + .await + .unwrap_err(); + + assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq")); + } } diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index d90023ae46..c470a2f2c1 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -13,6 +13,7 @@ // limitations under the License. //! Batching mode task state, which changes frequently +//! use std::collections::BTreeMap; use std::time::Duration; @@ -199,12 +200,42 @@ impl DirtyTimeWindows { } pub fn add_window(&mut self, start: Timestamp, end: Option) { - self.windows.insert(start, end); + self.add_or_merge_window(start, end); } pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) { for (start, end) in time_ranges { - self.windows.insert(start, Some(end)); + self.add_or_merge_window(start, Some(end)); + } + } + + /// Add all dirty markers from another dirty-window set. + pub fn add_dirty_windows(&mut self, dirty_windows: &DirtyTimeWindows) { + for (start, end) in &dirty_windows.windows { + self.add_or_merge_window(*start, *end); + } + } + + fn add_or_merge_window(&mut self, start: Timestamp, end: Option) { + self.windows + .entry(start) + .and_modify(|current_end| { + *current_end = Self::union_window_end(*current_end, end); + }) + .or_insert(end); + } + + fn union_window_end( + current_end: Option, + incoming_end: Option, + ) -> Option { + match (current_end, incoming_end) { + (Some(current), Some(incoming)) => Some(current.max(incoming)), + // `None` is a dirty marker without a known upper bound. When one + // side has a concrete end, keep it so merging a restored snapshot + // never shrinks an already-known dirty range with the same start. 
+ (Some(end), None) | (None, Some(end)) => Some(end), + (None, None) => None, } } @@ -216,7 +247,7 @@ impl DirtyTimeWindows { /// Set windows to be dirty, only useful for full aggr without time window /// to mark some new data is inserted pub fn set_dirty(&mut self) { - self.windows.insert(Timestamp::new_second(0), None); + self.add_or_merge_window(Timestamp::new_second(0), None); } /// Number of dirty windows. diff --git a/src/flow/src/batching_mode/table_creator.rs b/src/flow/src/batching_mode/table_creator.rs new file mode 100644 index 0000000000..05da055a40 --- /dev/null +++ b/src/flow/src/batching_mode/table_creator.rs @@ -0,0 +1,381 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::CreateTableExpr; +use datafusion_common::tree_node::TreeNode; +use datafusion_expr::LogicalPlan; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use operator::expr_helper::column_schemas_to_defs; +use snafu::ResultExt; + +use crate::Error; +use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; +use crate::batching_mode::utils::FindGroupByFinalName; +use crate::error::{ConvertColumnSchemaSnafu, DatafusionSnafu}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum QueryType { + /// query is a tql query + Tql, + /// query is a sql query + Sql, +} + +// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified +// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code? +pub(super) fn create_table_with_expr( + plan: &LogicalPlan, + sink_table_name: &[String; 3], + query_type: &QueryType, +) -> Result { + let table_def = match query_type { + &QueryType::Sql => { + if let Some(def) = build_pk_from_aggr(plan)? { + def + } else { + build_by_sql_schema(plan)? + } + } + QueryType::Tql => { + // first try build from aggr, then from tql schema because tql query might not have aggr node + if let Some(table_def) = build_pk_from_aggr(plan)? { + table_def + } else { + build_by_tql_schema(plan)? 
+ } + } + }; + let first_time_stamp = table_def.ts_col; + let primary_keys = table_def.pks; + + let mut column_schemas = Vec::new(); + for field in plan.schema().fields() { + let name = field.name(); + let ty = ConcreteDataType::from_arrow_type(field.data_type()); + let col_schema = if first_time_stamp == Some(name.clone()) { + ColumnSchema::new(name, ty, false).with_time_index(true) + } else { + ColumnSchema::new(name, ty, true) + }; + + match query_type { + QueryType::Sql => { + column_schemas.push(col_schema); + } + QueryType::Tql => { + // if is val column, need to rename as val DOUBLE NULL + // if is tag column, need to cast type as STRING NULL + let is_tag_column = primary_keys.contains(name); + let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name); + if is_val_column { + let col_schema = + ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true); + column_schemas.push(col_schema); + } else if is_tag_column { + let col_schema = + ColumnSchema::new(name, ConcreteDataType::string_datatype(), true); + column_schemas.push(col_schema); + } else { + // time index column + column_schemas.push(col_schema); + } + } + } + } + + if query_type == &QueryType::Sql { + let update_at_schema = ColumnSchema::new( + AUTO_CREATED_UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ); + column_schemas.push(update_at_schema); + } + + let time_index = if let Some(time_index) = first_time_stamp { + time_index + } else { + column_schemas.push( + ColumnSchema::new( + AUTO_CREATED_PLACEHOLDER_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ); + AUTO_CREATED_PLACEHOLDER_TS_COL.to_string() + }; + + let column_defs = + column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?; + Ok(CreateTableExpr { + catalog_name: sink_table_name[0].clone(), + schema_name: sink_table_name[1].clone(), + table_name: sink_table_name[2].clone(), + desc: "Auto 
created table by flow engine".to_string(), + column_defs, + time_index, + primary_keys, + create_if_not_exists: true, + table_options: Default::default(), + table_id: None, + engine: "mito".to_string(), + }) +} + +/// simply build by schema, return first timestamp column and no primary key +fn build_by_sql_schema(plan: &LogicalPlan) -> Result { + let first_time_stamp = plan.schema().fields().iter().find_map(|f| { + if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() { + Some(f.name().clone()) + } else { + None + } + }); + Ok(TableDef { + ts_col: first_time_stamp, + pks: vec![], + }) +} + +/// Return first timestamp column found in output schema and all string columns +fn build_by_tql_schema(plan: &LogicalPlan) -> Result { + let first_time_stamp = plan.schema().fields().iter().find_map(|f| { + if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() { + Some(f.name().clone()) + } else { + None + } + }); + let string_columns = plan + .schema() + .fields() + .iter() + .filter_map(|f| { + if ConcreteDataType::from_arrow_type(f.data_type()).is_string() { + Some(f.name().clone()) + } else { + None + } + }) + .collect::>(); + + Ok(TableDef { + ts_col: first_time_stamp, + pks: string_columns, + }) +} + +struct TableDef { + ts_col: Option, + pks: Vec, +} + +/// Return first timestamp column which is in group by clause and other columns which are also in group by clause +/// +/// # Returns +/// +/// * `Option` - first timestamp column which is in group by clause +/// * `Vec` - other columns which are also in group by clause +/// +/// if no aggregation found, return None +fn build_pk_from_aggr(plan: &LogicalPlan) -> Result, Error> { + let fields = plan.schema().fields(); + let mut pk_names = FindGroupByFinalName::default(); + + plan.visit(&mut pk_names) + .with_context(|_| DatafusionSnafu { + context: format!("Can't find aggr expr in plan {plan:?}"), + })?; + + // if no group by clause, return empty with first timestamp column found in output schema + 
let Some(pk_final_names) = pk_names.get_group_expr_names() else { + return Ok(None); + }; + if pk_final_names.is_empty() { + let first_ts_col = fields + .iter() + .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()) + .map(|f| f.name().clone()); + return Ok(Some(TableDef { + ts_col: first_ts_col, + pks: vec![], + })); + } + + let all_pk_cols: Vec<_> = fields + .iter() + .filter(|f| pk_final_names.contains(f.name())) + .map(|f| f.name().clone()) + .collect(); + // Auto-created tables use the first timestamp column in the group-by keys + // as the time index. It is possible that timestamp columns appear only as + // aggregate outputs (for example `max(ts)`) and are not group-by keys; in + // that case `first_time_stamp` stays `None` and the caller falls back to a + // placeholder time index column. + let first_time_stamp = fields + .iter() + .find(|f| { + all_pk_cols.contains(&f.name().clone()) + && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() + }) + .map(|f| f.name().clone()); + + let all_pk_cols: Vec<_> = all_pk_cols + .into_iter() + .filter(|col| first_time_stamp.as_ref() != Some(col)) + .collect(); + + Ok(Some(TableDef { + ts_col: first_time_stamp, + pks: all_pk_cols, + })) +} + +#[cfg(test)] +mod test { + use api::v1::column_def::try_as_column_schema; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use pretty_assertions::assert_eq; + use session::context::QueryContext; + + use super::*; + use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; + use crate::batching_mode::utils::sql_to_df_plan; + use crate::test_utils::create_test_query_engine; + + #[tokio::test] + async fn test_gen_create_table_sql() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + struct TestCase { + sql: String, + sink_table_name: String, + column_schemas: Vec, + primary_keys: Vec, + time_index: String, + } + + let update_at_schema = 
ColumnSchema::new( + AUTO_CREATED_UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ); + + let ts_placeholder_schema = ColumnSchema::new( + AUTO_CREATED_PLACEHOLDER_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true); + + let testcases = vec![ + TestCase { + sql: "SELECT number, ts FROM numbers_with_ts".to_string(), + sink_table_name: "new_table".to_string(), + column_schemas: vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + update_at_schema.clone(), + ], + primary_keys: vec![], + time_index: "ts".to_string(), + }, + TestCase { + sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(), + sink_table_name: "new_table".to_string(), + column_schemas: vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new( + "max(numbers_with_ts.ts)", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + update_at_schema.clone(), + ts_placeholder_schema.clone(), + ], + primary_keys: vec!["number".to_string()], + time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(), + }, + TestCase { + sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(), + sink_table_name: "new_table".to_string(), + column_schemas: vec![ + ColumnSchema::new( + "max(numbers_with_ts.number)", + ConcreteDataType::uint32_datatype(), + true, + ), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + update_at_schema.clone(), + ], + primary_keys: vec![], + time_index: "ts".to_string(), + }, + TestCase { + sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(), + sink_table_name: "new_table".to_string(), + column_schemas: vec![ + ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), 
true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + update_at_schema.clone(), + ], + primary_keys: vec!["number".to_string()], + time_index: "ts".to_string(), + }, + ]; + + for tc in testcases { + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true) + .await + .unwrap(); + let expr = create_table_with_expr( + &plan, + &[ + "greptime".to_string(), + "public".to_string(), + tc.sink_table_name.clone(), + ], + &QueryType::Sql, + ) + .unwrap(); + // TODO(discord9): assert expr + let column_schemas = expr + .column_defs + .iter() + .map(|c| try_as_column_schema(c).unwrap()) + .collect::>(); + assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql); + assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql); + assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql); + } + } +} diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 84c96cc7cd..51a417c0d1 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -28,13 +28,11 @@ use datafusion::sql::unparser::expr_to_sql; use datafusion_common::DFSchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp}; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema}; -use operator::expr_helper::column_schemas_to_defs; +use datatypes::schema::Schema; use query::QueryEngineRef; use query::query_engine::DefaultSerializer; use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt, ensure}; +use snafu::{OptionExt, ResultExt}; use sql::parsers::utils::is_tql; use store_api::mito_engine_options::MERGE_MODE_KEY; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; @@ -43,19 +41,19 @@ use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; use tokio::time::Instant; -use 
crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::frontend_client::FrontendClient; -use crate::batching_mode::state::{FilterExprInfo, TaskState}; +use crate::batching_mode::state::{DirtyTimeWindows, FilterExprInfo, TaskState}; +use crate::batching_mode::table_creator::{QueryType, create_table_with_expr}; use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::utils::{ - AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema, + AddFilterRewriter, ColumnMatcherRewriter, gen_plan_with_matching_schema, get_table_info_df_schema, sql_to_df_plan, }; use crate::df_optimizer::apply_df_optimizer; use crate::error::{ - ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, - SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, + DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu, + UnexpectedSnafu, }; use crate::metrics::{ METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, @@ -100,14 +98,6 @@ fn is_merge_mode_last_non_null(options: &HashMap) -> bool { .unwrap_or(false) } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum QueryType { - /// query is a tql query - Tql, - /// query is a sql query - Sql, -} - #[derive(Clone)] pub struct BatchingTask { pub config: Arc, @@ -132,7 +122,20 @@ pub struct TaskArgs<'a> { pub struct PlanInfo { pub plan: LogicalPlan, - pub filter: Option, + pub dirty_restore: DirtyRestore, +} + +pub enum DirtyRestore { + /// The query was scoped to dirty time ranges; restore those ranges if the + /// run fails. + Scoped(FilterExprInfo), + /// The query could not be scoped to dirty time ranges, so the dirty-window + /// state is only a dirty signal. Restore the consumed signal if the full + /// run fails. + /// + /// TODO(discord9): Full-query runs only need a dirty bool flag. 
Refactor + /// the unscoped path to stop reusing `DirtyTimeWindows` for this signal. + Unscoped(DirtyTimeWindows), } impl BatchingTask { @@ -210,7 +213,7 @@ impl BatchingTask { &self, engine: &QueryEngineRef, frontend_client: &Arc, - ) -> Result, Error> { + ) -> Result, Error> { if !self.is_table_exist(&self.config.sink_table_name).await? { let create_table = self.gen_create_table_expr(engine.clone()).await?; info!( @@ -241,11 +244,19 @@ impl BatchingTask { engine: &QueryEngineRef, frontend_client: &Arc, max_window_cnt: Option, - ) -> Result, Error> { + ) -> Result, Error> { if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? { debug!("Generate new query: {}", new_query.plan); - self.execute_logical_plan(frontend_client, &new_query.plan) + match self + .execute_logical_plan(frontend_client, &new_query.plan) .await + { + Ok(result) => Ok(result), + Err(err) => { + self.handle_executed_query_failure(Some(&new_query)); + Err(err) + } + } } else { debug!("Generate no query"); Ok(None) @@ -278,57 +289,66 @@ impl BatchingTask { ) .await?; - let insert_into_info = if let Some(new_query) = new_query { - // first check if all columns in input query exists in sink table - // since insert into ref to names in record batch generate by given query - let table_columns = df_schema - .columns() - .into_iter() - .map(|c| c.name) - .collect::>(); - for column in new_query.plan.schema().columns() { - ensure!( - table_columns.contains(column.name()), - InvalidQuerySnafu { - reason: format!( - "Column {} not found in sink table with columns {:?}", - column, table_columns - ), - } - ); - } - - let table_provider = Arc::new(DfTableProviderAdapter::new(table)); - let table_source = Arc::new(DefaultTableSource::new(table_provider)); - - // update_at& time index placeholder (if exists) should have default value - let plan = LogicalPlan::Dml(DmlStatement::new( - datafusion_common::TableReference::Full { - catalog: self.config.sink_table_name[0].clone().into(), - 
schema: self.config.sink_table_name[1].clone().into(), - table: self.config.sink_table_name[2].clone().into(), - }, - table_source, - WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), - Arc::new(new_query.plan), - )); - PlanInfo { - plan, - filter: new_query.filter, - } - } else { + let Some(new_query) = new_query else { return Ok(None); }; - let insert_into = insert_into_info - .plan - .recompute_schema() - .context(DatafusionSnafu { - context: "Failed to recompute schema", - })?; + + // first check if all columns in input query exists in sink table + // since insert into ref to names in record batch generate by given query + let table_columns = df_schema + .columns() + .into_iter() + .map(|c| c.name) + .collect::>(); + for column in new_query.plan.schema().columns() { + if !table_columns.contains(column.name()) { + self.restore_dirty_windows_after_failure(&new_query); + return InvalidQuerySnafu { + reason: format!( + "Column {} not found in sink table with columns {:?}", + column, table_columns + ), + } + .fail(); + } + } + + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + + // update_at& time index placeholder (if exists) should have default value + let plan = LogicalPlan::Dml(DmlStatement::new( + datafusion_common::TableReference::Full { + catalog: self.config.sink_table_name[0].clone().into(), + schema: self.config.sink_table_name[1].clone().into(), + table: self.config.sink_table_name[2].clone().into(), + }, + table_source, + WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), + Arc::new(new_query.plan.clone()), + )); + let insert_into_info = PlanInfo { + plan, + dirty_restore: new_query.dirty_restore, + }; + let insert_into = + match insert_into_info + .plan + .clone() + .recompute_schema() + .context(DatafusionSnafu { + context: "Failed to recompute schema", + }) { + Ok(insert_into) => insert_into, + Err(err) => { + 
self.restore_dirty_windows_after_failure(&insert_into_info); + return Err(err); + } + }; Ok(Some(PlanInfo { plan: insert_into, - filter: insert_into_info.filter, + dirty_restore: insert_into_info.dirty_restore, })) } @@ -349,7 +369,7 @@ impl BatchingTask { &self, frontend_client: &Arc, plan: &LogicalPlan, - ) -> Result, Error> { + ) -> Result, Error> { let instant = Instant::now(); let flow_id = self.config.flow_id; @@ -385,7 +405,6 @@ impl BatchingTask { .with_label_values(&[flow_id.to_string().as_str()]) .start_timer(); - // hack and special handling the insert logical plan let req = if let Some((insert_to, insert_plan)) = breakup_insert_plan(&plan, catalog, schema) { @@ -451,8 +470,46 @@ impl BatchingTask { .after_query_exec(elapsed, res.is_ok()); let res = res?; + Ok(Some((res as usize, elapsed))) + } - Ok(Some((res, elapsed))) + /// Restore dirty windows consumed by a failed query so they are retried on + /// the next execution. + /// + fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) { + match &query.dirty_restore { + DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter), + DirtyRestore::Unscoped(dirty_windows) => self + .state + .write() + .unwrap() + .dirty_time_windows + .add_dirty_windows(dirty_windows), + } + } + + fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) { + self.state + .write() + .unwrap() + .dirty_time_windows + .add_windows(filter.time_ranges.clone()); + } + + fn restore_scoped_dirty_windows_on_err( + &self, + filter: &FilterExprInfo, + result: Result, + ) -> Result { + result.inspect_err(|_| { + self.restore_scoped_dirty_windows(filter); + }) + } + + fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) { + if let Some(query) = query { + self.restore_dirty_windows_after_failure(query); + } } /// start executing query in a loop, break when receive shutdown signal @@ -558,16 +615,13 @@ impl BatchingTask { } // TODO(discord9): this error should have better place to go, but for now 
just print error, also more context is needed Err(err) => { + self.handle_executed_query_failure(new_query.as_ref()); METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT .with_label_values(&[&flow_id_str]) .inc(); match new_query { Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan); - // Re-add dirty windows back since query failed - self.state.write().unwrap().dirty_time_windows.add_windows( - query.filter.map(|f| f.time_ranges).unwrap_or_default(), - ); // TODO(discord9): add some backoff here? half the query time window or what // backoff meaning use smaller `max_window_cnt` for next query @@ -641,8 +695,13 @@ impl BatchingTask { self.config.flow_id ); // clean dirty time window too, this could be from create flow's check_execute - let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty(); - self.state.write().unwrap().dirty_time_windows.clean(); + let (is_dirty, dirty_windows_to_restore) = { + let mut state = self.state.write().unwrap(); + let dirty_windows_to_restore = state.dirty_time_windows.clone(); + let is_dirty = !dirty_windows_to_restore.is_empty(); + state.dirty_time_windows.clean(); + (is_dirty, dirty_windows_to_restore) + }; if !is_dirty { // no dirty data, hence no need to update @@ -650,7 +709,7 @@ impl BatchingTask { return Ok(None); } - let plan = gen_plan_with_matching_schema( + let plan = match gen_plan_with_matching_schema( &self.config.query, query_ctx, engine, @@ -658,15 +717,35 @@ impl BatchingTask { primary_key_indices, allow_partial, ) - .await?; + .await + { + Ok(plan) => plan, + Err(err) => { + self.state + .write() + .unwrap() + .dirty_time_windows + .add_dirty_windows(&dirty_windows_to_restore); + return Err(err); + } + }; - return Ok(Some(PlanInfo { plan, filter: None })); + return Ok(Some(PlanInfo { + plan, + dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), + })); } _ => { - // clean for tql have no use for time window - 
self.state.write().unwrap().dirty_time_windows.clean(); + // Clean dirty windows for full-query/non-scoped paths, + // such as TQL, that cannot use a time-window filter. + let dirty_windows_to_restore = { + let mut state = self.state.write().unwrap(); + let dirty_windows_to_restore = state.dirty_time_windows.clone(); + state.dirty_time_windows.clean(); + dirty_windows_to_restore + }; - let plan = gen_plan_with_matching_schema( + let plan = match gen_plan_with_matching_schema( &self.config.query, query_ctx, engine, @@ -674,9 +753,23 @@ impl BatchingTask { primary_key_indices, allow_partial, ) - .await?; + .await + { + Ok(plan) => plan, + Err(err) => { + self.state + .write() + .unwrap() + .dirty_time_windows + .add_dirty_windows(&dirty_windows_to_restore); + return Err(err); + } + }; - return Ok(Some(PlanInfo { plan, filter: None })); + return Ok(Some(PlanInfo { + plan, + dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore), + })); } }; @@ -721,25 +814,21 @@ impl BatchingTask { Some(self), )?; - debug!( - "Flow id={:?}, Generated filter expr: {:?}", - self.config.flow_id, - expr.as_ref() - .map( - |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu { - context: format!("Failed to generate filter expr from {expr:?}"), - }) - ) - .transpose()? 
- .map(|s| s.to_string()) - ); - let Some(expr) = expr else { // no new data, hence no need to update debug!("Flow id={:?}, no new data, not update", self.config.flow_id); return Ok(None); }; + let filter_sql = expr_to_sql(&expr.expr) + .map(|sql| sql.to_string()) + .unwrap_or_else(|err| format!("")); + + debug!( + "Flow id={:?}, Generated filter expr: {:?}", + self.config.flow_id, filter_sql + ); + let mut add_filter = AddFilterRewriter::new(expr.expr.clone()); let mut add_auto_column = ColumnMatcherRewriter::new( sink_table_schema.clone(), @@ -747,363 +836,34 @@ impl BatchingTask { allow_partial, ); - let plan = - sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?; - let rewrite = plan - .clone() - .rewrite(&mut add_filter) - .and_then(|p| p.data.rewrite(&mut add_auto_column)) - .with_context(|_| DatafusionSnafu { - context: format!("Failed to rewrite plan:\n {}\n", plan), - })? - .data; + let plan = self.restore_scoped_dirty_windows_on_err( + &expr, + sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await, + )?; + let rewrite = self.restore_scoped_dirty_windows_on_err( + &expr, + plan.clone() + .rewrite(&mut add_filter) + .and_then(|p| p.data.rewrite(&mut add_auto_column)) + .with_context(|_| DatafusionSnafu { + context: format!("Failed to rewrite plan:\n {}\n", plan), + }) + .map(|rewrite| rewrite.data), + )?; // only apply optimize after complex rewrite is done - let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?; + let new_plan = self.restore_scoped_dirty_windows_on_err( + &expr, + apply_df_optimizer(rewrite, &query_ctx).await, + )?; let info = PlanInfo { plan: new_plan.clone(), - filter: Some(expr), + dirty_restore: DirtyRestore::Scoped(expr), }; Ok(Some(info)) } } -// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified -// TODO(discord9): for now no default 
value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code? -fn create_table_with_expr( - plan: &LogicalPlan, - sink_table_name: &[String; 3], - query_type: &QueryType, -) -> Result { - let table_def = match query_type { - &QueryType::Sql => { - if let Some(def) = build_pk_from_aggr(plan)? { - def - } else { - build_by_sql_schema(plan)? - } - } - QueryType::Tql => { - // first try build from aggr, then from tql schema because tql query might not have aggr node - if let Some(table_def) = build_pk_from_aggr(plan)? { - table_def - } else { - build_by_tql_schema(plan)? - } - } - }; - let first_time_stamp = table_def.ts_col; - let primary_keys = table_def.pks; - - let mut column_schemas = Vec::new(); - for field in plan.schema().fields() { - let name = field.name(); - let ty = ConcreteDataType::from_arrow_type(field.data_type()); - let col_schema = if first_time_stamp == Some(name.clone()) { - ColumnSchema::new(name, ty, false).with_time_index(true) - } else { - ColumnSchema::new(name, ty, true) - }; - - match query_type { - QueryType::Sql => { - column_schemas.push(col_schema); - } - QueryType::Tql => { - // if is val column, need to rename as val DOUBLE NULL - // if is tag column, need to cast type as STRING NULL - let is_tag_column = primary_keys.contains(name); - let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name); - if is_val_column { - let col_schema = - ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true); - column_schemas.push(col_schema); - } else if is_tag_column { - let col_schema = - ColumnSchema::new(name, ConcreteDataType::string_datatype(), true); - column_schemas.push(col_schema); - } else { - // time index column - column_schemas.push(col_schema); - } - } - } - } - - if query_type == &QueryType::Sql { - let update_at_schema = ColumnSchema::new( - AUTO_CREATED_UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ); - 
column_schemas.push(update_at_schema); - } - - let time_index = if let Some(time_index) = first_time_stamp { - time_index - } else { - column_schemas.push( - ColumnSchema::new( - AUTO_CREATED_PLACEHOLDER_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ); - AUTO_CREATED_PLACEHOLDER_TS_COL.to_string() - }; - - let column_defs = - column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?; - Ok(CreateTableExpr { - catalog_name: sink_table_name[0].clone(), - schema_name: sink_table_name[1].clone(), - table_name: sink_table_name[2].clone(), - desc: "Auto created table by flow engine".to_string(), - column_defs, - time_index, - primary_keys, - create_if_not_exists: true, - table_options: Default::default(), - table_id: None, - engine: "mito".to_string(), - }) -} - -/// simply build by schema, return first timestamp column and no primary key -fn build_by_sql_schema(plan: &LogicalPlan) -> Result { - let first_time_stamp = plan.schema().fields().iter().find_map(|f| { - if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() { - Some(f.name().clone()) - } else { - None - } - }); - Ok(TableDef { - ts_col: first_time_stamp, - pks: vec![], - }) -} - -/// Return first timestamp column found in output schema and all string columns -fn build_by_tql_schema(plan: &LogicalPlan) -> Result { - let first_time_stamp = plan.schema().fields().iter().find_map(|f| { - if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() { - Some(f.name().clone()) - } else { - None - } - }); - let string_columns = plan - .schema() - .fields() - .iter() - .filter_map(|f| { - if ConcreteDataType::from_arrow_type(f.data_type()).is_string() { - Some(f.name().clone()) - } else { - None - } - }) - .collect::>(); - - Ok(TableDef { - ts_col: first_time_stamp, - pks: string_columns, - }) -} - -struct TableDef { - ts_col: Option, - pks: Vec, -} - -/// Return first timestamp column which is in group by clause 
and other columns which are also in group by clause -/// -/// # Returns -/// -/// * `Option<String>` - first timestamp column which is in group by clause -/// * `Vec<String>` - other columns which are also in group by clause -/// -/// if no aggregation found, return None -fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> { - let fields = plan.schema().fields(); - let mut pk_names = FindGroupByFinalName::default(); - - plan.visit(&mut pk_names) - .with_context(|_| DatafusionSnafu { - context: format!("Can't find aggr expr in plan {plan:?}"), - })?; - - // if no group by clause, return empty with first timestamp column found in output schema - let Some(pk_final_names) = pk_names.get_group_expr_names() else { - return Ok(None); - }; - if pk_final_names.is_empty() { - let first_ts_col = fields - .iter() - .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()) - .map(|f| f.name().clone()); - return Ok(Some(TableDef { - ts_col: first_ts_col, - pks: vec![], - })); - } - - let all_pk_cols: Vec<_> = fields - .iter() - .filter(|f| pk_final_names.contains(f.name())) - .map(|f| f.name().clone()) - .collect(); - // auto create table use first timestamp column in group by clause as time index - let first_time_stamp = fields - .iter() - .find(|f| { - all_pk_cols.contains(&f.name().clone()) - && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() - }) - .map(|f| f.name().clone()); - - let all_pk_cols: Vec<_> = all_pk_cols - .into_iter() - .filter(|col| first_time_stamp.as_ref() != Some(col)) - .collect(); - - Ok(Some(TableDef { - ts_col: first_time_stamp, - pks: all_pk_cols, - })) -} - #[cfg(test)] -mod test { - use api::v1::column_def::try_as_column_schema; - use pretty_assertions::assert_eq; - use session::context::QueryContext; - - use super::*; - use crate::test_utils::create_test_query_engine; - - #[tokio::test] - async fn test_gen_create_table_sql() { - let query_engine = create_test_query_engine(); - let ctx = QueryContext::arc(); - struct TestCase
{ - sql: String, - sink_table_name: String, - column_schemas: Vec<ColumnSchema>, - primary_keys: Vec<String>, - time_index: String, - } - - let update_at_schema = ColumnSchema::new( - AUTO_CREATED_UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ); - - let ts_placeholder_schema = ColumnSchema::new( - AUTO_CREATED_PLACEHOLDER_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true); - - let testcases = vec![ - TestCase { - sql: "SELECT number, ts FROM numbers_with_ts".to_string(), - sink_table_name: "new_table".to_string(), - column_schemas: vec![ - ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - update_at_schema.clone(), - ], - primary_keys: vec![], - time_index: "ts".to_string(), - }, - TestCase { - sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(), - sink_table_name: "new_table".to_string(), - column_schemas: vec![ - ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), - ColumnSchema::new( - "max(numbers_with_ts.ts)", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ), - update_at_schema.clone(), - ts_placeholder_schema.clone(), - ], - primary_keys: vec!["number".to_string()], - time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(), - }, - TestCase { - sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(), - sink_table_name: "new_table".to_string(), - column_schemas: vec![ - ColumnSchema::new( - "max(numbers_with_ts.number)", - ConcreteDataType::uint32_datatype(), - true, - ), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - update_at_schema.clone(), - ], - primary_keys: vec![], - time_index: "ts".to_string(), - }, - TestCase { - sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
- sink_table_name: "new_table".to_string(), - column_schemas: vec![ - ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - update_at_schema.clone(), - ], - primary_keys: vec!["number".to_string()], - time_index: "ts".to_string(), - }, - ]; - - for tc in testcases { - let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true) - .await - .unwrap(); - let expr = create_table_with_expr( - &plan, - &[ - "greptime".to_string(), - "public".to_string(), - tc.sink_table_name.clone(), - ], - &QueryType::Sql, - ) - .unwrap(); - // TODO(discord9): assert expr - let column_schemas = expr - .column_defs - .iter() - .map(|c| try_as_column_schema(c).unwrap()) - .collect::<Vec<_>>(); - assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql); - assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql); - assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql); - } - } -} +mod test; diff --git a/src/flow/src/batching_mode/task/test.rs b/src/flow/src/batching_mode/task/test.rs new file mode 100644 index 0000000000..55a0a3057f --- /dev/null +++ b/src/flow/src/batching_mode/task/test.rs @@ -0,0 +1,337 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use catalog::RegisterTableRequest; +use catalog::memory::MemoryCatalogManager; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_recordbatch::RecordBatch; +use datatypes::data_type::ConcreteDataType as CDT; +use datatypes::schema::ColumnSchema; +use datatypes::vectors::{UInt32Vector, VectorRef}; +use pretty_assertions::assert_eq; +use session::context::QueryContext; +use table::test_util::MemTable; + +use super::*; +use crate::batching_mode::time_window::find_time_window_expr; +use crate::test_utils::create_test_query_engine; + +async fn new_test_task_and_plan_with_missing_sink() -> (BatchingTask, LogicalPlan) { + new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "missing_sink", + ) + .await + .into_task_and_plan() +} + +struct TestTaskParts { + task: BatchingTask, + query_engine: QueryEngineRef, + plan: LogicalPlan, +} + +impl TestTaskParts { + fn into_task_and_plan(self) -> (BatchingTask, LogicalPlan) { + (self.task, self.plan) + } +} + +async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str) -> TestTaskParts { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan = sql_to_df_plan( + ctx.clone(), + query_engine.clone(), + "SELECT number, ts FROM numbers_with_ts", + true, + ) + .await + .unwrap(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query, + plan: plan.clone(), + time_window_expr: None, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + sink_table.to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + 
TestTaskParts { + task, + query_engine, + plan, + } +} + +async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan_query = "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number"; + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), plan_query, true) + .await + .unwrap(); + let (column_name, time_window_expr, _, df_schema) = find_time_window_expr( + &plan, + query_engine.engine_state().catalog_manager().clone(), + ctx.clone(), + ) + .await + .unwrap(); + let time_window_expr = time_window_expr.map(|expr| { + TimeWindowExpr::from_expr( + &expr, + &column_name, + &df_schema, + &query_engine.engine_state().session_state(), + ) + .unwrap() + }); + let (_tx, rx) = tokio::sync::oneshot::channel(); + + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query, + plan: plan.clone(), + time_window_expr, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + "missing_sink".to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: Arc::new(BatchingModeOptions::default()), + flow_eval_interval: None, + }) + .unwrap(); + + TestTaskParts { + task, + query_engine, + plan, + } +} + +fn register_number_only_sink(query_engine: &QueryEngineRef, table_name: &str) { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "number", + CDT::uint32_datatype(), + false, + )])); + let columns: Vec<VectorRef> = vec![Arc::new(UInt32Vector::from_slice([1_u32]))]; + let recordbatch = RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table(table_name, recordbatch); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema:
DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id: 9001, + table, + }; + let catalog_manager = query_engine.engine_state().catalog_manager(); + let memory_catalog = catalog_manager + .as_any() + .downcast_ref::<MemoryCatalogManager>() + .unwrap(); + memory_catalog.register_table_sync(request).unwrap(); +} + +fn dirty_marker() -> DirtyTimeWindows { + let mut dirty = DirtyTimeWindows::default(); + dirty.set_dirty(); + dirty +} + +fn dirty_range(start: i64, end: i64) -> DirtyTimeWindows { + let mut dirty = DirtyTimeWindows::default(); + dirty.add_window( + Timestamp::new_second(start), + Some(Timestamp::new_second(end)), + ); + dirty +} + +async fn assert_unscoped_failure_restore( + consumed_dirty_windows: DirtyTimeWindows, + current_dirty_windows: DirtyTimeWindows, + expected_len: usize, + expected_window_size_secs: u64, +) { + let (task, plan) = new_test_task_and_plan_with_missing_sink().await; + { + let mut state = task.state.write().unwrap(); + state.dirty_time_windows.clean(); + state + .dirty_time_windows + .add_dirty_windows(&current_dirty_windows); + } + let unscoped_query = PlanInfo { + plan, + dirty_restore: DirtyRestore::Unscoped(consumed_dirty_windows), + }; + + task.handle_executed_query_failure(Some(&unscoped_query)); + + let state = task.state.read().unwrap(); + assert_eq!(state.dirty_time_windows.len(), expected_len); + assert_eq!( + state.dirty_time_windows.window_size(), + std::time::Duration::from_secs(expected_window_size_secs) + ); +} + +#[tokio::test] +async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() { + let (task, plan) = new_test_task_and_plan_with_missing_sink().await; + { + let mut state = task.state.write().unwrap(); + state.dirty_time_windows.clean(); + } + let scoped_query = PlanInfo { + plan, + dirty_restore: DirtyRestore::Scoped(FilterExprInfo { + expr: datafusion_expr::lit(true), + col_name: "ts".to_string(), + time_ranges: vec![(Timestamp::new_second(10), Timestamp::new_second(20))],
window_size: chrono::Duration::seconds(10), + }), + }; + + task.handle_executed_query_failure(Some(&scoped_query)); + + let state = task.state.read().unwrap(); + assert_eq!(state.dirty_time_windows.len(), 1); +} + +#[tokio::test] +async fn test_unscoped_failure_restores_consumed_dirty_signal() { + assert_unscoped_failure_restore(dirty_marker(), DirtyTimeWindows::default(), 1, 0).await; + assert_unscoped_failure_restore(dirty_range(30, 40), dirty_range(10, 20), 2, 20).await; + assert_unscoped_failure_restore(dirty_range(30, 40), dirty_range(30, 50), 1, 20).await; +} + +#[tokio::test] +async fn test_unscoped_plan_generation_failure_restores_consumed_dirty_signal() { + let TestTaskParts { + task, query_engine, .. + } = new_test_task_engine_and_plan_with_query( + "SELECT missing_column FROM numbers_with_ts", + "missing_sink", + ) + .await; + task.state.write().unwrap().dirty_time_windows.set_dirty(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true), + ])); + + let result = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) + .await; + + assert!(result.is_err()); + let state = task.state.read().unwrap(); + assert_eq!(state.dirty_time_windows.len(), 1); + assert_eq!( + state.dirty_time_windows.window_size(), + std::time::Duration::from_secs(0) + ); +} + +#[tokio::test] +async fn test_scoped_plan_generation_failure_restores_consumed_dirty_windows() { + let TestTaskParts { + task, + query_engine, + .. 
+ } = new_time_window_test_task_with_query( + "SELECT missing_column, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, missing_column", + ) + .await; + task.state + .write() + .unwrap() + .dirty_time_windows + .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15))); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + + let result = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) + .await; + + assert!(result.is_err()); + let state = task.state.read().unwrap(); + assert_eq!(state.dirty_time_windows.len(), 1); + assert_eq!( + state.dirty_time_windows.window_size(), + std::time::Duration::from_secs(5) + ); +} + +#[tokio::test] +async fn test_insert_plan_matching_failure_restores_consumed_dirty_marker() { + let sink_table = "partial_sink"; + let TestTaskParts { + task, query_engine, .. 
+ } = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + sink_table, + ) + .await; + register_number_only_sink(&query_engine, sink_table); + task.state.write().unwrap().dirty_time_windows.set_dirty(); + + let result = task.gen_insert_plan(&query_engine, None).await; + + assert!(result.is_err()); + let _err = match result { + Ok(_) => panic!("gen_insert_plan should fail with a sink column mismatch"), + Err(err) => err, + }; + let state = task.state.read().unwrap(); + assert_eq!(state.dirty_time_windows.len(), 1); + assert_eq!( + state.dirty_time_windows.window_size(), + std::time::Duration::from_secs(0) + ); +} diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 8d18293cb8..0ca1a2cf20 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -121,8 +121,9 @@ impl GrpcQueryHandler for Instance { .context(PlanStatementSnafu)?; let dummy_catalog_list = - Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new( + Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new_with_query_ctx( self.catalog_manager().clone(), + ctx.clone(), )); let logical_plan = plan_decoder @@ -416,10 +417,12 @@ impl Instance { .new_plan_decoder() .context(PlanStatementSnafu)?; - let dummy_catalog_list = - Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new( + let dummy_catalog_list = Arc::new( + catalog::table_source::dummy_catalog::DummyCatalogList::new_with_query_ctx( self.catalog_manager().clone(), - )); + ctx.clone(), + ), + ); // no optimize yet since we still need to add stuff let logical_plan = plan_decoder diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 0ff985ea00..f08c6e6ec6 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -620,7 +620,10 @@ mod tests { use super::*; use crate::error::Error; - use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, 
FLOW_SINK_TABLE_ID}; + use crate::options::{ + FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_RETURN_REGION_SEQ, + FLOW_SINK_TABLE_ID, + }; fn test_region_id() -> RegionId { RegionId::new(1024, 1) @@ -651,6 +654,49 @@ mod tests { assert_eq!(request.sst_min_sequence, Some(7)); } + #[test] + fn test_terminal_watermark_context_source_and_sink_scan_semantics() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_RETURN_REGION_SEQ.to_string(), + "true".to_string(), + )])) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert!(request.snapshot_on_scan); + assert_eq!(request.memtable_min_sequence, None); + assert_eq!(request.memtable_max_sequence, None); + assert_eq!(request.sst_min_sequence, None); + + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([ + (FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()), + ( + FLOW_SINK_TABLE_ID.to_string(), + region_id.table_id().to_string(), + ), + ])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 88_u64, + )])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 77_u64, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert!(!request.snapshot_on_scan); + assert_eq!(request.memtable_min_sequence, None); + assert_eq!(request.memtable_max_sequence, None); + assert_eq!(request.sst_min_sequence, None); + } + #[test] fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() { let region_id = test_region_id();