From e0c1566e92c841ba0442d55e433de5b0f00aa7f1 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 16 Dec 2025 16:54:13 +0800 Subject: [PATCH] fix(servers): flight stuck on waiting for first message (#7413) * fix/flight-stuck-on-first-message: **Refactor GRPC Stream Handling and Table Resolution** - **`grpc.rs`**: Refactored the `GrpcQueryHandler` to resolve table references and check permissions only once per stream, improving efficiency. Introduced a mechanism to handle table resolution and permission checks after receiving the first `RecordBatch`. - **`flight.rs`**: Enhanced `PutRecordBatchRequestStream` to manage stream states (`Init` and `Ready`) for better handling of schema and table name extraction. Improved error handling and logging for unexpected flight messages. Signed-off-by: Lei, HUANG * chore: add some doc Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/frontend/src/instance/grpc.rs | 64 ++++--- src/servers/src/grpc/flight.rs | 274 ++++++++++++++++++------------ 2 files changed, 200 insertions(+), 138 deletions(-) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index ab68cb14dd..9d3e3ac85c 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -301,39 +301,15 @@ impl GrpcQueryHandler for Instance { mut stream: servers::grpc::flight::PutRecordBatchRequestStream, ctx: QueryContextRef, ) -> Pin> + Send>> { - // Resolve table once for the stream // Clone all necessary data to make it 'static let catalog_manager = self.catalog_manager().clone(); let plugins = self.plugins.clone(); let inserter = self.inserter.clone(); - let table_name = stream.table_name().clone(); let ctx = ctx.clone(); + let mut table_ref: Option = None; + let mut table_checked = false; Box::pin(try_stream! { - plugins - .get::() - .as_ref() - .check_permission(ctx.current_user(), PermissionReq::BulkInsert) - .context(PermissionSnafu)?; - // Cache for resolved table reference - resolve once and reuse - let table_ref = catalog_manager - .table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - None, - ) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_name.to_string(), - })?; - - // Check permissions once for the stream - let interceptor_ref = plugins.get::>(); - let interceptor = interceptor_ref.as_ref(); - interceptor.pre_bulk_insert(table_ref.clone(), ctx.clone())?; - // Process each request in the stream while let Some(request_result) = stream.next().await { let request = request_result.map_err(|e| { @@ -341,11 +317,45 @@ impl GrpcQueryHandler for Instance { IncompleteGrpcRequestSnafu { err_msg: error_msg }.build() })?; + // Resolve table and check permissions on first RecordBatch (after schema is received) + if !table_checked { + let table_name = &request.table_name; + + plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::BulkInsert) + .context(PermissionSnafu)?; + + // Resolve table reference + table_ref = Some( + catalog_manager + .table( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + None, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_name.to_string(), + })?, + ); + + // Check permissions for the table + let interceptor_ref = plugins.get::>(); + let interceptor = interceptor_ref.as_ref(); + interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?; + + table_checked = true; + } + let request_id = request.request_id; let start = Instant::now(); let rows = inserter .handle_bulk_insert( - table_ref.clone(), + table_ref.clone().unwrap(), request.flight_data, request.record_batch, request.schema_bytes, diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 0cc3ddd7f8..a3835b14ff 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -31,6 +31,7 @@ use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse}; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_query::{Output, OutputData}; use common_recordbatch::DfRecordBatch; +use common_telemetry::debug; use common_telemetry::tracing::info_span; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use datatypes::arrow::datatypes::SchemaRef; @@ -233,7 +234,7 @@ impl FlightCraft for GreptimeRequestHandler { limiter, ) .await?; - // Ack to the first schema message when we successfully built the stream. + // Ack immediately when stream is created successfully (in Init state) let _ = tx.send(Ok(DoPutResponse::new(0, 0, 0.0))).await; self.put_record_batches(stream, tx, query_ctx).await; @@ -305,98 +306,83 @@ impl PutRecordBatchRequest { pub struct PutRecordBatchRequestStream { flight_data_stream: Streaming, - table_name: TableName, - schema: SchemaRef, - schema_bytes: Bytes, - decoder: FlightDecoder, + catalog: String, + schema_name: String, limiter: Option, + // Client now lazily sends schema data so we cannot eagerly wait for it. + // Instead, we need to decode while receiving record batches. + state: StreamState, +} + +enum StreamState { + Init, + Ready { + table_name: TableName, + schema: SchemaRef, + schema_bytes: Bytes, + decoder: FlightDecoder, + }, } impl PutRecordBatchRequestStream { - /// Creates a new `PutRecordBatchRequestStream` by waiting for the first message, - /// extracting the table name from the flight descriptor, and decoding the schema. + /// Creates a new `PutRecordBatchRequestStream` in Init state. + /// The stream will transition to Ready state when it receives the schema message. pub async fn new( - mut flight_data_stream: Streaming, + flight_data_stream: Streaming, catalog: String, schema: String, limiter: Option, ) -> TonicResult { - fn extract_table_name(mut descriptor: FlightDescriptor) -> Result { - ensure!( - descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32, - InvalidParameterSnafu { - reason: "expect FlightDescriptor::type == 'Path' only", - } - ); - ensure!( - descriptor.path.len() == 1, - InvalidParameterSnafu { - reason: "expect FlightDescriptor::path has only one table name", - } - ); - Ok(descriptor.path.remove(0)) - } - - // Wait for the first message which must be a Schema message - let first_message = flight_data_stream.next().await.ok_or_else(|| { - Status::failed_precondition("flight data stream ended unexpectedly") - })??; - - let flight_descriptor = first_message - .flight_descriptor - .as_ref() - .ok_or_else(|| { - Status::failed_precondition("table to put is not found in flight descriptor") - })? - .clone(); - - let table_name_str = extract_table_name(flight_descriptor) - .map_err(|e| Status::invalid_argument(e.to_string()))?; - let table_name = TableName::new(catalog, schema, table_name_str); - - // Decode the first message as schema - let mut decoder = FlightDecoder::default(); - let schema_message = decoder - .try_decode(&first_message) - .map_err(|e| Status::invalid_argument(format!("Failed to decode schema: {}", e)))?; - - let (schema, schema_bytes) = match schema_message { - Some(FlightMessage::Schema(schema)) => { - let schema_bytes = decoder.schema_bytes().ok_or_else(|| { - Status::internal("decoder should have schema bytes after decoding schema") - })?; - (schema, schema_bytes) - } - _ => { - return Err(Status::failed_precondition( - "first message must be a Schema message", - )); - } - }; - Ok(Self { flight_data_stream, - table_name, - schema, - schema_bytes, - decoder, + catalog, + schema_name: schema, limiter, + state: StreamState::Init, }) } /// Returns the table name extracted from the flight descriptor. - pub fn table_name(&self) -> &TableName { - &self.table_name + /// Returns None if the stream is still in Init state. + pub fn table_name(&self) -> Option<&TableName> { + match &self.state { + StreamState::Init => None, + StreamState::Ready { table_name, .. } => Some(table_name), + } } /// Returns the Arrow schema decoded from the first flight message. - pub fn schema(&self) -> &SchemaRef { - &self.schema + /// Returns None if the stream is still in Init state. + pub fn schema(&self) -> Option<&SchemaRef> { + match &self.state { + StreamState::Init => None, + StreamState::Ready { schema, .. } => Some(schema), + } } /// Returns the raw schema bytes in IPC format. - pub fn schema_bytes(&self) -> &Bytes { - &self.schema_bytes + /// Returns None if the stream is still in Init state. + pub fn schema_bytes(&self) -> Option<&Bytes> { + match &self.state { + StreamState::Init => None, + StreamState::Ready { schema_bytes, .. } => Some(schema_bytes), + } + } + + fn extract_table_name(mut descriptor: FlightDescriptor) -> Result { + ensure!( + descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32, + InvalidParameterSnafu { + reason: "expect FlightDescriptor::type == 'Path' only", + } + ); + ensure!( + descriptor.path.len() == 1, + InvalidParameterSnafu { + reason: "expect FlightDescriptor::path has only one table name", + } + ); + Ok(descriptor.path.remove(0)) } } @@ -409,48 +395,114 @@ impl Stream for PutRecordBatchRequestStream { match poll { Some(Ok(flight_data)) => { - // Extract request_id and body_size from FlightData before decoding - let request_id = if !flight_data.app_metadata.is_empty() { - match serde_json::from_slice::(&flight_data.app_metadata) { - Ok(metadata) => metadata.request_id(), - Err(_) => 0, - } - } else { - 0 - }; + // Clone limiter once to avoid borrowing issues + let limiter = self.limiter.clone(); - // Decode FlightData to RecordBatch - match self.decoder.try_decode(&flight_data) { - Ok(Some(FlightMessage::RecordBatch(record_batch))) => { - let limiter = self.limiter.clone(); - let table_name = self.table_name.clone(); - let schema_bytes = self.schema_bytes.clone(); - return Poll::Ready(Some( - PutRecordBatchRequest::try_new( - table_name, - record_batch, - request_id, - schema_bytes, - flight_data, - limiter.as_ref(), - ) - .map_err(|e| Status::invalid_argument(e.to_string())), - )); + match &mut self.state { + StreamState::Init => { + // First message - expecting schema + let flight_descriptor = match flight_data.flight_descriptor.as_ref() { + Some(descriptor) => descriptor.clone(), + None => { + return Poll::Ready(Some(Err(Status::failed_precondition( + "table to put is not found in flight descriptor", + )))); + } + }; + + let table_name_str = match Self::extract_table_name(flight_descriptor) { + Ok(name) => name, + Err(e) => { + return Poll::Ready(Some(Err(Status::invalid_argument( + e.to_string(), + )))); + } + }; + let table_name = TableName::new( + self.catalog.clone(), + self.schema_name.clone(), + table_name_str, + ); + + // Decode the schema + let mut decoder = FlightDecoder::default(); + let schema_message = decoder.try_decode(&flight_data).map_err(|e| { + Status::invalid_argument(format!("Failed to decode schema: {}", e)) + })?; + + match schema_message { + Some(FlightMessage::Schema(schema)) => { + let schema_bytes = decoder.schema_bytes().ok_or_else(|| { + Status::internal( + "decoder should have schema bytes after decoding schema", + ) + })?; + + // Transition to Ready state with all necessary data + self.state = StreamState::Ready { + table_name, + schema, + schema_bytes, + decoder, + }; + // Continue to next iteration to process RecordBatch messages + continue; + } + _ => { + return Poll::Ready(Some(Err(Status::failed_precondition( + "first message must be a Schema message", + )))); + } + } } - Ok(Some(_)) => { - return Poll::Ready(Some(Err(Status::invalid_argument( - "Expected RecordBatch message, got other message type", - )))); - } - Ok(None) => { - // Dictionary batch - processed internally by decoder, continue polling - continue; - } - Err(e) => { - return Poll::Ready(Some(Err(Status::invalid_argument(format!( - "Failed to decode RecordBatch: {}", - e - ))))); + StreamState::Ready { + table_name, + schema: _, + schema_bytes, + decoder, + } => { + // Extract request_id and body_size from FlightData before decoding + let request_id = if !flight_data.app_metadata.is_empty() { + serde_json::from_slice::(&flight_data.app_metadata) + .map(|meta| meta.request_id()) + .unwrap_or_default() + } else { + 0 + }; + + // Decode FlightData to RecordBatch + match decoder.try_decode(&flight_data) { + Ok(Some(FlightMessage::RecordBatch(record_batch))) => { + let table_name = table_name.clone(); + let schema_bytes = schema_bytes.clone(); + return Poll::Ready(Some( + PutRecordBatchRequest::try_new( + table_name, + record_batch, + request_id, + schema_bytes, + flight_data, + limiter.as_ref(), + ) + .map_err(|e| Status::invalid_argument(e.to_string())), + )); + } + Ok(Some(other)) => { + debug!("Unexpected flight message: {:?}", other); + return Poll::Ready(Some(Err(Status::invalid_argument( + "Expected RecordBatch message, got other message type", + )))); + } + Ok(None) => { + // Dictionary batch - processed internally by decoder, continue polling + continue; + } + Err(e) => { + return Poll::Ready(Some(Err(Status::invalid_argument( + format!("Failed to decode RecordBatch: {}", e), + )))); + } + } } } }