refactor: per review

discord9
2025-04-21 15:53:03 +08:00
parent e647559d27
commit b4aa0c8b8b
4 changed files with 34 additions and 34 deletions


@@ -57,7 +57,6 @@ pub struct FlowDualEngine {
     streaming_engine: Arc<FlowWorkerManager>,
     batching_engine: Arc<BatchingEngine>,
     /// helper struct for faster lookup of flows by table id, and vice versa
-    /// also needs to serve as a lock, hence the tokio RwLock
     src_table2flow: RwLock<SrcTableToFlow>,
     flow_metadata_manager: Arc<FlowMetadataManager>,
     catalog_manager: Arc<dyn CatalogManager>,
@@ -112,7 +111,13 @@ impl FlowDualEngine {
         }
         if retry == max_retry {
-            return FlowNotFoundSnafu { id: flow_id }.fail();
+            return crate::error::UnexpectedSnafu {
+                reason: format!(
+                    "Can't sync with check task for flow {} with allow_drop={}",
+                    flow_id, allow_drop
+                ),
+            }
+            .fail();
         }
         info!("Successfully sync with check task for flow {}", flow_id);
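The hunk above replaces the misleading FlowNotFoundSnafu with a generic Unexpected error once the retry budget is exhausted. A minimal, self-contained sketch of the snafu pattern involved (the error enum, field names, and values here are invented for illustration, not the crate's actual error type): a context selector's `.fail()` builds the error and returns it as `Err`.

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Unexpected: {reason}"))]
    Unexpected { reason: String },
}

// Hypothetical stand-in for the retry check above: once the retry budget is
// spent, the generated `UnexpectedSnafu` selector is turned into an Err via `.fail()`.
fn check_sync(retry: usize, max_retry: usize, flow_id: u64, allow_drop: bool) -> Result<(), Error> {
    if retry == max_retry {
        return UnexpectedSnafu {
            reason: format!("Can't sync with check task for flow {flow_id} with allow_drop={allow_drop}"),
        }
        .fail();
    }
    Ok(())
}

fn main() {
    assert!(check_sync(3, 3, 42, false).is_err());
}
```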
@@ -170,12 +175,12 @@ impl FlowDualEngine {
             .into_iter()
             .map(|i| i as FlowId)
             .collect::<HashSet<_>>();
-        let actual_exist = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
+        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
         let to_be_created = should_exists
             .iter()
-            .filter(|id| !actual_exist.contains(id))
+            .filter(|id| !actual_exists.contains(id))
             .collect::<Vec<_>>();
-        let to_be_dropped = actual_exist
+        let to_be_dropped = actual_exists
             .iter()
             .filter(|id| !should_exists.contains(id))
             .collect::<Vec<_>>();
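The rename above (actual_exist → actual_exists) leaves the reconciliation logic untouched: two HashSets are compared in both directions to decide which flows to create and which to drop. A tiny sketch of that pattern, using HashSet::difference as a compact stand-in for the filter/contains chains (the values are arbitrary):

```rust
use std::collections::HashSet;

fn main() {
    let should_exists: HashSet<u32> = [1, 2, 3].into_iter().collect();
    let actual_exists: HashSet<u32> = [2, 3, 4].into_iter().collect();

    // expected but missing -> create
    let to_be_created: Vec<_> = should_exists.difference(&actual_exists).collect();
    // present but no longer expected -> drop
    let to_be_dropped: Vec<_> = actual_exists.difference(&should_exists).collect();

    assert_eq!(to_be_created, vec![&1]);
    assert_eq!(to_be_dropped, vec![&4]);
}
```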
@@ -463,20 +468,21 @@ impl FlowEngine for FlowDualEngine {
         let flow_id = args.flow_id;
         let src_table_ids = args.source_table_ids.clone();
-        let mut src_table2flow = self.src_table2flow.write().await;
         let res = match flow_type {
             FlowType::Batching => self.batching_engine.create_flow(args).await,
             FlowType::Streaming => self.streaming_engine.create_flow(args).await,
         }?;
-        src_table2flow.add_flow(flow_id, flow_type, src_table_ids);
+        self.src_table2flow
+            .write()
+            .await
+            .add_flow(flow_id, flow_type, src_table_ids);
         Ok(res)
     }
     async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
-        let mut src_table2flow = self.src_table2flow.write().await;
-        let flow_type = src_table2flow.get_flow_type(flow_id);
+        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
         match flow_type {
             Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
@@ -495,7 +501,7 @@ impl FlowEngine for FlowDualEngine {
             }
         }?;
         // remove mapping
-        src_table2flow.remove_flow(flow_id);
+        self.src_table2flow.write().await.remove_flow(flow_id);
         Ok(())
     }
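These two hunks stop binding a long-lived write guard at the top of create_flow/remove_flow; instead the RwLock is taken only for the single read or write that actually touches the src_table2flow mapping, so no guard is held across the engines' own await points. A minimal sketch of that guard-scoping pattern with tokio::sync::RwLock (the Mapping type is hypothetical; assumes a tokio dependency with the rt-multi-thread, macros, and sync features):

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Hypothetical stand-in for SrcTableToFlow: flow id -> flow type name.
struct Mapping {
    inner: RwLock<HashMap<u64, String>>,
}

impl Mapping {
    // The read guard lives only for this one expression, mirroring
    // `self.src_table2flow.read().await.get_flow_type(flow_id)`.
    async fn flow_type(&self, flow_id: u64) -> Option<String> {
        self.inner.read().await.get(&flow_id).cloned()
    }

    // Likewise the write guard is scoped to the single mutation.
    async fn remove(&self, flow_id: u64) {
        self.inner.write().await.remove(&flow_id);
    }
}

#[tokio::main]
async fn main() {
    let mapping = Mapping {
        inner: RwLock::new(HashMap::from([(1, "batching".to_string())])),
    };
    assert_eq!(mapping.flow_type(1).await.as_deref(), Some("batching"));
    mapping.remove(1).await;
    assert_eq!(mapping.flow_type(1).await, None);
}
```

The trade-off of the shorter critical sections is that the mapping update is no longer atomic with the engine call itself: the mapping is now touched strictly before (read) or after (write) the engine operation.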


@@ -241,7 +241,12 @@ impl FrontendClient {
         let database_client = {
             database_client
                 .lock()
-                .unwrap()
+                .map_err(|e| {
+                    UnexpectedSnafu {
+                        reason: format!("Failed to lock database client: {e}"),
+                    }
+                    .build()
+                })?
                 .as_ref()
                 .context(UnexpectedSnafu {
                     reason: "Standalone's frontend instance is not set",
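With this change the std Mutex guard is no longer unwrapped: a poisoned lock becomes an Unexpected error (built via the selector's .build()) and is propagated with `?` instead of panicking. A simplified version of the same idea using only std types and String errors in place of snafu (names and types are illustrative):

```rust
use std::sync::{Arc, Mutex};

// Poisoning, or an unset client, becomes a recoverable Err instead of a panic.
fn frontend_client(slot: &Arc<Mutex<Option<String>>>) -> Result<String, String> {
    let guard = slot
        .lock()
        .map_err(|e| format!("Failed to lock database client: {e}"))?;
    guard
        .as_ref()
        .cloned()
        .ok_or_else(|| "Standalone's frontend instance is not set".to_string())
}

fn main() {
    let slot = Arc::new(Mutex::new(Some("client-0".to_string())));
    assert_eq!(frontend_client(&slot).unwrap(), "client-0");
    *slot.lock().unwrap() = None;
    assert!(frontend_client(&slot).is_err());
}
```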


@@ -284,7 +284,7 @@ impl BatchingTask {
         // fix all table refs by making them fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
         let fixed_plan = plan
             .clone()
-            .transform(|p| {
+            .transform_down_with_subqueries(|p| {
                 if let LogicalPlan::TableScan(mut table_scan) = p {
                     let resolved = table_scan.table_name.resolve(catalog, schema);
                     table_scan.table_name = resolved.into();
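Switching from transform to transform_down_with_subqueries lets the TableScan rewrite above also reach plans nested inside subqueries rather than only the top-level plan tree. The rewrite itself fills in any missing catalog/schema parts of the table reference; a toy illustration of that resolution rule (a hand-written helper, not DataFusion's actual TableReference::resolve):

```rust
// Toy resolution of a possibly-unqualified table reference against a default
// catalog and schema: "table" -> "catalog.schema.table".
fn resolve(table_ref: &str, catalog: &str, schema: &str) -> String {
    match table_ref.split('.').count() {
        1 => format!("{catalog}.{schema}.{table_ref}"),
        2 => format!("{catalog}.{table_ref}"),
        _ => table_ref.to_string(),
    }
}

fn main() {
    assert_eq!(resolve("numbers_with_ts", "greptime", "public"), "greptime.public.numbers_with_ts");
    assert_eq!(resolve("public.numbers_with_ts", "greptime", "public"), "greptime.public.numbers_with_ts");
    assert_eq!(resolve("greptime.public.numbers_with_ts", "greptime", "public"), "greptime.public.numbers_with_ts");
}
```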
@@ -609,11 +609,7 @@ fn create_table_with_expr(
         AUTO_CREATED_UPDATE_AT_TS_COL,
         ConcreteDataType::timestamp_millisecond_datatype(),
         true,
-    )
-    /* .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
-    .context(DatatypesSnafu {
-        extra: "Failed to build column `update_at TimestampMillisecond default now()`",
-    })?*/ ;
+    );
     column_schemas.push(update_at_schema);
     let time_index = if let Some(time_index) = first_time_stamp {
@@ -625,15 +621,7 @@ fn create_table_with_expr(
                 ConcreteDataType::timestamp_millisecond_datatype(),
                 false,
             )
-            .with_time_index(true), /* .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
-                Timestamp::new_millisecond(0),
-            ))))
-            .context(DatatypesSnafu {
-                extra: format!(
-                    "Failed to build column `{} TimestampMillisecond TIME INDEX default 0`",
-                    AUTO_CREATED_PLACEHOLDER_TS_COL
-                ),
-            })?*/
+            .with_time_index(true),
         );
         AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
     };
@@ -726,7 +714,6 @@ mod test {
         ConcreteDataType::timestamp_millisecond_datatype(),
         true,
     );
-    // .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string())))
     let ts_placeholder_schema = ColumnSchema::new(
         AUTO_CREATED_PLACEHOLDER_TS_COL,
@@ -734,8 +721,6 @@ mod test {
         false,
     )
     .with_time_index(true);
-    // .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp(
-    //     Timestamp::new_millisecond(0), ))))
     let testcases = vec![
         TestCase {


@@ -26,7 +26,7 @@ use datafusion::sql::unparser::Unparser;
 use datafusion_common::tree_node::{
     Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
 };
-use datafusion_common::{DFSchema, DataFusionError};
+use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
 use datafusion_expr::{Distinct, LogicalPlan, Projection};
 use datatypes::schema::SchemaRef;
 use query::parser::QueryLanguageParser;
@@ -278,6 +278,9 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
                 .get(idx)
                 .map(|c| c.name.clone())
             {
+                // if the data type mismatches, a later check_execute will error out,
+                // and the optimizer pass may be able to cast it anyway,
+                // so there is no need to check it here
                 *expr = expr.clone().alias(col_name);
             }
         }
@@ -289,7 +292,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
         debug!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}");
         let placeholder_ts_expr =
-            datafusion::logical_expr::lit(0).alias(AUTO_CREATED_PLACEHOLDER_TS_COL);
+            datafusion::logical_expr::lit(ScalarValue::TimestampMillisecond(Some(0), None))
+                .alias(AUTO_CREATED_PLACEHOLDER_TS_COL);
         if query_col_cnt == table_col_cnt {
             // still need to add alias, see below
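The placeholder literal is now the typed ScalarValue::TimestampMillisecond(Some(0), None) rather than a bare lit(0), so the expression carries the millisecond-timestamp type (epoch 0, i.e. 1970-01-01 00:00:00) and, as the updated test expectations below show, unparses to CAST('1970-01-01 00:00:00' AS TIMESTAMP) instead of a plain 0. A small sketch of the difference (assumes a datafusion dependency; the import paths are the crate's usual re-exports, double-check against the version in use):

```rust
use datafusion::logical_expr::lit;
use datafusion::scalar::ScalarValue;

fn main() {
    // A bare integer literal: DataFusion infers Int32, not a timestamp.
    let untyped = lit(0);
    // A typed literal: 0 ms since the Unix epoch, with no time zone.
    let typed = lit(ScalarValue::TimestampMillisecond(Some(0), None));
    println!("untyped = {untyped}, typed = {typed}");
}
```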
@@ -499,7 +503,7 @@ mod test {
         // add ts placeholder
         (
             "SELECT number FROM numbers_with_ts",
-            Ok("SELECT numbers_with_ts.number, 0 AS __ts_placeholder FROM numbers_with_ts"),
+            Ok("SELECT numbers_with_ts.number, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
             vec![
                 ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
                 ColumnSchema::new(
@@ -527,7 +531,7 @@ mod test {
         // add update_at and ts placeholder
         (
             "SELECT number FROM numbers_with_ts",
-            Ok("SELECT numbers_with_ts.number, now() AS update_at, 0 AS __ts_placeholder FROM numbers_with_ts"),
+            Ok("SELECT numbers_with_ts.number, now() AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
             vec![
                 ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
                 ColumnSchema::new(
@@ -546,7 +550,7 @@ mod test {
         // add ts placeholder
         (
             "SELECT number, ts FROM numbers_with_ts",
-            Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, 0 AS __ts_placeholder FROM numbers_with_ts"),
+            Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"),
             vec![
                 ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
                 ColumnSchema::new(