refactor: per review

This commit is contained in:
discord9
2025-04-21 19:16:31 +08:00
parent 8e815fc385
commit 87f1a8c622
6 changed files with 66 additions and 42 deletions

View File

@@ -88,7 +88,8 @@ pub fn rename_logical_plan_columns(
/// ///
/// if input logical plan is not `insert into table_name <input>`, return None /// if input logical plan is not `insert into table_name <input>`, return None
/// ///
/// Returned TableName will use provided catalog and schema if not specified in the logical plan /// Returned TableName will use provided catalog and schema if not specified in the logical plan,
/// if table scan in logical plan have full table name, will **NOT** override it.
pub fn breakup_insert_plan( pub fn breakup_insert_plan(
plan: &LogicalPlan, plan: &LogicalPlan,
catalog: &str, catalog: &str,

View File

@@ -31,7 +31,7 @@ use datatypes::value::Value;
use futures::TryStreamExt; use futures::TryStreamExt;
use itertools::Itertools; use itertools::Itertools;
use session::context::QueryContextBuilder; use session::context::QueryContextBuilder;
use snafu::{IntoError, OptionExt, ResultExt}; use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, RwLock};
@@ -40,7 +40,7 @@ use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine; use crate::engine::FlowEngine;
use crate::error::{ use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu,
ListFlowsSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
}; };
use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow}; use crate::repr::{self, DiffRow};
@@ -111,11 +111,13 @@ impl FlowDualEngine {
} }
if retry == max_retry { if retry == max_retry {
return crate::error::UnexpectedSnafu { error!(
reason: format!( "Can't sync with check task for flow {} with allow_drop={}",
"Can't sync with check task for flow {} with allow_drop={}", flow_id, allow_drop
flow_id, allow_drop );
), return SyncCheckTaskSnafu {
flow_id,
allow_drop,
} }
.fail(); .fail();
} }
@@ -273,27 +275,30 @@ impl FlowDualEngine {
} }
pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> { pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
if self.check_task.lock().await.is_some() { let mut check_task = self.check_task.lock().await;
ensure!(
check_task.is_none(),
crate::error::UnexpectedSnafu { crate::error::UnexpectedSnafu {
reason: "Flow consistent check task already exists", reason: "Flow consistent check task already exists",
} }
.fail()?; );
}
let task = ConsistentCheckTask::start_check_task(self).await?; let task = ConsistentCheckTask::start_check_task(self).await?;
self.check_task.lock().await.replace(task); *check_task = Some(task);
Ok(()) Ok(())
} }
pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> { pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
info!("Stopping flow consistent check task"); info!("Stopping flow consistent check task");
if let Some(task) = self.check_task.lock().await.take() { let mut check_task = self.check_task.lock().await;
task.stop().await?;
} else { ensure!(
check_task.is_some(),
crate::error::UnexpectedSnafu { crate::error::UnexpectedSnafu {
reason: "Flow consistent check task does not exist", reason: "Flow consistent check task does not exist",
} }
.fail()?; );
}
check_task.take().expect("Already checked").stop().await?;
info!("Stopped flow consistent check task"); info!("Stopped flow consistent check task");
Ok(()) Ok(())
} }
@@ -526,11 +531,11 @@ impl FlowEngine for FlowDualEngine {
} }
} }
async fn list_flows(&self) -> Result<Vec<FlowId>, Error> { async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
let mut stream_flows = self.streaming_engine.list_flows().await?; let stream_flows = self.streaming_engine.list_flows().await?;
let batch_flows = self.batching_engine.list_flows().await?; let batch_flows = self.batching_engine.list_flows().await?;
stream_flows.extend(batch_flows);
Ok(stream_flows) Ok(stream_flows.into_iter().chain(batch_flows))
} }
async fn handle_flow_inserts( async fn handle_flow_inserts(
@@ -807,14 +812,14 @@ impl FlowEngine for FlowWorkerManager {
self.flow_exist_inner(flow_id).await self.flow_exist_inner(flow_id).await
} }
async fn list_flows(&self) -> Result<Vec<FlowId>, Error> { async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
Ok(self Ok(self
.flow_err_collectors .flow_err_collectors
.read() .read()
.await .await
.keys() .keys()
.cloned() .cloned()
.collect()) .collect::<Vec<_>>())
} }
async fn handle_flow_inserts( async fn handle_flow_inserts(

View File

@@ -24,8 +24,8 @@ use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::TableMetadataManagerRef; use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle; use common_runtime::JoinHandle;
use common_telemetry::info;
use common_telemetry::tracing::warn; use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::TimeToLive; use common_time::TimeToLive;
use query::QueryEngineRef; use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
@@ -39,8 +39,7 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan; use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine; use crate::engine::FlowEngine;
use crate::error::{ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, InvalidQuerySnafu, TableNotFoundMetaSnafu, ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
UnexpectedSnafu,
}; };
use crate::{CreateFlowArgs, Error, FlowId, TableName}; use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -187,10 +186,10 @@ async fn get_table_name(
table_info: &TableInfoManager, table_info: &TableInfoManager,
table_id: &TableId, table_id: &TableId,
) -> Result<TableName, Error> { ) -> Result<TableName, Error> {
get_table_info(table_info, table_id) get_table_info(table_info, table_id).await.map(|info| {
.await let name = info.table_name();
.map(|info| info.table_name()) [name.catalog_name, name.schema_name, name.table_name]
.map(|name| [name.catalog_name, name.schema_name, name.table_name]) })
} }
async fn get_table_info( async fn get_table_info(
@@ -269,9 +268,9 @@ impl BatchingEngine {
let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?; let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?; let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
if table_info.table_info.meta.options.ttl == Some(TimeToLive::Instant) { if table_info.table_info.meta.options.ttl == Some(TimeToLive::Instant) {
InvalidQuerySnafu { UnsupportedSnafu {
reason: format!( reason: format!(
"Source table `{}`(id={}) has instant TTL, flow will only evaluate to empty results with such table, use a small ttl instead of instant", "Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
table_name.join("."), table_name.join("."),
src_id src_id
), ),
@@ -363,7 +362,7 @@ impl BatchingEngine {
} }
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> { pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
info!("Try flush flow {flow_id}"); debug!("Try flush flow {flow_id}");
let task = self.tasks.read().await.get(&flow_id).cloned(); let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| UnexpectedSnafu { let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"), reason: format!("Can't found task for flow {flow_id}"),
@@ -376,7 +375,7 @@ impl BatchingEngine {
.await?; .await?;
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize; let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
info!( debug!(
"Successfully flush flow {flow_id}, affected rows={}", "Successfully flush flow {flow_id}, affected rows={}",
affected_rows affected_rows
); );
@@ -402,8 +401,8 @@ impl FlowEngine for BatchingEngine {
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> { async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
Ok(self.flow_exist_inner(flow_id).await) Ok(self.flow_exist_inner(flow_id).await)
} }
async fn list_flows(&self) -> Result<Vec<FlowId>, Error> { async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
Ok(self.tasks.read().await.keys().cloned().collect()) Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
} }
async fn handle_flow_inserts( async fn handle_flow_inserts(
&self, &self,

View File

@@ -50,7 +50,7 @@ pub trait FlowEngine {
/// Check if the flow exists /// Check if the flow exists
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>; async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>;
/// List all flows /// List all flows
async fn list_flows(&self) -> Result<Vec<FlowId>, Error>; async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error>;
/// Handle the insert requests for the flow /// Handle the insert requests for the flow
async fn handle_flow_inserts( async fn handle_flow_inserts(
&self, &self,

View File

@@ -149,6 +149,13 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Unsupported: {reason}"))]
Unsupported {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported temporal filter: {reason}"))] #[snafu(display("Unsupported temporal filter: {reason}"))]
UnsupportedTemporalFilter { UnsupportedTemporalFilter {
reason: String, reason: String,
@@ -189,6 +196,18 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display(
"Failed to sync with check task for flow {} with allow_drop={}",
flow_id,
allow_drop
))]
SyncCheckTask {
flow_id: FlowId,
allow_drop: bool,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to start server"))] #[snafu(display("Failed to start server"))]
StartServer { StartServer {
#[snafu(implicit)] #[snafu(implicit)]
@@ -280,10 +299,10 @@ impl ErrorExt for Error {
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => { Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery StatusCode::EngineExecuteQuery
} }
Self::Unexpected { .. } => StatusCode::Unexpected, Self::Unexpected { .. } | Self::SyncCheckTask { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { Self::NotImplemented { .. }
StatusCode::Unsupported | Self::UnsupportedTemporalFilter { .. }
} | Self::Unsupported { .. } => StatusCode::Unsupported,
Self::External { source, .. } => source.status_code(), Self::External { source, .. } => source.status_code(),
Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {

View File

@@ -16,7 +16,7 @@ SELECT
FROM FROM
distinct_basic; distinct_basic;
Error: 3001(EngineExecuteQuery), Invalid query: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, flow will only evaluate to empty results with such table, use a small ttl instead of instant Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval
ALTER TABLE distinct_basic SET 'ttl' = '5s'; ALTER TABLE distinct_basic SET 'ttl' = '5s';