refactor: AddAutoColumnRewriter check for Projection

This commit is contained in:
discord9
2025-04-21 15:01:13 +08:00
parent d2c4767d41
commit e647559d27
4 changed files with 37 additions and 6 deletions

View File

@@ -256,7 +256,13 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
node = projection;
}
let mut exprs = node.expressions();
// only do rewrite if found the outermost projection
let mut exprs = if let LogicalPlan::Projection(project) = &node {
project.expr.clone()
} else {
return Ok(Transformed::no(node));
};
let all_names = self
.schema
.column_schemas()
@@ -333,7 +339,7 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
} else {
return Err(DataFusionError::Plan(format!(
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
query_col_cnt, node.expressions(), table_col_cnt, self.schema.column_schemas(), node
query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node
)));
}

View File

@@ -29,6 +29,7 @@ use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info;
use futures::{FutureExt, TryStreamExt};
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
@@ -144,11 +145,16 @@ pub struct FlownodeServer {
inner: Arc<FlownodeServerInner>,
}
/// FlownodeServerInner is the inner state of FlownodeServer,
/// this struct mostly useful for construct/start and stop the
/// flow node server
struct FlownodeServerInner {
/// worker shutdown signal, not to be confused with server_shutdown_tx
worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// server shutdown signal for shutdown grpc server
server_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// streaming task handler
streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
flow_service: FlowService,
}
@@ -161,6 +167,7 @@ impl FlownodeServer {
flow_service,
worker_shutdown_tx: Mutex::new(tx),
server_shutdown_tx: Mutex::new(server_tx),
streaming_task_handler: Mutex::new(None),
}),
}
}
@@ -168,9 +175,15 @@ impl FlownodeServer {
/// Start the background task for streaming computation.
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let _handle = manager_ref
let handle = manager_ref
.streaming_engine()
.run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
self.inner
.streaming_task_handler
.lock()
.await
.replace(handle);
self.inner
.flow_service
.dual_engine

View File

@@ -19,6 +19,8 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
use datafusion::error::DataFusionError;
use session::ReadPreference;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
@@ -347,6 +349,14 @@ pub enum Error {
location: Location,
source: common_query::error::Error,
},
#[snafu(display("DataFusionError"))]
DataFusion {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -423,6 +433,8 @@ impl ErrorExt for Error {
Error::TableOperation { source, .. } => source.status_code(),
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
}
}

View File

@@ -36,8 +36,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table_name::TableName;
use crate::error::{
CatalogSnafu, Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu,
NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
@@ -365,7 +365,7 @@ impl Instance {
.arrow_schema()
.clone()
.try_into()
.unwrap(),
.context(DataFusionSnafu)?,
);
let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)