fix(servers): flight stuck on waiting for first message (#7413)

* fix/flight-stuck-on-first-message:
 **Refactor GRPC Stream Handling and Table Resolution**

 - **`grpc.rs`**: Refactored the `GrpcQueryHandler` to resolve table references and check permissions only once per stream, improving efficiency. Introduced a mechanism to handle table resolution and permission checks after receiving the first `RecordBatch`.
 - **`flight.rs`**: Enhanced `PutRecordBatchRequestStream` to manage stream states (`Init` and `Ready`) for better handling of schema and table name extraction. Improved error handling and logging for unexpected flight messages.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: add some doc

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-12-16 16:54:13 +08:00
committed by GitHub
parent f6afb10e33
commit e0c1566e92
2 changed files with 200 additions and 138 deletions

View File

@@ -301,39 +301,15 @@ impl GrpcQueryHandler for Instance {
mut stream: servers::grpc::flight::PutRecordBatchRequestStream, mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
ctx: QueryContextRef, ctx: QueryContextRef,
) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> { ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
// Resolve table once for the stream
// Clone all necessary data to make it 'static // Clone all necessary data to make it 'static
let catalog_manager = self.catalog_manager().clone(); let catalog_manager = self.catalog_manager().clone();
let plugins = self.plugins.clone(); let plugins = self.plugins.clone();
let inserter = self.inserter.clone(); let inserter = self.inserter.clone();
let table_name = stream.table_name().clone();
let ctx = ctx.clone(); let ctx = ctx.clone();
let mut table_ref: Option<TableRef> = None;
let mut table_checked = false;
Box::pin(try_stream! { Box::pin(try_stream! {
plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// Cache for resolved table reference - resolve once and reuse
let table_ref = catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
// Check permissions once for the stream
let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table_ref.clone(), ctx.clone())?;
// Process each request in the stream // Process each request in the stream
while let Some(request_result) = stream.next().await { while let Some(request_result) = stream.next().await {
let request = request_result.map_err(|e| { let request = request_result.map_err(|e| {
@@ -341,11 +317,45 @@ impl GrpcQueryHandler for Instance {
IncompleteGrpcRequestSnafu { err_msg: error_msg }.build() IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
})?; })?;
// Resolve table and check permissions on first RecordBatch (after schema is received)
if !table_checked {
let table_name = &request.table_name;
plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// Resolve table reference
table_ref = Some(
catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?,
);
// Check permissions for the table
let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
table_checked = true;
}
let request_id = request.request_id; let request_id = request.request_id;
let start = Instant::now(); let start = Instant::now();
let rows = inserter let rows = inserter
.handle_bulk_insert( .handle_bulk_insert(
table_ref.clone(), table_ref.clone().unwrap(),
request.flight_data, request.flight_data,
request.record_batch, request.record_batch,
request.schema_bytes, request.schema_bytes,

View File

@@ -31,6 +31,7 @@ use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_query::{Output, OutputData}; use common_query::{Output, OutputData};
use common_recordbatch::DfRecordBatch; use common_recordbatch::DfRecordBatch;
use common_telemetry::debug;
use common_telemetry::tracing::info_span; use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::tracing_context::{FutureExt, TracingContext};
use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::datatypes::SchemaRef;
@@ -233,7 +234,7 @@ impl FlightCraft for GreptimeRequestHandler {
limiter, limiter,
) )
.await?; .await?;
// Ack to the first schema message when we successfully built the stream. // Ack immediately when stream is created successfully (in Init state)
let _ = tx.send(Ok(DoPutResponse::new(0, 0, 0.0))).await; let _ = tx.send(Ok(DoPutResponse::new(0, 0, 0.0))).await;
self.put_record_batches(stream, tx, query_ctx).await; self.put_record_batches(stream, tx, query_ctx).await;
@@ -305,98 +306,83 @@ impl PutRecordBatchRequest {
pub struct PutRecordBatchRequestStream { pub struct PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>, flight_data_stream: Streaming<FlightData>,
table_name: TableName, catalog: String,
schema: SchemaRef, schema_name: String,
schema_bytes: Bytes,
decoder: FlightDecoder,
limiter: Option<RequestMemoryLimiter>, limiter: Option<RequestMemoryLimiter>,
// Client now lazily sends schema data so we cannot eagerly wait for it.
// Instead, we need to decode while receiving record batches.
state: StreamState,
}
enum StreamState {
Init,
Ready {
table_name: TableName,
schema: SchemaRef,
schema_bytes: Bytes,
decoder: FlightDecoder,
},
} }
impl PutRecordBatchRequestStream { impl PutRecordBatchRequestStream {
/// Creates a new `PutRecordBatchRequestStream` by waiting for the first message, /// Creates a new `PutRecordBatchRequestStream` in Init state.
/// extracting the table name from the flight descriptor, and decoding the schema. /// The stream will transition to Ready state when it receives the schema message.
pub async fn new( pub async fn new(
mut flight_data_stream: Streaming<FlightData>, flight_data_stream: Streaming<FlightData>,
catalog: String, catalog: String,
schema: String, schema: String,
limiter: Option<RequestMemoryLimiter>, limiter: Option<RequestMemoryLimiter>,
) -> TonicResult<Self> { ) -> TonicResult<Self> {
fn extract_table_name(mut descriptor: FlightDescriptor) -> Result<String> {
ensure!(
descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::type == 'Path' only",
}
);
ensure!(
descriptor.path.len() == 1,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::path has only one table name",
}
);
Ok(descriptor.path.remove(0))
}
// Wait for the first message which must be a Schema message
let first_message = flight_data_stream.next().await.ok_or_else(|| {
Status::failed_precondition("flight data stream ended unexpectedly")
})??;
let flight_descriptor = first_message
.flight_descriptor
.as_ref()
.ok_or_else(|| {
Status::failed_precondition("table to put is not found in flight descriptor")
})?
.clone();
let table_name_str = extract_table_name(flight_descriptor)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let table_name = TableName::new(catalog, schema, table_name_str);
// Decode the first message as schema
let mut decoder = FlightDecoder::default();
let schema_message = decoder
.try_decode(&first_message)
.map_err(|e| Status::invalid_argument(format!("Failed to decode schema: {}", e)))?;
let (schema, schema_bytes) = match schema_message {
Some(FlightMessage::Schema(schema)) => {
let schema_bytes = decoder.schema_bytes().ok_or_else(|| {
Status::internal("decoder should have schema bytes after decoding schema")
})?;
(schema, schema_bytes)
}
_ => {
return Err(Status::failed_precondition(
"first message must be a Schema message",
));
}
};
Ok(Self { Ok(Self {
flight_data_stream, flight_data_stream,
table_name, catalog,
schema, schema_name: schema,
schema_bytes,
decoder,
limiter, limiter,
state: StreamState::Init,
}) })
} }
/// Returns the table name extracted from the flight descriptor. /// Returns the table name extracted from the flight descriptor.
pub fn table_name(&self) -> &TableName { /// Returns None if the stream is still in Init state.
&self.table_name pub fn table_name(&self) -> Option<&TableName> {
match &self.state {
StreamState::Init => None,
StreamState::Ready { table_name, .. } => Some(table_name),
}
} }
/// Returns the Arrow schema decoded from the first flight message. /// Returns the Arrow schema decoded from the first flight message.
pub fn schema(&self) -> &SchemaRef { /// Returns None if the stream is still in Init state.
&self.schema pub fn schema(&self) -> Option<&SchemaRef> {
match &self.state {
StreamState::Init => None,
StreamState::Ready { schema, .. } => Some(schema),
}
} }
/// Returns the raw schema bytes in IPC format. /// Returns the raw schema bytes in IPC format.
pub fn schema_bytes(&self) -> &Bytes { /// Returns None if the stream is still in Init state.
&self.schema_bytes pub fn schema_bytes(&self) -> Option<&Bytes> {
match &self.state {
StreamState::Init => None,
StreamState::Ready { schema_bytes, .. } => Some(schema_bytes),
}
}
fn extract_table_name(mut descriptor: FlightDescriptor) -> Result<String> {
ensure!(
descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::type == 'Path' only",
}
);
ensure!(
descriptor.path.len() == 1,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::path has only one table name",
}
);
Ok(descriptor.path.remove(0))
} }
} }
@@ -409,48 +395,114 @@ impl Stream for PutRecordBatchRequestStream {
match poll { match poll {
Some(Ok(flight_data)) => { Some(Ok(flight_data)) => {
// Extract request_id and body_size from FlightData before decoding // Clone limiter once to avoid borrowing issues
let request_id = if !flight_data.app_metadata.is_empty() { let limiter = self.limiter.clone();
match serde_json::from_slice::<DoPutMetadata>(&flight_data.app_metadata) {
Ok(metadata) => metadata.request_id(),
Err(_) => 0,
}
} else {
0
};
// Decode FlightData to RecordBatch match &mut self.state {
match self.decoder.try_decode(&flight_data) { StreamState::Init => {
Ok(Some(FlightMessage::RecordBatch(record_batch))) => { // First message - expecting schema
let limiter = self.limiter.clone(); let flight_descriptor = match flight_data.flight_descriptor.as_ref() {
let table_name = self.table_name.clone(); Some(descriptor) => descriptor.clone(),
let schema_bytes = self.schema_bytes.clone(); None => {
return Poll::Ready(Some( return Poll::Ready(Some(Err(Status::failed_precondition(
PutRecordBatchRequest::try_new( "table to put is not found in flight descriptor",
table_name, ))));
record_batch, }
request_id, };
schema_bytes,
flight_data, let table_name_str = match Self::extract_table_name(flight_descriptor) {
limiter.as_ref(), Ok(name) => name,
) Err(e) => {
.map_err(|e| Status::invalid_argument(e.to_string())), return Poll::Ready(Some(Err(Status::invalid_argument(
)); e.to_string(),
))));
}
};
let table_name = TableName::new(
self.catalog.clone(),
self.schema_name.clone(),
table_name_str,
);
// Decode the schema
let mut decoder = FlightDecoder::default();
let schema_message = decoder.try_decode(&flight_data).map_err(|e| {
Status::invalid_argument(format!("Failed to decode schema: {}", e))
})?;
match schema_message {
Some(FlightMessage::Schema(schema)) => {
let schema_bytes = decoder.schema_bytes().ok_or_else(|| {
Status::internal(
"decoder should have schema bytes after decoding schema",
)
})?;
// Transition to Ready state with all necessary data
self.state = StreamState::Ready {
table_name,
schema,
schema_bytes,
decoder,
};
// Continue to next iteration to process RecordBatch messages
continue;
}
_ => {
return Poll::Ready(Some(Err(Status::failed_precondition(
"first message must be a Schema message",
))));
}
}
} }
Ok(Some(_)) => { StreamState::Ready {
return Poll::Ready(Some(Err(Status::invalid_argument( table_name,
"Expected RecordBatch message, got other message type", schema: _,
)))); schema_bytes,
} decoder,
Ok(None) => { } => {
// Dictionary batch - processed internally by decoder, continue polling // Extract request_id and body_size from FlightData before decoding
continue; let request_id = if !flight_data.app_metadata.is_empty() {
} serde_json::from_slice::<DoPutMetadata>(&flight_data.app_metadata)
Err(e) => { .map(|meta| meta.request_id())
return Poll::Ready(Some(Err(Status::invalid_argument(format!( .unwrap_or_default()
"Failed to decode RecordBatch: {}", } else {
e 0
))))); };
// Decode FlightData to RecordBatch
match decoder.try_decode(&flight_data) {
Ok(Some(FlightMessage::RecordBatch(record_batch))) => {
let table_name = table_name.clone();
let schema_bytes = schema_bytes.clone();
return Poll::Ready(Some(
PutRecordBatchRequest::try_new(
table_name,
record_batch,
request_id,
schema_bytes,
flight_data,
limiter.as_ref(),
)
.map_err(|e| Status::invalid_argument(e.to_string())),
));
}
Ok(Some(other)) => {
debug!("Unexpected flight message: {:?}", other);
return Poll::Ready(Some(Err(Status::invalid_argument(
"Expected RecordBatch message, got other message type",
))));
}
Ok(None) => {
// Dictionary batch - processed internally by decoder, continue polling
continue;
}
Err(e) => {
return Poll::Ready(Some(Err(Status::invalid_argument(
format!("Failed to decode RecordBatch: {}", e),
))));
}
}
} }
} }
} }