mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 02:40:38 +00:00
feat: add flow query-context plumbing for terminal watermarks (#8154)
* feat: add flow checkpoint plumbing Signed-off-by: discord9 <discord9@163.com> * fix: restore when fail Signed-off-by: discord9 <discord9@163.com> * refactor: per review Signed-off-by: discord9 <discord9@163.com> * refactor: per review Signed-off-by: discord9 <discord9@163.com> * chore: clean up some test Signed-off-by: discord9 <discord9@163.com> * clippy Signed-off-by: discord9 <discord9@163.com> * refactor: move more to pr3b Signed-off-by: discord9 <discord9@163.com> * refactor: per review Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -22,6 +22,7 @@ use async_trait::async_trait;
|
||||
use common_catalog::format_full_table_name;
|
||||
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
|
||||
use datafusion::datasource::TableProvider;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::OptionExt;
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
@@ -32,12 +33,27 @@ use crate::error::TableNotExistSnafu;
|
||||
#[derive(Clone)]
|
||||
pub struct DummyCatalogList {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_ctx: Option<QueryContextRef>,
|
||||
}
|
||||
|
||||
impl DummyCatalogList {
|
||||
/// Creates a new catalog list with the given catalog manager.
|
||||
/// Creates a new catalog list with the given catalog manager (no query context).
|
||||
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
|
||||
Self { catalog_manager }
|
||||
Self {
|
||||
catalog_manager,
|
||||
query_ctx: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new catalog list with the given catalog manager and query context.
|
||||
pub fn new_with_query_ctx(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_manager,
|
||||
query_ctx: Some(query_ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +84,7 @@ impl CatalogProviderList for DummyCatalogList {
|
||||
Some(Arc::new(DummyCatalogProvider {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
catalog_manager: self.catalog_manager.clone(),
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -77,6 +94,7 @@ impl CatalogProviderList for DummyCatalogList {
|
||||
struct DummyCatalogProvider {
|
||||
catalog_name: String,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_ctx: Option<QueryContextRef>,
|
||||
}
|
||||
|
||||
impl CatalogProvider for DummyCatalogProvider {
|
||||
@@ -93,6 +111,7 @@ impl CatalogProvider for DummyCatalogProvider {
|
||||
catalog_name: self.catalog_name.clone(),
|
||||
schema_name: schema_name.to_string(),
|
||||
catalog_manager: self.catalog_manager.clone(),
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -111,6 +130,7 @@ struct DummySchemaProvider {
|
||||
catalog_name: String,
|
||||
schema_name: String,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_ctx: Option<QueryContextRef>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -126,7 +146,12 @@ impl SchemaProvider for DummySchemaProvider {
|
||||
async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(&self.catalog_name, &self.schema_name, name, None)
|
||||
.table(
|
||||
&self.catalog_name,
|
||||
&self.schema_name,
|
||||
name,
|
||||
self.query_ctx.as_deref(),
|
||||
)
|
||||
.await?
|
||||
.with_context(|| TableNotExistSnafu {
|
||||
table: format_full_table_name(&self.catalog_name, &self.schema_name, name),
|
||||
|
||||
@@ -314,6 +314,7 @@ impl RegionServer {
|
||||
let ctx = request.header.as_ref().map(|h| h.into());
|
||||
let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
|
||||
|
||||
let region_id = request.region_id;
|
||||
let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
|
||||
.context(DataFusionSnafu)?;
|
||||
let mut injector = injector_builder
|
||||
@@ -326,7 +327,6 @@ impl RegionServer {
|
||||
.context(DataFusionSnafu)?
|
||||
.data;
|
||||
|
||||
let region_id = request.region_id;
|
||||
let stream = self
|
||||
.inner
|
||||
.handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
|
||||
@@ -837,14 +837,13 @@ fn wrap_flow_region_watermark_stream(
|
||||
region_id: RegionId,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> SendableRecordBatchStream {
|
||||
let Some(seq) = should_collect_region_watermark_from_extensions(&query_ctx.extensions())
|
||||
.then(|| query_ctx.get_snapshot(region_id.as_u64()))
|
||||
.flatten()
|
||||
else {
|
||||
return stream;
|
||||
};
|
||||
|
||||
Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
|
||||
if should_collect_region_watermark_from_extensions(&query_ctx.extensions())
|
||||
&& let Some(seq) = query_ctx.get_snapshot(region_id.as_u64())
|
||||
{
|
||||
Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) as SendableRecordBatchStream
|
||||
} else {
|
||||
stream
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
|
||||
|
||||
@@ -20,12 +20,14 @@ use common_grpc::channel_manager::ClientTlsOption;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::ReadPreference;
|
||||
|
||||
mod checkpoint;
|
||||
pub(crate) mod engine;
|
||||
pub(crate) mod frontend_client;
|
||||
mod state;
|
||||
mod table_creator;
|
||||
mod task;
|
||||
mod time_window;
|
||||
mod utils;
|
||||
pub(crate) mod utils;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct BatchingModeOptions {
|
||||
|
||||
23
src/flow/src/batching_mode/checkpoint.rs
Normal file
23
src/flow/src/batching_mode/checkpoint.rs
Normal file
@@ -0,0 +1,23 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) enum CheckpointMode {
|
||||
/// Full-snapshot reads over the source tables.
|
||||
FullSnapshot,
|
||||
/// Incremental reads driven by explicitly emitted incremental scan
|
||||
/// extensions.
|
||||
Incremental,
|
||||
}
|
||||
@@ -59,8 +59,7 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
///
|
||||
/// TODO(discord9): determine how to configure refresh rate
|
||||
pub struct BatchingEngine {
|
||||
tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
|
||||
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
|
||||
runtime: RwLock<FlowRuntimeRegistry>,
|
||||
/// frontend client for insert request
|
||||
pub(crate) frontend_client: Arc<FrontendClient>,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
@@ -72,6 +71,51 @@ pub struct BatchingEngine {
|
||||
pub(crate) batch_opts: Arc<BatchingModeOptions>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct FlowRuntimeRegistry {
|
||||
tasks: BTreeMap<FlowId, BatchingTask>,
|
||||
shutdown_txs: BTreeMap<FlowId, oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl FlowRuntimeRegistry {
|
||||
fn insert(
|
||||
&mut self,
|
||||
flow_id: FlowId,
|
||||
task: BatchingTask,
|
||||
shutdown_tx: oneshot::Sender<()>,
|
||||
) -> (Option<BatchingTask>, Option<oneshot::Sender<()>>) {
|
||||
(
|
||||
self.tasks.insert(flow_id, task),
|
||||
self.shutdown_txs.insert(flow_id, shutdown_tx),
|
||||
)
|
||||
}
|
||||
|
||||
fn remove(&mut self, flow_id: FlowId) -> Option<(BatchingTask, Option<oneshot::Sender<()>>)> {
|
||||
let task = self.tasks.remove(&flow_id)?;
|
||||
let shutdown_tx = self.shutdown_txs.remove(&flow_id);
|
||||
Some((task, shutdown_tx))
|
||||
}
|
||||
|
||||
fn remove_if_current(
|
||||
&mut self,
|
||||
flow_id: FlowId,
|
||||
task: &BatchingTask,
|
||||
) -> (Option<BatchingTask>, Option<oneshot::Sender<()>>) {
|
||||
if self
|
||||
.tasks
|
||||
.get(&flow_id)
|
||||
.is_some_and(|current| Arc::ptr_eq(&current.state, &task.state))
|
||||
{
|
||||
let Some((removed_task, removed_shutdown_tx)) = self.remove(flow_id) else {
|
||||
return (None, None);
|
||||
};
|
||||
(Some(removed_task), removed_shutdown_tx)
|
||||
} else {
|
||||
(None, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BatchingEngine {
|
||||
pub fn new(
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
@@ -82,8 +126,7 @@ impl BatchingEngine {
|
||||
batch_opts: BatchingModeOptions,
|
||||
) -> Self {
|
||||
Self {
|
||||
tasks: Default::default(),
|
||||
shutdown_txs: Default::default(),
|
||||
runtime: Default::default(),
|
||||
frontend_client,
|
||||
flow_metadata_manager,
|
||||
table_meta,
|
||||
@@ -95,8 +138,9 @@ impl BatchingEngine {
|
||||
|
||||
/// Returns last execution timestamps (millisecond) for all batching flows.
|
||||
pub async fn get_last_exec_time_map(&self) -> BTreeMap<FlowId, i64> {
|
||||
let tasks = self.tasks.read().await;
|
||||
tasks
|
||||
let runtime = self.runtime.read().await;
|
||||
runtime
|
||||
.tasks
|
||||
.iter()
|
||||
.filter_map(|(flow_id, task)| {
|
||||
task.last_execution_time_millis()
|
||||
@@ -151,10 +195,17 @@ impl BatchingEngine {
|
||||
|
||||
let group_by_table_name = Arc::new(group_by_table_name);
|
||||
|
||||
let tasks = self
|
||||
.runtime
|
||||
.read()
|
||||
.await
|
||||
.tasks
|
||||
.values()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let mut handles = Vec::new();
|
||||
let tasks = self.tasks.read().await;
|
||||
|
||||
for (_flow_id, task) in tasks.iter() {
|
||||
for task in tasks {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
|
||||
if src_table_names
|
||||
@@ -204,7 +255,6 @@ impl BatchingEngine {
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
drop(tasks);
|
||||
for handle in handles {
|
||||
match handle.await {
|
||||
Err(e) => {
|
||||
@@ -274,9 +324,16 @@ impl BatchingEngine {
|
||||
|
||||
let group_by_table_name = Arc::new(group_by_table_name);
|
||||
|
||||
let tasks = self
|
||||
.runtime
|
||||
.read()
|
||||
.await
|
||||
.tasks
|
||||
.values()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let mut handles = Vec::new();
|
||||
let tasks = self.tasks.read().await;
|
||||
for (_flow_id, task) in tasks.iter() {
|
||||
for task in tasks {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
|
||||
if src_table_names
|
||||
@@ -327,8 +384,6 @@ impl BatchingEngine {
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(tasks);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -390,7 +445,7 @@ impl BatchingEngine {
|
||||
|
||||
// or replace logic
|
||||
{
|
||||
let is_exist = self.tasks.read().await.contains_key(&flow_id);
|
||||
let is_exist = self.runtime.read().await.tasks.contains_key(&flow_id);
|
||||
match (create_if_not_exists, or_replace, is_exist) {
|
||||
// if replace, ignore that old flow exists
|
||||
(_, true, true) => {
|
||||
@@ -521,17 +576,60 @@ impl BatchingEngine {
|
||||
// check execute once first to detect any error early
|
||||
task.check_or_create_sink_table(&engine, &frontend).await?;
|
||||
|
||||
let (start_tx, start_rx) = oneshot::channel();
|
||||
|
||||
// TODO(discord9): use time wheel or what for better
|
||||
let handle = common_runtime::spawn_global(async move {
|
||||
task_inner.start_executing_loop(engine, frontend).await;
|
||||
if start_rx.await.is_ok() {
|
||||
task_inner.start_executing_loop(engine, frontend).await;
|
||||
}
|
||||
});
|
||||
task.state.write().unwrap().task_handle = Some(handle);
|
||||
let task_for_rollback = task.clone();
|
||||
|
||||
// only replace here not earlier because we want the old one intact if something went wrong before this line
|
||||
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
|
||||
drop(replaced_old_task_opt);
|
||||
// Only replace here, not earlier, because we want the old one intact if
|
||||
// something went wrong before this line. Keep the task and shutdown
|
||||
// sender in one registry lock so create/remove can't observe one
|
||||
// without the other.
|
||||
let (replaced_old_task_opt, replaced_old_shutdown_tx) = {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
|
||||
self.shutdown_txs.write().await.insert(flow_id, tx);
|
||||
let is_exist = runtime.tasks.contains_key(&flow_id);
|
||||
match (create_if_not_exists, or_replace, is_exist) {
|
||||
(_, true, true) => {
|
||||
info!(
|
||||
"Replacing flow with id={} after final registry check",
|
||||
flow_id
|
||||
);
|
||||
}
|
||||
(false, false, true) => {
|
||||
abort_flow_task(flow_id, Some(task), "unregistered");
|
||||
return FlowAlreadyExistSnafu { id: flow_id }.fail();
|
||||
}
|
||||
(true, false, true) => {
|
||||
info!(
|
||||
"Flow with id={} already exists at final registry check, do nothing",
|
||||
flow_id
|
||||
);
|
||||
abort_flow_task(flow_id, Some(task), "unregistered");
|
||||
return Ok(None);
|
||||
}
|
||||
(_, _, false) => (),
|
||||
}
|
||||
|
||||
runtime.insert(flow_id, task, tx)
|
||||
};
|
||||
|
||||
notify_flow_shutdown(flow_id, replaced_old_shutdown_tx, "replaced");
|
||||
abort_flow_task(flow_id, replaced_old_task_opt, "replaced");
|
||||
if start_tx.send(()).is_err() {
|
||||
self.rollback_flow_runtime_if_current(flow_id, &task_for_rollback)
|
||||
.await;
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Failed to start flow {flow_id} due to task already dropped"),
|
||||
}
|
||||
.fail()?;
|
||||
}
|
||||
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
@@ -662,21 +760,25 @@ impl BatchingEngine {
|
||||
}
|
||||
|
||||
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
if self.tasks.write().await.remove(&flow_id).is_none() {
|
||||
warn!("Flow {flow_id} not found in tasks");
|
||||
FlowNotFoundSnafu { id: flow_id }.fail()?;
|
||||
}
|
||||
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
|
||||
let (task, shutdown_tx) = {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
let Some((task, shutdown_tx)) = runtime.remove(flow_id) else {
|
||||
warn!("Flow {flow_id} not found in tasks");
|
||||
FlowNotFoundSnafu { id: flow_id }.fail()?
|
||||
};
|
||||
(task, shutdown_tx)
|
||||
};
|
||||
|
||||
let had_shutdown_tx = notify_flow_shutdown(flow_id, shutdown_tx, "removed");
|
||||
abort_flow_task(flow_id, Some(task), "removed");
|
||||
|
||||
if !had_shutdown_tx {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Can't found shutdown tx for flow {flow_id}"),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
if tx.send(()).is_err() {
|
||||
warn!(
|
||||
"Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?"
|
||||
)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -688,7 +790,7 @@ impl BatchingEngine {
|
||||
// this is only useful for the case when we are flushing the flow right after inserting data into it
|
||||
// TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = self.runtime.read().await.tasks.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||
|
||||
let time_window_size = task
|
||||
@@ -713,7 +815,7 @@ impl BatchingEngine {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
|
||||
let affected_rows = res.map(|(r, _)| r).unwrap_or_default();
|
||||
debug!(
|
||||
"Successfully flush flow {flow_id}, affected rows={}",
|
||||
affected_rows
|
||||
@@ -723,8 +825,46 @@ impl BatchingEngine {
|
||||
|
||||
/// Determine if the batching mode flow task exists with given flow id
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
self.runtime.read().await.tasks.contains_key(&flow_id)
|
||||
}
|
||||
|
||||
async fn rollback_flow_runtime_if_current(&self, flow_id: FlowId, task: &BatchingTask) {
|
||||
let (removed_task, removed_shutdown_tx) = {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.remove_if_current(flow_id, task)
|
||||
};
|
||||
|
||||
notify_flow_shutdown(flow_id, removed_shutdown_tx, "rolled back");
|
||||
abort_flow_task(flow_id, removed_task, "rolled back");
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_flow_shutdown(flow_id: FlowId, tx: Option<oneshot::Sender<()>>, action: &str) -> bool {
|
||||
let Some(tx) = tx else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if tx.send(()).is_err() {
|
||||
warn!(
|
||||
"Fail to shutdown {action} flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?"
|
||||
);
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn abort_flow_task(flow_id: FlowId, task: Option<BatchingTask>, action: &str) -> bool {
|
||||
let Some(task) = task else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if let Some(handle) = task.state.write().unwrap().task_handle.take() {
|
||||
handle.abort();
|
||||
debug!("Aborted {action} flow task {flow_id}");
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
@@ -741,7 +881,14 @@ impl FlowEngine for BatchingEngine {
|
||||
Ok(self.flow_exist_inner(flow_id).await)
|
||||
}
|
||||
async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
|
||||
Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
|
||||
Ok(self
|
||||
.runtime
|
||||
.read()
|
||||
.await
|
||||
.tasks
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
async fn handle_flow_inserts(
|
||||
&self,
|
||||
@@ -756,3 +903,241 @@ impl FlowEngine for BatchingEngine {
|
||||
self.handle_mark_dirty_time_window(req).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use catalog::memory::new_memory_catalog_manager;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use query::options::QueryOptions;
|
||||
use session::context::QueryContext;
|
||||
|
||||
use super::*;
|
||||
use crate::test_utils::create_test_query_engine;
|
||||
|
||||
struct DropNotify(Option<oneshot::Sender<()>>);
|
||||
|
||||
impl Drop for DropNotify {
|
||||
fn drop(&mut self) {
|
||||
if let Some(tx) = self.0.take() {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_test_engine() -> BatchingEngine {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
table_meta.init().await.unwrap();
|
||||
let flow_meta = Arc::new(FlowMetadataManager::new(kv_backend));
|
||||
let catalog_manager = new_memory_catalog_manager().unwrap();
|
||||
let query_engine = create_test_query_engine();
|
||||
let (frontend_client, _handler) =
|
||||
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
|
||||
|
||||
BatchingEngine::new(
|
||||
Arc::new(frontend_client),
|
||||
query_engine,
|
||||
flow_meta,
|
||||
table_meta,
|
||||
catalog_manager,
|
||||
BatchingModeOptions::default(),
|
||||
)
|
||||
}
|
||||
|
||||
async fn new_test_task(flow_id: FlowId) -> (BatchingTask, oneshot::Sender<()>) {
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
let plan = sql_to_df_plan(
|
||||
ctx.clone(),
|
||||
query_engine.clone(),
|
||||
"SELECT number, ts FROM numbers_with_ts",
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let task = BatchingTask::try_new(TaskArgs {
|
||||
flow_id,
|
||||
query: "SELECT number, ts FROM numbers_with_ts",
|
||||
plan,
|
||||
time_window_expr: None,
|
||||
expire_after: None,
|
||||
sink_table_name: [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"sink".to_string(),
|
||||
],
|
||||
source_table_names: vec![[
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers_with_ts".to_string(),
|
||||
]],
|
||||
query_ctx: ctx,
|
||||
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
|
||||
shutdown_rx: rx,
|
||||
batch_opts: Arc::new(BatchingModeOptions::default()),
|
||||
flow_eval_interval: None,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
(task, tx)
|
||||
}
|
||||
|
||||
async fn install_abort_observed_handle(task: &BatchingTask) -> oneshot::Receiver<()> {
|
||||
let (drop_tx, drop_rx) = oneshot::channel();
|
||||
let (entered_tx, entered_rx) = oneshot::channel();
|
||||
let handle = tokio::spawn(async move {
|
||||
let _guard = DropNotify(Some(drop_tx));
|
||||
let _ = entered_tx.send(());
|
||||
std::future::pending::<()>().await;
|
||||
});
|
||||
task.state.write().unwrap().task_handle = Some(handle);
|
||||
tokio::time::timeout(Duration::from_secs(1), entered_rx)
|
||||
.await
|
||||
.expect("test task handle should start")
|
||||
.expect("test task handle should report start");
|
||||
drop_rx
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_notify_flow_shutdown_sends_signal() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
assert!(notify_flow_shutdown(42, Some(tx), "test"));
|
||||
|
||||
rx.await.expect("replaced flow should receive shutdown");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_notify_flow_shutdown_accepts_missing_sender() {
|
||||
assert!(!notify_flow_shutdown(42, None, "test"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_abort_flow_task_aborts_handle() {
|
||||
let (task, _shutdown_tx) = new_test_task(42).await;
|
||||
let drop_rx = install_abort_observed_handle(&task).await;
|
||||
|
||||
assert!(abort_flow_task(42, Some(task), "test"));
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), drop_rx)
|
||||
.await
|
||||
.expect("aborted task should be dropped")
|
||||
.expect("drop notifier should fire");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_flow_inner_aborts_registered_task() {
|
||||
let engine = new_test_engine().await;
|
||||
let (task, shutdown_tx) = new_test_task(42).await;
|
||||
let drop_rx = install_abort_observed_handle(&task).await;
|
||||
|
||||
engine.runtime.write().await.insert(42, task, shutdown_tx);
|
||||
|
||||
engine.remove_flow_inner(42).await.unwrap();
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), drop_rx)
|
||||
.await
|
||||
.expect("removed task should be dropped")
|
||||
.expect("drop notifier should fire");
|
||||
assert!(!engine.flow_exist_inner(42).await);
|
||||
assert!(!engine.runtime.read().await.shutdown_txs.contains_key(&42));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_or_replace_flow_runtime_replaces_old_handles_and_keeps_new_task() {
|
||||
let engine = new_test_engine().await;
|
||||
let (old_task, old_shutdown_tx) = new_test_task(42).await;
|
||||
let old_task_identity = old_task.clone();
|
||||
let old_drop_rx = install_abort_observed_handle(&old_task).await;
|
||||
let (new_task, new_shutdown_tx) = new_test_task(42).await;
|
||||
let new_task_identity = new_task.clone();
|
||||
|
||||
engine
|
||||
.runtime
|
||||
.write()
|
||||
.await
|
||||
.insert(42, old_task, old_shutdown_tx);
|
||||
let (replaced_old_task, replaced_old_shutdown_tx) =
|
||||
engine
|
||||
.runtime
|
||||
.write()
|
||||
.await
|
||||
.insert(42, new_task, new_shutdown_tx);
|
||||
|
||||
let replaced_old_task = replaced_old_task.expect("old task should be returned");
|
||||
assert!(Arc::ptr_eq(
|
||||
&replaced_old_task.state,
|
||||
&old_task_identity.state
|
||||
));
|
||||
assert!(notify_flow_shutdown(
|
||||
42,
|
||||
replaced_old_shutdown_tx,
|
||||
"replaced"
|
||||
));
|
||||
old_task_identity
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.shutdown_rx
|
||||
.try_recv()
|
||||
.expect("old shutdown receiver should receive signal");
|
||||
assert!(abort_flow_task(42, Some(replaced_old_task), "replaced"));
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), old_drop_rx)
|
||||
.await
|
||||
.expect("replaced task should be dropped")
|
||||
.expect("drop notifier should fire");
|
||||
|
||||
let runtime = engine.runtime.read().await;
|
||||
assert_eq!(1, runtime.tasks.len());
|
||||
assert_eq!(1, runtime.shutdown_txs.len());
|
||||
let registered_task = runtime.tasks.get(&42).expect("new task should remain");
|
||||
assert!(Arc::ptr_eq(
|
||||
&registered_task.state,
|
||||
&new_task_identity.state
|
||||
));
|
||||
assert!(runtime.shutdown_txs.contains_key(&42));
|
||||
assert!(matches!(
|
||||
new_task_identity
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.shutdown_rx
|
||||
.try_recv(),
|
||||
Err(oneshot::error::TryRecvError::Empty)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rollback_flow_runtime_if_current_removes_matching_task_only() {
|
||||
let engine = new_test_engine().await;
|
||||
let (old_task, _old_shutdown_tx) = new_test_task(42).await;
|
||||
let (current_task, current_shutdown_tx) = new_test_task(42).await;
|
||||
let current_task_identity = current_task.clone();
|
||||
|
||||
engine
|
||||
.runtime
|
||||
.write()
|
||||
.await
|
||||
.insert(42, current_task, current_shutdown_tx);
|
||||
|
||||
engine.rollback_flow_runtime_if_current(42, &old_task).await;
|
||||
|
||||
let registered_task = engine.runtime.read().await.tasks.get(&42).cloned().unwrap();
|
||||
assert!(Arc::ptr_eq(
|
||||
&registered_task.state,
|
||||
&current_task_identity.state
|
||||
));
|
||||
assert!(engine.runtime.read().await.shutdown_txs.contains_key(&42));
|
||||
|
||||
engine
|
||||
.rollback_flow_runtime_if_current(42, &current_task_identity)
|
||||
.await;
|
||||
assert!(!engine.flow_exist_inner(42).await);
|
||||
assert!(!engine.runtime.read().await.shutdown_txs.contains_key(&42));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,15 +20,17 @@ use std::sync::{Arc, Mutex, Weak};
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
use api::v1::{CreateTableExpr, QueryRequest};
|
||||
use client::{Client, Database};
|
||||
use client::{Client, Database, OutputWithMetrics};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
|
||||
use common_meta::peer::{Peer, PeerDiscovery};
|
||||
use common_query::Output;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
|
||||
use common_telemetry::warn;
|
||||
use meta_client::client::MetaClient;
|
||||
use query::datafusion::QUERY_PARALLELISM_HINT;
|
||||
use query::options::QueryOptions;
|
||||
use query::metrics::terminal_recordbatch_metrics_from_plan;
|
||||
use query::options::{FlowQueryExtensions, QueryOptions};
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
@@ -196,9 +198,6 @@ impl DatabaseWithPeer {
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
// TODO: support more fine-grained load balancing strategies for frontend
|
||||
// selection, such as AZ (availability zone) awareness, to prefer frontends
|
||||
// in the same zone as the flownode and reduce cross-AZ latency.
|
||||
/// scan for available frontend from metadata
|
||||
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
|
||||
let Self::Distributed { meta_client, .. } = self else {
|
||||
@@ -341,6 +340,78 @@ impl FrontendClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn query_with_terminal_metrics(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
request: QueryRequest,
|
||||
extensions: &[(&str, &str)],
|
||||
) -> Result<OutputWithMetrics, Error> {
|
||||
let flow_extensions = build_flow_extensions(extensions)?;
|
||||
match self {
|
||||
FrontendClient::Distributed {
|
||||
query, batch_opts, ..
|
||||
} => {
|
||||
let query_parallelism = query.parallelism.to_string();
|
||||
let hints = vec![
|
||||
(QUERY_PARALLELISM_HINT, query_parallelism.as_str()),
|
||||
(READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
|
||||
];
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
db.database
|
||||
.query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
}
|
||||
FrontendClient::Standalone {
|
||||
database_client,
|
||||
query,
|
||||
} => {
|
||||
let mut extensions_map = HashMap::from([(
|
||||
QUERY_PARALLELISM_HINT.to_string(),
|
||||
query.parallelism.to_string(),
|
||||
)]);
|
||||
for (key, value) in extensions {
|
||||
extensions_map.insert((*key).to_string(), (*value).to_string());
|
||||
}
|
||||
let ctx = QueryContextBuilder::default()
|
||||
.current_catalog(catalog.to_string())
|
||||
.current_schema(schema.to_string())
|
||||
.extensions(extensions_map)
|
||||
.build();
|
||||
let ctx = Arc::new(ctx);
|
||||
let database_client = {
|
||||
database_client
|
||||
.handler
|
||||
.lock()
|
||||
.map_err(|e| {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Failed to lock database client: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.as_ref()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Standalone's frontend instance is not set",
|
||||
})?
|
||||
.upgrade()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to upgrade database client",
|
||||
})?
|
||||
};
|
||||
database_client
|
||||
.do_query(Request::Query(request), ctx.clone())
|
||||
.await
|
||||
.map(|output| {
|
||||
wrap_standalone_output_with_terminal_metrics(output, &flow_extensions, &ctx)
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a request to frontend
|
||||
pub(crate) async fn handle(
|
||||
&self,
|
||||
@@ -426,6 +497,64 @@ impl FrontendClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensions, Error> {
|
||||
let flow_extensions = HashMap::from_iter(
|
||||
extensions
|
||||
.iter()
|
||||
.map(|(key, value)| ((*key).to_string(), (*value).to_string())),
|
||||
);
|
||||
FlowQueryExtensions::parse_flow_extensions(&flow_extensions)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
.map(|extensions| extensions.unwrap_or_default())
|
||||
}
|
||||
|
||||
fn wrap_standalone_output_with_terminal_metrics(
|
||||
output: Output,
|
||||
flow_extensions: &FlowQueryExtensions,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> OutputWithMetrics {
|
||||
let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
|
||||
let terminal_metrics =
|
||||
if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) {
|
||||
output
|
||||
.meta
|
||||
.plan
|
||||
.clone()
|
||||
.and_then(terminal_recordbatch_metrics_from_plan)
|
||||
.or_else(|| terminal_recordbatch_metrics_from_snapshots(query_ctx))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let result = OutputWithMetrics::from_output(output);
|
||||
if let Some(metrics) = terminal_metrics {
|
||||
result.metrics.update(Some(metrics));
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn terminal_recordbatch_metrics_from_snapshots(
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Option<RecordBatchMetrics> {
|
||||
let mut region_watermarks = query_ctx
|
||||
.snapshots()
|
||||
.into_iter()
|
||||
.map(|(region_id, watermark)| RegionWatermarkEntry {
|
||||
region_id,
|
||||
watermark: Some(watermark),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
if region_watermarks.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
region_watermarks.sort_by_key(|entry| entry.region_id);
|
||||
Some(RecordBatchMetrics {
|
||||
region_watermarks,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
/// Describe a peer of frontend
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) enum PeerDesc {
|
||||
@@ -450,9 +579,17 @@ impl std::fmt::Display for PeerDesc {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_query::Output;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
|
||||
use datatypes::prelude::{ConcreteDataType, VectorRef};
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::Int32Vector;
|
||||
use futures::StreamExt;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::*;
|
||||
@@ -460,6 +597,58 @@ mod tests {
|
||||
#[derive(Debug)]
|
||||
struct NoopHandler;
|
||||
|
||||
/// Test-only record batch stream that yields at most one batch and exposes a
/// fixed set of metrics.
struct MockMetricsStream {
|
||||
// Schema reported by `RecordBatchStream::schema`.
schema: datatypes::schema::SchemaRef,
|
||||
// The single batch to yield; taken (left as `None`) on the first poll.
batch: Option<RecordBatch>,
|
||||
// Metrics cloned out by `RecordBatchStream::metrics`.
metrics: RecordBatchMetrics,
|
||||
// When true, `metrics()` returns `None` while the batch is still pending.
terminal_metrics_only: bool,
|
||||
}
|
||||
|
||||
impl futures::Stream for MockMetricsStream {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Poll::Ready(self.batch.take().map(Ok))
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(
|
||||
usize::from(self.batch.is_some()),
|
||||
Some(usize::from(self.batch.is_some())),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for MockMetricsStream {
|
||||
fn name(&self) -> &str {
|
||||
"MockMetricsStream"
|
||||
}
|
||||
|
||||
fn schema(&self) -> datatypes::schema::SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
None
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
if self.terminal_metrics_only && self.batch.is_some() {
|
||||
return None;
|
||||
}
|
||||
Some(self.metrics.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Test handler whose `do_query` answers with a `MockMetricsStream` carrying
/// a region watermark (region 42, watermark 99).
#[derive(Debug)]
|
||||
struct MetricsHandler;
|
||||
|
||||
/// Test handler whose `do_query` asserts that the query context carries
/// `flow.return_region_seq = "true"` and then reports one affected row.
#[derive(Debug)]
|
||||
struct ExtensionAwareHandler;
|
||||
|
||||
/// Test handler whose `do_query` checks the `flow.return_region_seq`
/// extension, records snapshot `(42, 99)` on the query context and reports one
/// affected row.
#[derive(Debug)]
|
||||
struct SnapshotBindingHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
|
||||
async fn do_query(
|
||||
@@ -471,6 +660,63 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl GrpcQueryHandlerWithBoxedError for MetricsHandler {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
_ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, BoxedError> {
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"v",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
false,
|
||||
)]));
|
||||
let batch = RecordBatch::new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef],
|
||||
)
|
||||
.unwrap();
|
||||
Ok(Output::new_with_stream(Box::pin(MockMetricsStream {
|
||||
schema,
|
||||
batch: Some(batch),
|
||||
metrics: RecordBatchMetrics {
|
||||
region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
|
||||
region_id: 42,
|
||||
watermark: Some(99),
|
||||
}],
|
||||
..Default::default()
|
||||
},
|
||||
terminal_metrics_only: true,
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, BoxedError> {
|
||||
assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
|
||||
Ok(Output::new_with_affected_rows(1))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl GrpcQueryHandlerWithBoxedError for SnapshotBindingHandler {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, BoxedError> {
|
||||
assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
|
||||
ctx.set_snapshot(42, 99);
|
||||
Ok(Output::new_with_affected_rows(1))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_initialized() {
|
||||
let (client, handler_mut) =
|
||||
@@ -516,4 +762,106 @@ mod tests {
|
||||
.is_ok()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() {
|
||||
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
|
||||
let client =
|
||||
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
|
||||
|
||||
let result = client
|
||||
.query_with_terminal_metrics(
|
||||
"greptime",
|
||||
"public",
|
||||
QueryRequest {
|
||||
query: Some(Query::Sql("select 1".to_string())),
|
||||
},
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let terminal_metrics = result.metrics.clone();
|
||||
assert!(!result.metrics.is_ready());
|
||||
assert!(terminal_metrics.get().is_none());
|
||||
|
||||
let OutputData::Stream(mut stream) = result.output.data else {
|
||||
panic!("expected stream output");
|
||||
};
|
||||
while stream.next().await.is_some() {}
|
||||
|
||||
assert!(terminal_metrics.is_ready());
|
||||
assert_eq!(
|
||||
terminal_metrics.region_watermark_map(),
|
||||
Some(HashMap::from([(42_u64, 99_u64)]))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() {
|
||||
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
|
||||
let client =
|
||||
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
|
||||
|
||||
let result = client
|
||||
.query_with_terminal_metrics(
|
||||
"greptime",
|
||||
"public",
|
||||
QueryRequest {
|
||||
query: Some(Query::Sql("insert into t select 1".to_string())),
|
||||
},
|
||||
&[("flow.return_region_seq", "true")],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(result.metrics.is_ready());
|
||||
assert!(result.region_watermark_map().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_with_terminal_metrics_uses_standalone_snapshot_bounds() {
|
||||
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(SnapshotBindingHandler);
|
||||
let client =
|
||||
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
|
||||
|
||||
let result = client
|
||||
.query_with_terminal_metrics(
|
||||
"greptime",
|
||||
"public",
|
||||
QueryRequest {
|
||||
query: Some(Query::Sql("insert into t select * from src".to_string())),
|
||||
},
|
||||
&[("flow.return_region_seq", "true")],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(result.metrics.is_ready());
|
||||
assert_eq!(
|
||||
result.region_watermark_map(),
|
||||
Some(HashMap::from([(42, 99)]))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() {
|
||||
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
|
||||
let client =
|
||||
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
|
||||
|
||||
let err = client
|
||||
.query_with_terminal_metrics(
|
||||
"greptime",
|
||||
"public",
|
||||
QueryRequest {
|
||||
query: Some(Query::Sql("select 1".to_string())),
|
||||
},
|
||||
&[("flow.return_region_seq", "not-a-bool")],
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
//! Batching mode task state, which changes frequently
|
||||
//!
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
@@ -199,12 +200,42 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
|
||||
/// Records a dirty window starting at `start` with an optional `end`.
///
/// If a window with the same `start` already exists the ends are merged via
/// `add_or_merge_window` rather than overwritten, so an already-known end is
/// never shrunk or erased.
pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
    // No direct `windows.insert` here: it would replace the stored end
    // before the merge could see it, turning the merge into a no-op.
    self.add_or_merge_window(start, end);
}
|
||||
|
||||
/// Records every `(start, end)` range in `time_ranges` as a dirty window.
///
/// Ranges whose `start` is already present are merged with the stored window
/// via `add_or_merge_window` instead of overwriting it.
pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
    for (start, end) in time_ranges {
        // No direct `windows.insert` here: it would replace the stored end
        // before the merge could see it, turning the merge into a no-op.
        self.add_or_merge_window(start, Some(end));
    }
}
|
||||
|
||||
/// Add all dirty markers from another dirty-window set.
|
||||
pub fn add_dirty_windows(&mut self, dirty_windows: &DirtyTimeWindows) {
|
||||
for (start, end) in &dirty_windows.windows {
|
||||
self.add_or_merge_window(*start, *end);
|
||||
}
|
||||
}
|
||||
|
||||
fn add_or_merge_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
|
||||
self.windows
|
||||
.entry(start)
|
||||
.and_modify(|current_end| {
|
||||
*current_end = Self::union_window_end(*current_end, end);
|
||||
})
|
||||
.or_insert(end);
|
||||
}
|
||||
|
||||
/// Combines the ends of two windows that share the same start.
///
/// Two concrete ends yield the larger one. `None` is a dirty marker without
/// a known upper bound, so when only one side has a concrete end that end is
/// kept. This way merging a restored snapshot never shrinks an already-known
/// dirty range. Two unknown ends stay `None`.
fn union_window_end(
    current_end: Option<Timestamp>,
    incoming_end: Option<Timestamp>,
) -> Option<Timestamp> {
    let (Some(current), Some(incoming)) = (current_end, incoming_end) else {
        // At most one side is concrete: keep whichever one is present.
        return current_end.or(incoming_end);
    };
    Some(current.max(incoming))
}
|
||||
|
||||
@@ -216,7 +247,7 @@ impl DirtyTimeWindows {
|
||||
/// Set windows to be dirty, only useful for full aggr without time window
|
||||
/// to mark some new data is inserted
|
||||
pub fn set_dirty(&mut self) {
|
||||
self.windows.insert(Timestamp::new_second(0), None);
|
||||
self.add_or_merge_window(Timestamp::new_second(0), None);
|
||||
}
|
||||
|
||||
/// Number of dirty windows.
|
||||
|
||||
381
src/flow/src/batching_mode/table_creator.rs
Normal file
381
src/flow/src/batching_mode/table_creator.rs
Normal file
@@ -0,0 +1,381 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::CreateTableExpr;
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use operator::expr_helper::column_schemas_to_defs;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::Error;
|
||||
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
|
||||
use crate::batching_mode::utils::FindGroupByFinalName;
|
||||
use crate::error::{ConvertColumnSchemaSnafu, DatafusionSnafu};
|
||||
|
||||
/// Kind of query a flow is defined with; selects how the sink table
/// definition is derived in `create_table_with_expr`.
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum QueryType {
|
||||
/// The query is a TQL query.
|
||||
Tql,
|
||||
/// The query is a SQL query.
|
||||
Sql,
|
||||
}
|
||||
|
||||
// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified
|
||||
// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
|
||||
pub(super) fn create_table_with_expr(
|
||||
plan: &LogicalPlan,
|
||||
sink_table_name: &[String; 3],
|
||||
query_type: &QueryType,
|
||||
) -> Result<CreateTableExpr, Error> {
|
||||
let table_def = match query_type {
|
||||
&QueryType::Sql => {
|
||||
if let Some(def) = build_pk_from_aggr(plan)? {
|
||||
def
|
||||
} else {
|
||||
build_by_sql_schema(plan)?
|
||||
}
|
||||
}
|
||||
QueryType::Tql => {
|
||||
// first try build from aggr, then from tql schema because tql query might not have aggr node
|
||||
if let Some(table_def) = build_pk_from_aggr(plan)? {
|
||||
table_def
|
||||
} else {
|
||||
build_by_tql_schema(plan)?
|
||||
}
|
||||
}
|
||||
};
|
||||
let first_time_stamp = table_def.ts_col;
|
||||
let primary_keys = table_def.pks;
|
||||
|
||||
let mut column_schemas = Vec::new();
|
||||
for field in plan.schema().fields() {
|
||||
let name = field.name();
|
||||
let ty = ConcreteDataType::from_arrow_type(field.data_type());
|
||||
let col_schema = if first_time_stamp == Some(name.clone()) {
|
||||
ColumnSchema::new(name, ty, false).with_time_index(true)
|
||||
} else {
|
||||
ColumnSchema::new(name, ty, true)
|
||||
};
|
||||
|
||||
match query_type {
|
||||
QueryType::Sql => {
|
||||
column_schemas.push(col_schema);
|
||||
}
|
||||
QueryType::Tql => {
|
||||
// if is val column, need to rename as val DOUBLE NULL
|
||||
// if is tag column, need to cast type as STRING NULL
|
||||
let is_tag_column = primary_keys.contains(name);
|
||||
let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
|
||||
if is_val_column {
|
||||
let col_schema =
|
||||
ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
|
||||
column_schemas.push(col_schema);
|
||||
} else if is_tag_column {
|
||||
let col_schema =
|
||||
ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
|
||||
column_schemas.push(col_schema);
|
||||
} else {
|
||||
// time index column
|
||||
column_schemas.push(col_schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if query_type == &QueryType::Sql {
|
||||
let update_at_schema = ColumnSchema::new(
|
||||
AUTO_CREATED_UPDATE_AT_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
);
|
||||
column_schemas.push(update_at_schema);
|
||||
}
|
||||
|
||||
let time_index = if let Some(time_index) = first_time_stamp {
|
||||
time_index
|
||||
} else {
|
||||
column_schemas.push(
|
||||
ColumnSchema::new(
|
||||
AUTO_CREATED_PLACEHOLDER_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
|
||||
};
|
||||
|
||||
let column_defs =
|
||||
column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
|
||||
Ok(CreateTableExpr {
|
||||
catalog_name: sink_table_name[0].clone(),
|
||||
schema_name: sink_table_name[1].clone(),
|
||||
table_name: sink_table_name[2].clone(),
|
||||
desc: "Auto created table by flow engine".to_string(),
|
||||
column_defs,
|
||||
time_index,
|
||||
primary_keys,
|
||||
create_if_not_exists: true,
|
||||
table_options: Default::default(),
|
||||
table_id: None,
|
||||
engine: "mito".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// simply build by schema, return first timestamp column and no primary key
|
||||
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
|
||||
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
|
||||
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
|
||||
Some(f.name().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
Ok(TableDef {
|
||||
ts_col: first_time_stamp,
|
||||
pks: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
/// Return first timestamp column found in output schema and all string columns
|
||||
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
|
||||
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
|
||||
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
|
||||
Some(f.name().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
let string_columns = plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.filter_map(|f| {
|
||||
if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
|
||||
Some(f.name().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(TableDef {
|
||||
ts_col: first_time_stamp,
|
||||
pks: string_columns,
|
||||
})
|
||||
}
|
||||
|
||||
/// Time index and primary key columns chosen for an auto-created sink table.
struct TableDef {
|
||||
// Name of the column to use as time index; `None` makes the caller add a
// placeholder time index column.
ts_col: Option<String>,
|
||||
// Names of the primary key columns.
pks: Vec<String>,
|
||||
}
|
||||
|
||||
/// Return first timestamp column which is in group by clause and other columns which are also in group by clause
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Option<String>` - first timestamp column which is in group by clause
|
||||
/// * `Vec<String>` - other columns which are also in group by clause
|
||||
///
|
||||
/// if no aggregation found, return None
|
||||
fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
|
||||
let fields = plan.schema().fields();
|
||||
let mut pk_names = FindGroupByFinalName::default();
|
||||
|
||||
plan.visit(&mut pk_names)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Can't find aggr expr in plan {plan:?}"),
|
||||
})?;
|
||||
|
||||
// if no group by clause, return empty with first timestamp column found in output schema
|
||||
let Some(pk_final_names) = pk_names.get_group_expr_names() else {
|
||||
return Ok(None);
|
||||
};
|
||||
if pk_final_names.is_empty() {
|
||||
let first_ts_col = fields
|
||||
.iter()
|
||||
.find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
|
||||
.map(|f| f.name().clone());
|
||||
return Ok(Some(TableDef {
|
||||
ts_col: first_ts_col,
|
||||
pks: vec![],
|
||||
}));
|
||||
}
|
||||
|
||||
let all_pk_cols: Vec<_> = fields
|
||||
.iter()
|
||||
.filter(|f| pk_final_names.contains(f.name()))
|
||||
.map(|f| f.name().clone())
|
||||
.collect();
|
||||
// Auto-created tables use the first timestamp column in the group-by keys
|
||||
// as the time index. It is possible that timestamp columns appear only as
|
||||
// aggregate outputs (for example `max(ts)`) and are not group-by keys; in
|
||||
// that case `first_time_stamp` stays `None` and the caller falls back to a
|
||||
// placeholder time index column.
|
||||
let first_time_stamp = fields
|
||||
.iter()
|
||||
.find(|f| {
|
||||
all_pk_cols.contains(&f.name().clone())
|
||||
&& ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
|
||||
})
|
||||
.map(|f| f.name().clone());
|
||||
|
||||
let all_pk_cols: Vec<_> = all_pk_cols
|
||||
.into_iter()
|
||||
.filter(|col| first_time_stamp.as_ref() != Some(col))
|
||||
.collect();
|
||||
|
||||
Ok(Some(TableDef {
|
||||
ts_col: first_time_stamp,
|
||||
pks: all_pk_cols,
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    /// Checks columns, primary keys and time index of the generated table
    /// expression for plain selects and for aggregations grouped by a tag,
    /// by the timestamp, and by both.
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();

        struct TestCase {
            sql: &'static str,
            sink_table_name: &'static str,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        // Small builders for the column schemas shared between cases.
        let ts_ms = ConcreteDataType::timestamp_millisecond_datatype;
        let nullable = |name: &str, ty: ConcreteDataType| ColumnSchema::new(name, ty, true);
        let ts_time_index = || ColumnSchema::new("ts", ts_ms(), false).with_time_index(true);
        let update_at = || nullable(AUTO_CREATED_UPDATE_AT_TS_COL, ts_ms());
        let ts_placeholder = || {
            ColumnSchema::new(AUTO_CREATED_PLACEHOLDER_TS_COL, ts_ms(), false)
                .with_time_index(true)
        };

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts",
                sink_table_name: "new_table",
                column_schemas: vec![
                    nullable("number", ConcreteDataType::uint32_datatype()),
                    ts_time_index(),
                    update_at(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number",
                sink_table_name: "new_table",
                column_schemas: vec![
                    nullable("number", ConcreteDataType::uint32_datatype()),
                    nullable("max(numbers_with_ts.ts)", ts_ms()),
                    update_at(),
                    ts_placeholder(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts",
                sink_table_name: "new_table",
                column_schemas: vec![
                    nullable(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                    ),
                    ts_time_index(),
                    update_at(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number",
                sink_table_name: "new_table",
                column_schemas: vec![
                    nullable("number", ConcreteDataType::uint32_datatype()),
                    ts_time_index(),
                    update_at(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), tc.sql, true)
                .await
                .unwrap();
            let sink_table_name = [
                "greptime".to_string(),
                "public".to_string(),
                tc.sink_table_name.to_string(),
            ];
            let expr = create_table_with_expr(&plan, &sink_table_name, &QueryType::Sql).unwrap();

            // TODO(discord9): assert expr
            let mut column_schemas = Vec::with_capacity(expr.column_defs.len());
            for column_def in &expr.column_defs {
                column_schemas.push(try_as_column_schema(column_def).unwrap());
            }

            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}
|
||||
@@ -28,13 +28,11 @@ use datafusion::sql::unparser::expr_to_sql;
|
||||
use datafusion_common::DFSchemaRef;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use operator::expr_helper::column_schemas_to_defs;
|
||||
use datatypes::schema::Schema;
|
||||
use query::QueryEngineRef;
|
||||
use query::query_engine::DefaultSerializer;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::parsers::utils::is_tql;
|
||||
use store_api::mito_engine_options::MERGE_MODE_KEY;
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
@@ -43,19 +41,19 @@ use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
|
||||
use crate::batching_mode::BatchingModeOptions;
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
use crate::batching_mode::state::{FilterExprInfo, TaskState};
|
||||
use crate::batching_mode::state::{DirtyTimeWindows, FilterExprInfo, TaskState};
|
||||
use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
|
||||
use crate::batching_mode::time_window::TimeWindowExpr;
|
||||
use crate::batching_mode::utils::{
|
||||
AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
|
||||
AddFilterRewriter, ColumnMatcherRewriter, gen_plan_with_matching_schema,
|
||||
get_table_info_df_schema, sql_to_df_plan,
|
||||
};
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::error::{
|
||||
ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
|
||||
UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||
@@ -100,14 +98,6 @@ fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum QueryType {
|
||||
/// query is a tql query
|
||||
Tql,
|
||||
/// query is a sql query
|
||||
Sql,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BatchingTask {
|
||||
pub config: Arc<TaskConfig>,
|
||||
@@ -132,7 +122,20 @@ pub struct TaskArgs<'a> {
|
||||
|
||||
/// A generated logical plan together with the bookkeeping needed to restore
/// dirty windows if running it fails.
pub struct PlanInfo {
|
||||
// The logical plan to execute.
pub plan: LogicalPlan,
|
||||
// NOTE(review): presumably the dirty time-range filter applied to the query — confirm.
pub filter: Option<FilterExprInfo>,
|
||||
// What to put back into the dirty-window state when the run fails.
pub dirty_restore: DirtyRestore,
|
||||
}
|
||||
|
||||
/// Describes which dirty-window state a query consumed, so that it can be
/// restored when the query fails.
pub enum DirtyRestore {
|
||||
/// The query was scoped to dirty time ranges; restore those ranges if the
|
||||
/// run fails.
|
||||
Scoped(FilterExprInfo),
|
||||
/// The query could not be scoped to dirty time ranges, so the dirty-window
|
||||
/// state is only a dirty signal. Restore the consumed signal if the full
|
||||
/// run fails.
|
||||
///
|
||||
/// TODO(discord9): Full-query runs only need a dirty bool flag. Refactor
|
||||
/// the unscoped path to stop reusing `DirtyTimeWindows` for this signal.
|
||||
Unscoped(DirtyTimeWindows),
|
||||
}
|
||||
|
||||
impl BatchingTask {
|
||||
@@ -210,7 +213,7 @@ impl BatchingTask {
|
||||
&self,
|
||||
engine: &QueryEngineRef,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
) -> Result<Option<(usize, Duration)>, Error> {
|
||||
if !self.is_table_exist(&self.config.sink_table_name).await? {
|
||||
let create_table = self.gen_create_table_expr(engine.clone()).await?;
|
||||
info!(
|
||||
@@ -241,11 +244,19 @@ impl BatchingTask {
|
||||
engine: &QueryEngineRef,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
max_window_cnt: Option<usize>,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
) -> Result<Option<(usize, Duration)>, Error> {
|
||||
if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
|
||||
debug!("Generate new query: {}", new_query.plan);
|
||||
self.execute_logical_plan(frontend_client, &new_query.plan)
|
||||
match self
|
||||
.execute_logical_plan(frontend_client, &new_query.plan)
|
||||
.await
|
||||
{
|
||||
Ok(result) => Ok(result),
|
||||
Err(err) => {
|
||||
self.handle_executed_query_failure(Some(&new_query));
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("Generate no query");
|
||||
Ok(None)
|
||||
@@ -278,57 +289,66 @@ impl BatchingTask {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let insert_into_info = if let Some(new_query) = new_query {
|
||||
// first check if all columns in input query exists in sink table
|
||||
// since insert into ref to names in record batch generate by given query
|
||||
let table_columns = df_schema
|
||||
.columns()
|
||||
.into_iter()
|
||||
.map(|c| c.name)
|
||||
.collect::<BTreeSet<_>>();
|
||||
for column in new_query.plan.schema().columns() {
|
||||
ensure!(
|
||||
table_columns.contains(column.name()),
|
||||
InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"Column {} not found in sink table with columns {:?}",
|
||||
column, table_columns
|
||||
),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
|
||||
let table_source = Arc::new(DefaultTableSource::new(table_provider));
|
||||
|
||||
// update_at& time index placeholder (if exists) should have default value
|
||||
let plan = LogicalPlan::Dml(DmlStatement::new(
|
||||
datafusion_common::TableReference::Full {
|
||||
catalog: self.config.sink_table_name[0].clone().into(),
|
||||
schema: self.config.sink_table_name[1].clone().into(),
|
||||
table: self.config.sink_table_name[2].clone().into(),
|
||||
},
|
||||
table_source,
|
||||
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
|
||||
Arc::new(new_query.plan),
|
||||
));
|
||||
PlanInfo {
|
||||
plan,
|
||||
filter: new_query.filter,
|
||||
}
|
||||
} else {
|
||||
let Some(new_query) = new_query else {
|
||||
return Ok(None);
|
||||
};
|
||||
let insert_into = insert_into_info
|
||||
.plan
|
||||
.recompute_schema()
|
||||
.context(DatafusionSnafu {
|
||||
context: "Failed to recompute schema",
|
||||
})?;
|
||||
|
||||
// first check if all columns in input query exists in sink table
|
||||
// since insert into ref to names in record batch generate by given query
|
||||
let table_columns = df_schema
|
||||
.columns()
|
||||
.into_iter()
|
||||
.map(|c| c.name)
|
||||
.collect::<BTreeSet<_>>();
|
||||
for column in new_query.plan.schema().columns() {
|
||||
if !table_columns.contains(column.name()) {
|
||||
self.restore_dirty_windows_after_failure(&new_query);
|
||||
return InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"Column {} not found in sink table with columns {:?}",
|
||||
column, table_columns
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
|
||||
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
|
||||
let table_source = Arc::new(DefaultTableSource::new(table_provider));
|
||||
|
||||
// update_at& time index placeholder (if exists) should have default value
|
||||
let plan = LogicalPlan::Dml(DmlStatement::new(
|
||||
datafusion_common::TableReference::Full {
|
||||
catalog: self.config.sink_table_name[0].clone().into(),
|
||||
schema: self.config.sink_table_name[1].clone().into(),
|
||||
table: self.config.sink_table_name[2].clone().into(),
|
||||
},
|
||||
table_source,
|
||||
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
|
||||
Arc::new(new_query.plan.clone()),
|
||||
));
|
||||
let insert_into_info = PlanInfo {
|
||||
plan,
|
||||
dirty_restore: new_query.dirty_restore,
|
||||
};
|
||||
let insert_into =
|
||||
match insert_into_info
|
||||
.plan
|
||||
.clone()
|
||||
.recompute_schema()
|
||||
.context(DatafusionSnafu {
|
||||
context: "Failed to recompute schema",
|
||||
}) {
|
||||
Ok(insert_into) => insert_into,
|
||||
Err(err) => {
|
||||
self.restore_dirty_windows_after_failure(&insert_into_info);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Some(PlanInfo {
|
||||
plan: insert_into,
|
||||
filter: insert_into_info.filter,
|
||||
dirty_restore: insert_into_info.dirty_restore,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -349,7 +369,7 @@ impl BatchingTask {
|
||||
&self,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
plan: &LogicalPlan,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
) -> Result<Option<(usize, Duration)>, Error> {
|
||||
let instant = Instant::now();
|
||||
let flow_id = self.config.flow_id;
|
||||
|
||||
@@ -385,7 +405,6 @@ impl BatchingTask {
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.start_timer();
|
||||
|
||||
// hack: special handling for the insert logical plan
|
||||
let req = if let Some((insert_to, insert_plan)) =
|
||||
breakup_insert_plan(&plan, catalog, schema)
|
||||
{
|
||||
@@ -451,8 +470,46 @@ impl BatchingTask {
|
||||
.after_query_exec(elapsed, res.is_ok());
|
||||
|
||||
let res = res?;
|
||||
Ok(Some((res as usize, elapsed)))
|
||||
}
|
||||
|
||||
Ok(Some((res, elapsed)))
|
||||
/// Restore dirty windows consumed by a failed query so they are retried on
|
||||
/// the next execution.
|
||||
///
|
||||
fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
|
||||
match &query.dirty_restore {
|
||||
DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
|
||||
DirtyRestore::Unscoped(dirty_windows) => self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.add_dirty_windows(dirty_windows),
|
||||
}
|
||||
}
|
||||
|
||||
/// Re-registers the time ranges carried by `filter` as dirty windows on the
/// task state.
fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
    let ranges = filter.time_ranges.clone();
    let mut state = self.state.write().unwrap();
    state.dirty_time_windows.add_windows(ranges);
}
|
||||
|
||||
fn restore_scoped_dirty_windows_on_err<T>(
|
||||
&self,
|
||||
filter: &FilterExprInfo,
|
||||
result: Result<T, Error>,
|
||||
) -> Result<T, Error> {
|
||||
result.inspect_err(|_| {
|
||||
self.restore_scoped_dirty_windows(filter);
|
||||
})
|
||||
}
|
||||
|
||||
/// Called after a query execution failed: restores the dirty windows of the
/// failed query, if there was one. Does nothing for `None`.
fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
    let Some(failed_query) = query else {
        return;
    };
    self.restore_dirty_windows_after_failure(failed_query);
}
|
||||
|
||||
/// Start executing the query in a loop; break when a shutdown signal is received.
|
||||
@@ -558,16 +615,13 @@ impl BatchingTask {
|
||||
}
|
||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||
Err(err) => {
|
||||
self.handle_executed_query_failure(new_query.as_ref());
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
match new_query {
|
||||
Some(query) => {
|
||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
|
||||
// Re-add dirty windows back since query failed
|
||||
self.state.write().unwrap().dirty_time_windows.add_windows(
|
||||
query.filter.map(|f| f.time_ranges).unwrap_or_default(),
|
||||
);
|
||||
// TODO(discord9): add some backoff here? half the query time window or what
|
||||
// backoff meaning use smaller `max_window_cnt` for next query
|
||||
|
||||
@@ -641,8 +695,13 @@ impl BatchingTask {
|
||||
self.config.flow_id
|
||||
);
|
||||
// clean dirty time window too, this could be from create flow's check_execute
|
||||
let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
|
||||
self.state.write().unwrap().dirty_time_windows.clean();
|
||||
let (is_dirty, dirty_windows_to_restore) = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let dirty_windows_to_restore = state.dirty_time_windows.clone();
|
||||
let is_dirty = !dirty_windows_to_restore.is_empty();
|
||||
state.dirty_time_windows.clean();
|
||||
(is_dirty, dirty_windows_to_restore)
|
||||
};
|
||||
|
||||
if !is_dirty {
|
||||
// no dirty data, hence no need to update
|
||||
@@ -650,7 +709,7 @@ impl BatchingTask {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let plan = gen_plan_with_matching_schema(
|
||||
let plan = match gen_plan_with_matching_schema(
|
||||
&self.config.query,
|
||||
query_ctx,
|
||||
engine,
|
||||
@@ -658,15 +717,35 @@ impl BatchingTask {
|
||||
primary_key_indices,
|
||||
allow_partial,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(plan) => plan,
|
||||
Err(err) => {
|
||||
self.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.add_dirty_windows(&dirty_windows_to_restore);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
return Ok(Some(PlanInfo { plan, filter: None }));
|
||||
return Ok(Some(PlanInfo {
|
||||
plan,
|
||||
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
|
||||
}));
|
||||
}
|
||||
_ => {
|
||||
// clean for tql have no use for time window
|
||||
self.state.write().unwrap().dirty_time_windows.clean();
|
||||
// Clean dirty windows for full-query/non-scoped paths,
|
||||
// such as TQL, that cannot use a time-window filter.
|
||||
let dirty_windows_to_restore = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let dirty_windows_to_restore = state.dirty_time_windows.clone();
|
||||
state.dirty_time_windows.clean();
|
||||
dirty_windows_to_restore
|
||||
};
|
||||
|
||||
let plan = gen_plan_with_matching_schema(
|
||||
let plan = match gen_plan_with_matching_schema(
|
||||
&self.config.query,
|
||||
query_ctx,
|
||||
engine,
|
||||
@@ -674,9 +753,23 @@ impl BatchingTask {
|
||||
primary_key_indices,
|
||||
allow_partial,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(plan) => plan,
|
||||
Err(err) => {
|
||||
self.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.add_dirty_windows(&dirty_windows_to_restore);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
return Ok(Some(PlanInfo { plan, filter: None }));
|
||||
return Ok(Some(PlanInfo {
|
||||
plan,
|
||||
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -721,25 +814,21 @@ impl BatchingTask {
|
||||
Some(self),
|
||||
)?;
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.config.flow_id,
|
||||
expr.as_ref()
|
||||
.map(
|
||||
|expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
})
|
||||
)
|
||||
.transpose()?
|
||||
.map(|s| s.to_string())
|
||||
);
|
||||
|
||||
let Some(expr) = expr else {
|
||||
// no new data, hence no need to update
|
||||
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let filter_sql = expr_to_sql(&expr.expr)
|
||||
.map(|sql| sql.to_string())
|
||||
.unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.config.flow_id, filter_sql
|
||||
);
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
|
||||
let mut add_auto_column = ColumnMatcherRewriter::new(
|
||||
sink_table_schema.clone(),
|
||||
@@ -747,363 +836,34 @@ impl BatchingTask {
|
||||
allow_partial,
|
||||
);
|
||||
|
||||
let plan =
|
||||
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
|
||||
let rewrite = plan
|
||||
.clone()
|
||||
.rewrite(&mut add_filter)
|
||||
.and_then(|p| p.data.rewrite(&mut add_auto_column))
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to rewrite plan:\n {}\n", plan),
|
||||
})?
|
||||
.data;
|
||||
let plan = self.restore_scoped_dirty_windows_on_err(
|
||||
&expr,
|
||||
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
|
||||
)?;
|
||||
let rewrite = self.restore_scoped_dirty_windows_on_err(
|
||||
&expr,
|
||||
plan.clone()
|
||||
.rewrite(&mut add_filter)
|
||||
.and_then(|p| p.data.rewrite(&mut add_auto_column))
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to rewrite plan:\n {}\n", plan),
|
||||
})
|
||||
.map(|rewrite| rewrite.data),
|
||||
)?;
|
||||
// only apply optimize after complex rewrite is done
|
||||
let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;
|
||||
let new_plan = self.restore_scoped_dirty_windows_on_err(
|
||||
&expr,
|
||||
apply_df_optimizer(rewrite, &query_ctx).await,
|
||||
)?;
|
||||
|
||||
let info = PlanInfo {
|
||||
plan: new_plan.clone(),
|
||||
filter: Some(expr),
|
||||
dirty_restore: DirtyRestore::Scoped(expr),
|
||||
};
|
||||
|
||||
Ok(Some(info))
|
||||
}
|
||||
}
|
||||
|
||||
// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified
|
||||
// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
|
||||
fn create_table_with_expr(
|
||||
plan: &LogicalPlan,
|
||||
sink_table_name: &[String; 3],
|
||||
query_type: &QueryType,
|
||||
) -> Result<CreateTableExpr, Error> {
|
||||
let table_def = match query_type {
|
||||
&QueryType::Sql => {
|
||||
if let Some(def) = build_pk_from_aggr(plan)? {
|
||||
def
|
||||
} else {
|
||||
build_by_sql_schema(plan)?
|
||||
}
|
||||
}
|
||||
QueryType::Tql => {
|
||||
// first try build from aggr, then from tql schema because tql query might not have aggr node
|
||||
if let Some(table_def) = build_pk_from_aggr(plan)? {
|
||||
table_def
|
||||
} else {
|
||||
build_by_tql_schema(plan)?
|
||||
}
|
||||
}
|
||||
};
|
||||
let first_time_stamp = table_def.ts_col;
|
||||
let primary_keys = table_def.pks;
|
||||
|
||||
let mut column_schemas = Vec::new();
|
||||
for field in plan.schema().fields() {
|
||||
let name = field.name();
|
||||
let ty = ConcreteDataType::from_arrow_type(field.data_type());
|
||||
let col_schema = if first_time_stamp == Some(name.clone()) {
|
||||
ColumnSchema::new(name, ty, false).with_time_index(true)
|
||||
} else {
|
||||
ColumnSchema::new(name, ty, true)
|
||||
};
|
||||
|
||||
match query_type {
|
||||
QueryType::Sql => {
|
||||
column_schemas.push(col_schema);
|
||||
}
|
||||
QueryType::Tql => {
|
||||
// if is val column, need to rename as val DOUBLE NULL
|
||||
// if is tag column, need to cast type as STRING NULL
|
||||
let is_tag_column = primary_keys.contains(name);
|
||||
let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
|
||||
if is_val_column {
|
||||
let col_schema =
|
||||
ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
|
||||
column_schemas.push(col_schema);
|
||||
} else if is_tag_column {
|
||||
let col_schema =
|
||||
ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
|
||||
column_schemas.push(col_schema);
|
||||
} else {
|
||||
// time index column
|
||||
column_schemas.push(col_schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if query_type == &QueryType::Sql {
|
||||
let update_at_schema = ColumnSchema::new(
|
||||
AUTO_CREATED_UPDATE_AT_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
);
|
||||
column_schemas.push(update_at_schema);
|
||||
}
|
||||
|
||||
let time_index = if let Some(time_index) = first_time_stamp {
|
||||
time_index
|
||||
} else {
|
||||
column_schemas.push(
|
||||
ColumnSchema::new(
|
||||
AUTO_CREATED_PLACEHOLDER_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
|
||||
};
|
||||
|
||||
let column_defs =
|
||||
column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
|
||||
Ok(CreateTableExpr {
|
||||
catalog_name: sink_table_name[0].clone(),
|
||||
schema_name: sink_table_name[1].clone(),
|
||||
table_name: sink_table_name[2].clone(),
|
||||
desc: "Auto created table by flow engine".to_string(),
|
||||
column_defs,
|
||||
time_index,
|
||||
primary_keys,
|
||||
create_if_not_exists: true,
|
||||
table_options: Default::default(),
|
||||
table_id: None,
|
||||
engine: "mito".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// simply build by schema, return first timestamp column and no primary key
|
||||
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
|
||||
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
|
||||
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
|
||||
Some(f.name().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
Ok(TableDef {
|
||||
ts_col: first_time_stamp,
|
||||
pks: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
/// Return first timestamp column found in output schema and all string columns
|
||||
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
|
||||
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
|
||||
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
|
||||
Some(f.name().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
let string_columns = plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.filter_map(|f| {
|
||||
if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
|
||||
Some(f.name().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(TableDef {
|
||||
ts_col: first_time_stamp,
|
||||
pks: string_columns,
|
||||
})
|
||||
}
|
||||
|
||||
/// Table layout inferred from a query plan: which column serves as the time
/// index and which columns form the primary key.
struct TableDef {
|
||||
/// Name of the timestamp column chosen as time index, if one was found.
ts_col: Option<String>,
|
||||
/// Names of the primary key columns (may be empty).
pks: Vec<String>,
|
||||
}
|
||||
|
||||
/// Return first timestamp column which is in group by clause and other columns which are also in group by clause
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Option<String>` - first timestamp column which is in group by clause
|
||||
/// * `Vec<String>` - other columns which are also in group by clause
|
||||
///
|
||||
/// if no aggregation found, return None
|
||||
fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
|
||||
let fields = plan.schema().fields();
|
||||
let mut pk_names = FindGroupByFinalName::default();
|
||||
|
||||
plan.visit(&mut pk_names)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Can't find aggr expr in plan {plan:?}"),
|
||||
})?;
|
||||
|
||||
// if no group by clause, return empty with first timestamp column found in output schema
|
||||
let Some(pk_final_names) = pk_names.get_group_expr_names() else {
|
||||
return Ok(None);
|
||||
};
|
||||
if pk_final_names.is_empty() {
|
||||
let first_ts_col = fields
|
||||
.iter()
|
||||
.find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
|
||||
.map(|f| f.name().clone());
|
||||
return Ok(Some(TableDef {
|
||||
ts_col: first_ts_col,
|
||||
pks: vec![],
|
||||
}));
|
||||
}
|
||||
|
||||
let all_pk_cols: Vec<_> = fields
|
||||
.iter()
|
||||
.filter(|f| pk_final_names.contains(f.name()))
|
||||
.map(|f| f.name().clone())
|
||||
.collect();
|
||||
// auto create table use first timestamp column in group by clause as time index
|
||||
let first_time_stamp = fields
|
||||
.iter()
|
||||
.find(|f| {
|
||||
all_pk_cols.contains(&f.name().clone())
|
||||
&& ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
|
||||
})
|
||||
.map(|f| f.name().clone());
|
||||
|
||||
let all_pk_cols: Vec<_> = all_pk_cols
|
||||
.into_iter()
|
||||
.filter(|col| first_time_stamp.as_ref() != Some(col))
|
||||
.collect();
|
||||
|
||||
Ok(Some(TableDef {
|
||||
ts_col: first_time_stamp,
|
||||
pks: all_pk_cols,
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    /// Checks the columns, primary keys and time index that
    /// `create_table_with_expr` derives for a handful of SQL queries.
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        const SINK_TABLE: &str = "new_table";

        let engine = create_test_query_engine();
        let query_ctx = QueryContext::arc();

        // Small builders for the column schemas that repeat across cases.
        let nullable_u32 =
            |name: &str| ColumnSchema::new(name, ConcreteDataType::uint32_datatype(), true);
        let ts_index = || {
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
        };
        let update_at = || {
            ColumnSchema::new(
                AUTO_CREATED_UPDATE_AT_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
        };
        let ts_placeholder = || {
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
        };

        // (sql, expected columns, expected primary keys, expected time index)
        let cases: Vec<(&str, Vec<ColumnSchema>, Vec<String>, String)> = vec![
            (
                "SELECT number, ts FROM numbers_with_ts",
                vec![nullable_u32("number"), ts_index(), update_at()],
                vec![],
                "ts".to_string(),
            ),
            (
                "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number",
                vec![
                    nullable_u32("number"),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at(),
                    ts_placeholder(),
                ],
                vec!["number".to_string()],
                AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            ),
            (
                "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts",
                vec![
                    nullable_u32("max(numbers_with_ts.number)"),
                    ts_index(),
                    update_at(),
                ],
                vec![],
                "ts".to_string(),
            ),
            (
                "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number",
                vec![nullable_u32("number"), ts_index(), update_at()],
                vec!["number".to_string()],
                "ts".to_string(),
            ),
        ];

        for (sql, expected_columns, expected_pks, expected_time_index) in cases {
            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, true)
                .await
                .unwrap();
            let sink_name = [
                "greptime".to_string(),
                "public".to_string(),
                SINK_TABLE.to_string(),
            ];
            let expr = create_table_with_expr(&plan, &sink_name, &QueryType::Sql).unwrap();

            // TODO(discord9): assert expr
            let actual_columns: Vec<_> = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect();
            assert_eq!(expected_columns, actual_columns, "{:?}", sql);
            assert_eq!(expected_pks, expr.primary_keys, "{:?}", sql);
            assert_eq!(expected_time_index, expr.time_index, "{:?}", sql);
        }
    }
}
|
||||
mod test;
|
||||
|
||||
337
src/flow/src/batching_mode/task/test.rs
Normal file
337
src/flow/src/batching_mode/task/test.rs
Normal file
@@ -0,0 +1,337 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use catalog::RegisterTableRequest;
|
||||
use catalog::memory::MemoryCatalogManager;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_recordbatch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::vectors::{UInt32Vector, VectorRef};
|
||||
use pretty_assertions::assert_eq;
|
||||
use session::context::QueryContext;
|
||||
use table::test_util::MemTable;
|
||||
|
||||
use super::*;
|
||||
use crate::batching_mode::time_window::find_time_window_expr;
|
||||
use crate::test_utils::create_test_query_engine;
|
||||
|
||||
async fn new_test_task_and_plan_with_missing_sink() -> (BatchingTask, LogicalPlan) {
|
||||
new_test_task_engine_and_plan_with_query(
|
||||
"SELECT number, ts FROM numbers_with_ts",
|
||||
"missing_sink",
|
||||
)
|
||||
.await
|
||||
.into_task_and_plan()
|
||||
}
|
||||
|
||||
struct TestTaskParts {
|
||||
task: BatchingTask,
|
||||
query_engine: QueryEngineRef,
|
||||
plan: LogicalPlan,
|
||||
}
|
||||
|
||||
impl TestTaskParts {
|
||||
fn into_task_and_plan(self) -> (BatchingTask, LogicalPlan) {
|
||||
(self.task, self.plan)
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_test_task_engine_and_plan_with_query(query: &str, sink_table: &str) -> TestTaskParts {
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
let plan = sql_to_df_plan(
|
||||
ctx.clone(),
|
||||
query_engine.clone(),
|
||||
"SELECT number, ts FROM numbers_with_ts",
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (_tx, rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
let task = BatchingTask::try_new(TaskArgs {
|
||||
flow_id: 1,
|
||||
query,
|
||||
plan: plan.clone(),
|
||||
time_window_expr: None,
|
||||
expire_after: None,
|
||||
sink_table_name: [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
sink_table.to_string(),
|
||||
],
|
||||
source_table_names: vec![[
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers_with_ts".to_string(),
|
||||
]],
|
||||
query_ctx: ctx,
|
||||
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
|
||||
shutdown_rx: rx,
|
||||
batch_opts: Arc::new(BatchingModeOptions::default()),
|
||||
flow_eval_interval: None,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
TestTaskParts {
|
||||
task,
|
||||
query_engine,
|
||||
plan,
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts {
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
let plan_query = "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, number";
|
||||
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), plan_query, true)
|
||||
.await
|
||||
.unwrap();
|
||||
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
|
||||
&plan,
|
||||
query_engine.engine_state().catalog_manager().clone(),
|
||||
ctx.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let time_window_expr = time_window_expr.map(|expr| {
|
||||
TimeWindowExpr::from_expr(
|
||||
&expr,
|
||||
&column_name,
|
||||
&df_schema,
|
||||
&query_engine.engine_state().session_state(),
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
let (_tx, rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
let task = BatchingTask::try_new(TaskArgs {
|
||||
flow_id: 1,
|
||||
query,
|
||||
plan: plan.clone(),
|
||||
time_window_expr,
|
||||
expire_after: None,
|
||||
sink_table_name: [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"missing_sink".to_string(),
|
||||
],
|
||||
source_table_names: vec![[
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers_with_ts".to_string(),
|
||||
]],
|
||||
query_ctx: ctx,
|
||||
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
|
||||
shutdown_rx: rx,
|
||||
batch_opts: Arc::new(BatchingModeOptions::default()),
|
||||
flow_eval_interval: None,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
TestTaskParts {
|
||||
task,
|
||||
query_engine,
|
||||
plan,
|
||||
}
|
||||
}
|
||||
|
||||
fn register_number_only_sink(query_engine: &QueryEngineRef, table_name: &str) {
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"number",
|
||||
CDT::uint32_datatype(),
|
||||
false,
|
||||
)]));
|
||||
let columns: Vec<VectorRef> = vec![Arc::new(UInt32Vector::from_slice([1_u32]))];
|
||||
let recordbatch = RecordBatch::new(schema, columns).unwrap();
|
||||
let table = MemTable::table(table_name, recordbatch);
|
||||
let request = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id: 9001,
|
||||
table,
|
||||
};
|
||||
let catalog_manager = query_engine.engine_state().catalog_manager();
|
||||
let memory_catalog = catalog_manager
|
||||
.as_any()
|
||||
.downcast_ref::<MemoryCatalogManager>()
|
||||
.unwrap();
|
||||
memory_catalog.register_table_sync(request).unwrap();
|
||||
}
|
||||
|
||||
fn dirty_marker() -> DirtyTimeWindows {
|
||||
let mut dirty = DirtyTimeWindows::default();
|
||||
dirty.set_dirty();
|
||||
dirty
|
||||
}
|
||||
|
||||
fn dirty_range(start: i64, end: i64) -> DirtyTimeWindows {
|
||||
let mut dirty = DirtyTimeWindows::default();
|
||||
dirty.add_window(
|
||||
Timestamp::new_second(start),
|
||||
Some(Timestamp::new_second(end)),
|
||||
);
|
||||
dirty
|
||||
}
|
||||
|
||||
async fn assert_unscoped_failure_restore(
|
||||
consumed_dirty_windows: DirtyTimeWindows,
|
||||
current_dirty_windows: DirtyTimeWindows,
|
||||
expected_len: usize,
|
||||
expected_window_size_secs: u64,
|
||||
) {
|
||||
let (task, plan) = new_test_task_and_plan_with_missing_sink().await;
|
||||
{
|
||||
let mut state = task.state.write().unwrap();
|
||||
state.dirty_time_windows.clean();
|
||||
state
|
||||
.dirty_time_windows
|
||||
.add_dirty_windows(¤t_dirty_windows);
|
||||
}
|
||||
let unscoped_query = PlanInfo {
|
||||
plan,
|
||||
dirty_restore: DirtyRestore::Unscoped(consumed_dirty_windows),
|
||||
};
|
||||
|
||||
task.handle_executed_query_failure(Some(&unscoped_query));
|
||||
|
||||
let state = task.state.read().unwrap();
|
||||
assert_eq!(state.dirty_time_windows.len(), expected_len);
|
||||
assert_eq!(
|
||||
state.dirty_time_windows.window_size(),
|
||||
std::time::Duration::from_secs(expected_window_size_secs)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() {
|
||||
let (task, plan) = new_test_task_and_plan_with_missing_sink().await;
|
||||
{
|
||||
let mut state = task.state.write().unwrap();
|
||||
state.dirty_time_windows.clean();
|
||||
}
|
||||
let scoped_query = PlanInfo {
|
||||
plan,
|
||||
dirty_restore: DirtyRestore::Scoped(FilterExprInfo {
|
||||
expr: datafusion_expr::lit(true),
|
||||
col_name: "ts".to_string(),
|
||||
time_ranges: vec![(Timestamp::new_second(10), Timestamp::new_second(20))],
|
||||
window_size: chrono::Duration::seconds(10),
|
||||
}),
|
||||
};
|
||||
|
||||
task.handle_executed_query_failure(Some(&scoped_query));
|
||||
|
||||
let state = task.state.read().unwrap();
|
||||
assert_eq!(state.dirty_time_windows.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unscoped_failure_restores_consumed_dirty_signal() {
|
||||
assert_unscoped_failure_restore(dirty_marker(), DirtyTimeWindows::default(), 1, 0).await;
|
||||
assert_unscoped_failure_restore(dirty_range(30, 40), dirty_range(10, 20), 2, 20).await;
|
||||
assert_unscoped_failure_restore(dirty_range(30, 40), dirty_range(30, 50), 1, 20).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unscoped_plan_generation_failure_restores_consumed_dirty_signal() {
|
||||
let TestTaskParts {
|
||||
task, query_engine, ..
|
||||
} = new_test_task_engine_and_plan_with_query(
|
||||
"SELECT missing_column FROM numbers_with_ts",
|
||||
"missing_sink",
|
||||
)
|
||||
.await;
|
||||
task.state.write().unwrap().dirty_time_windows.set_dirty();
|
||||
let sink_schema = Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new("number", CDT::uint32_datatype(), false),
|
||||
ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true),
|
||||
]));
|
||||
|
||||
let result = task
|
||||
.gen_query_with_time_window(query_engine, &sink_schema, &[], false, None)
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
let state = task.state.read().unwrap();
|
||||
assert_eq!(state.dirty_time_windows.len(), 1);
|
||||
assert_eq!(
|
||||
state.dirty_time_windows.window_size(),
|
||||
std::time::Duration::from_secs(0)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scoped_plan_generation_failure_restores_consumed_dirty_windows() {
|
||||
let TestTaskParts {
|
||||
task,
|
||||
query_engine,
|
||||
..
|
||||
} = new_time_window_test_task_with_query(
|
||||
"SELECT missing_column, date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window, missing_column",
|
||||
)
|
||||
.await;
|
||||
task.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15)));
|
||||
let sink_schema = Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new("number", CDT::uint32_datatype(), false),
|
||||
ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
|
||||
.with_time_index(true),
|
||||
]));
|
||||
|
||||
let result = task
|
||||
.gen_query_with_time_window(query_engine, &sink_schema, &[], false, None)
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
let state = task.state.read().unwrap();
|
||||
assert_eq!(state.dirty_time_windows.len(), 1);
|
||||
assert_eq!(
|
||||
state.dirty_time_windows.window_size(),
|
||||
std::time::Duration::from_secs(5)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_plan_matching_failure_restores_consumed_dirty_marker() {
|
||||
let sink_table = "partial_sink";
|
||||
let TestTaskParts {
|
||||
task, query_engine, ..
|
||||
} = new_test_task_engine_and_plan_with_query(
|
||||
"SELECT number, ts FROM numbers_with_ts",
|
||||
sink_table,
|
||||
)
|
||||
.await;
|
||||
register_number_only_sink(&query_engine, sink_table);
|
||||
task.state.write().unwrap().dirty_time_windows.set_dirty();
|
||||
|
||||
let result = task.gen_insert_plan(&query_engine, None).await;
|
||||
|
||||
assert!(result.is_err());
|
||||
let _err = match result {
|
||||
Ok(_) => panic!("gen_insert_plan should fail with a sink column mismatch"),
|
||||
Err(err) => err,
|
||||
};
|
||||
let state = task.state.read().unwrap();
|
||||
assert_eq!(state.dirty_time_windows.len(), 1);
|
||||
assert_eq!(
|
||||
state.dirty_time_windows.window_size(),
|
||||
std::time::Duration::from_secs(0)
|
||||
);
|
||||
}
|
||||
@@ -121,8 +121,9 @@ impl GrpcQueryHandler for Instance {
|
||||
.context(PlanStatementSnafu)?;
|
||||
|
||||
let dummy_catalog_list =
|
||||
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
|
||||
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new_with_query_ctx(
|
||||
self.catalog_manager().clone(),
|
||||
ctx.clone(),
|
||||
));
|
||||
|
||||
let logical_plan = plan_decoder
|
||||
@@ -416,10 +417,12 @@ impl Instance {
|
||||
.new_plan_decoder()
|
||||
.context(PlanStatementSnafu)?;
|
||||
|
||||
let dummy_catalog_list =
|
||||
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
|
||||
let dummy_catalog_list = Arc::new(
|
||||
catalog::table_source::dummy_catalog::DummyCatalogList::new_with_query_ctx(
|
||||
self.catalog_manager().clone(),
|
||||
));
|
||||
ctx.clone(),
|
||||
),
|
||||
);
|
||||
|
||||
// no optimize yet since we still need to add stuff
|
||||
let logical_plan = plan_decoder
|
||||
|
||||
@@ -620,7 +620,10 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_SINK_TABLE_ID};
|
||||
use crate::options::{
|
||||
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_RETURN_REGION_SEQ,
|
||||
FLOW_SINK_TABLE_ID,
|
||||
};
|
||||
|
||||
fn test_region_id() -> RegionId {
|
||||
RegionId::new(1024, 1)
|
||||
@@ -651,6 +654,49 @@ mod tests {
|
||||
assert_eq!(request.sst_min_sequence, Some(7));
|
||||
}
|
||||
|
||||
/// Exercises `scan_request_from_query_context` with the `FLOW_RETURN_REGION_SEQ` extension
/// set, once without and once with a `FLOW_SINK_TABLE_ID` that matches the scanned region's
/// table id (the "source" and "sink" cases named in the test).
///
/// Expected outcome:
/// - without a sink table id, the request has `snapshot_on_scan == true`;
/// - with a matching sink table id, `snapshot_on_scan == false`, even though snapshot and
///   SST-min sequences are supplied for the region;
/// - in both cases the memtable min/max and SST min sequence bounds stay `None`.
#[test]
fn test_terminal_watermark_context_source_and_sink_scan_semantics() {
    let region_id = test_region_id();

    // Case 1: only the return-region-seq flag is present.
    let source_extensions = HashMap::from([(
        FLOW_RETURN_REGION_SEQ.to_string(),
        "true".to_string(),
    )]);
    let source_ctx = QueryContextBuilder::default()
        .extensions(source_extensions)
        .build();

    let source_request = scan_request_from_query_context(region_id, &source_ctx).unwrap();

    assert!(source_request.snapshot_on_scan);
    assert_eq!(source_request.memtable_min_sequence, None);
    assert_eq!(source_request.memtable_max_sequence, None);
    assert_eq!(source_request.sst_min_sequence, None);

    // Case 2: the flag plus a sink table id equal to the region's table id, together with
    // per-region snapshot and SST-min sequences.
    let sink_extensions = HashMap::from([
        (FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()),
        (
            FLOW_SINK_TABLE_ID.to_string(),
            region_id.table_id().to_string(),
        ),
    ]);
    let snapshot_seqs = Arc::new(RwLock::new(HashMap::from([(region_id.as_u64(), 88_u64)])));
    let sst_min_sequences = Arc::new(RwLock::new(HashMap::from([(region_id.as_u64(), 77_u64)])));
    let sink_ctx = QueryContextBuilder::default()
        .extensions(sink_extensions)
        .snapshot_seqs(snapshot_seqs)
        .sst_min_sequences(sst_min_sequences)
        .build();

    let sink_request = scan_request_from_query_context(region_id, &sink_ctx).unwrap();

    assert!(!sink_request.snapshot_on_scan);
    assert_eq!(sink_request.memtable_min_sequence, None);
    assert_eq!(sink_request.memtable_max_sequence, None);
    assert_eq!(sink_request.sst_min_sequence, None);
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() {
|
||||
let region_id = test_region_id();
|
||||
|
||||
Reference in New Issue
Block a user