refactor(servers): bulk insert service (#7329)

* refactor/bulk-insert-service:
 refactor: decode FlightData early in put_record_batch pipeline

 - Move FlightDecoder usage from Inserter up to PutRecordBatchRequestStream,
   passing decoded RecordBatch and schema bytes instead of raw FlightData.
 - Eliminate redundant per-request decoding/encoding in Inserter; encode
   once and reuse for all region requests.
 - Streamline GrpcQueryHandler trait and implementations to accept
   PutRecordBatchRequest containing pre-decoded data.
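
 Roughly, the request that now flows through the pipeline looks like this (a sketch only; the real type lives in `servers::grpc::flight` and uses `DfRecordBatch`, `TableName`, and a memory-guard handle):

```rust
// Sketch of the reshaped request: the batch is decoded once, up front,
// and the raw Flight frame is kept so its bytes can be reused downstream.
use arrow::record_batch::RecordBatch;
use arrow_flight::FlightData;
use bytes::Bytes;

pub struct PutRecordBatchRequest {
    pub table_name: String,        // simplified; the real field is a TableName
    pub request_id: i64,
    pub record_batch: RecordBatch, // pre-decoded in PutRecordBatchRequestStream
    pub schema_bytes: Bytes,       // IPC schema bytes from the first Flight message
    pub flight_data: FlightData,   // raw frame, reused when encoding for regions
}
```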

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 feat: stream-based bulk insert with per-batch responses

 - Introduce handle_put_record_batch_stream() to process Flight DoPut streams
 - Resolve table & permissions once, yield (request_id, AffectedRows) per batch
 - Replace loop-over-request with async-stream in frontend & server
 - Make PutRecordBatchRequestStream public for cross-crate usage
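
 A rough sketch of how such a per-batch stream is consumed (types simplified: `u64` stands in for `AffectedRows`, and the error type is left generic):

```rust
// Sketch of draining the bulk-insert response stream: one item per batch,
// each pairing the client-supplied request_id with the ingested row count.
use futures_util::StreamExt;

async fn drain_responses<S, E>(mut responses: S)
where
    S: futures_util::Stream<Item = Result<(i64, u64), E>> + Unpin,
    E: std::fmt::Debug,
{
    while let Some(item) = responses.next().await {
        match item {
            Ok((request_id, affected_rows)) => {
                println!("request {request_id}: {affected_rows} rows inserted");
            }
            Err(e) => eprintln!("bulk insert batch failed: {e:?}"),
        }
    }
}
```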

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 fix: propagate request_id with errors in bulk insert stream

 Changes the bulk-insert stream item type from
 Result<(i64, AffectedRows), E> to (i64, Result<AffectedRows, E>)
 so every emitted tuple carries the request_id even on failure,
 letting callers correlate errors with the originating request.
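
 In type terms (a sketch, with `u64` standing in for `AffectedRows`):

```rust
// Before: the request_id travels inside Ok, so it is lost on the error path.
type ItemBefore<E> = Result<(i64, u64), E>;
// After: the request_id is paired with the result and survives failures.
type ItemAfter<E> = (i64, Result<u64, E>);
```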

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 refactor: unify DoPut response stream to return DoPutResponse

 Replace the tuple (i64, Result<AffectedRows>) with Result<DoPutResponse>
 throughout the gRPC bulk-insert path so the handler, adapter and server
 all speak the same type.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 feat: add elapsed_secs to DoPutResponse for bulk-insert timing

 - DoPutResponse now carries elapsed_secs field
 - Frontend measures and attaches insert duration
 - Server observes GRPC_BULK_INSERT_ELAPSED metric from response
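
 A condensed sketch of the flow (the real code attaches `elapsed_secs` in the frontend stream handler and observes `GRPC_BULK_INSERT_ELAPSED` on the server; the histogram itself is omitted here):

```rust
// Sketch: measure the insert on the frontend, ship the duration in the
// response, and let the server read it back instead of timing the call.
use std::time::Instant;

struct DoPutResponse {
    request_id: i64,
    affected_rows: u64,
    elapsed_secs: f64,
}

fn handle_one_batch(request_id: i64) -> DoPutResponse {
    let start = Instant::now();
    let affected_rows = 0; // stand-in for the actual region writes
    DoPutResponse {
        request_id,
        affected_rows,
        elapsed_secs: start.elapsed().as_secs_f64(),
    }
}
```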

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 refactor: unify Bytes import in flight module

 - Replace `bytes::Bytes` with `Bytes` alias for consistency
 - Remove redundant `ProstBytes` alias

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 fix: terminate gRPC stream on error and optimize FlightData handling

 - Stop retrying on stream errors in gRPC handler
 - Replace Vec1 indexing with into_iter().next() for FlightData
 - Remove redundant clones in bulk_insert and flight modules

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 Improve permission check placement in `grpc.rs`

 - Move the permission check for `BulkInsert` so it runs before the table reference is resolved in the `GrpcQueryHandler` implementation.
 - Checking permissions earlier avoids resolving the table and doing other work for requests that would be denied anyway.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 **Refactor Bulk Insert Handling in gRPC**

 - **`grpc.rs`**:
   - Switched from `async_stream::stream` to `async_stream::try_stream` for error handling.
   - Removed `body_size` parameter and added `flight_data` to `handle_bulk_insert`.
   - Simplified error handling and permission checks in `GrpcQueryHandler`.

 - **`bulk_insert.rs`**:
   - Added `raw_flight_data` parameter to `handle_bulk_insert`.
   - Calculated `body_size` from `raw_flight_data` and removed redundant encoding logic.

 - **`flight.rs`**:
   - Replaced `body_size` with `flight_data` in `PutRecordBatchRequest`.
   - Updated memory usage calculation to include `flight_data` components.
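
 The memory-usage change in `flight.rs` amounts to sizing the request from the Flight frame's own buffers instead of re-encoding the protobuf message to measure it; roughly (sketch; the real value feeds a request-memory limiter):

```rust
// Sketch: account for a DoPut request by summing the Flight frame components.
use arrow_flight::FlightData;

fn request_memory_usage(flight_data: &FlightData) -> usize {
    flight_data.data_body.len()
        + flight_data.app_metadata.len()
        + flight_data.data_header.len()
}
```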

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/bulk-insert-service:
 perf(bulk_insert): encode record batch once per datanode

 Move FlightData encoding outside the per-region loop so the same
 encoded bytes are reused whenever `mask.select_all()` is true, eliminating
 redundant serialization work.
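
 Schematically (a sketch, not the project code; cloning `Bytes` only bumps a refcount, so reusing the already-encoded header/body is cheap):

```rust
// Sketch: regions whose mask selects every row reuse the pre-encoded bytes;
// partially selected regions fall back to encoding their own subset.
use bytes::Bytes;

fn raw_bytes_for_region(
    select_all: bool,
    data_header: &Bytes,
    data_body: &Bytes,
) -> Option<(Bytes, Bytes)> {
    select_all.then(|| (data_header.clone(), data_body.clone()))
}
```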

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Lei, HUANG
2025-12-03 23:08:02 -08:00
committed by GitHub
parent 2a760f010f
commit 11ecb7a28a
7 changed files with 330 additions and 174 deletions

View File

@@ -46,13 +46,16 @@ pub struct DoPutResponse {
     request_id: i64,
     /// The successfully ingested rows number.
     affected_rows: AffectedRows,
+    /// The elapsed time in seconds for handling the bulk insert.
+    elapsed_secs: f64,
 }
 
 impl DoPutResponse {
-    pub fn new(request_id: i64, affected_rows: AffectedRows) -> Self {
+    pub fn new(request_id: i64, affected_rows: AffectedRows, elapsed_secs: f64) -> Self {
         Self {
             request_id,
             affected_rows,
+            elapsed_secs,
         }
     }
@@ -63,6 +66,10 @@ impl DoPutResponse {
     pub fn affected_rows(&self) -> AffectedRows {
         self.affected_rows
     }
+
+    pub fn elapsed_secs(&self) -> f64 {
+        self.elapsed_secs
+    }
 }
 
 impl TryFrom<PutResult> for DoPutResponse {
@@ -86,8 +93,11 @@ mod tests {
     #[test]
     fn test_serde_do_put_response() {
-        let x = DoPutResponse::new(42, 88);
+        let x = DoPutResponse::new(42, 88, 0.123);
         let serialized = serde_json::to_string(&x).unwrap();
-        assert_eq!(serialized, r#"{"request_id":42,"affected_rows":88}"#);
+        assert_eq!(
+            serialized,
+            r#"{"request_id":42,"affected_rows":88,"elapsed_secs":0.123}"#
+        );
     }
 }

View File

@@ -12,7 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::pin::Pin;
 use std::sync::Arc;
+use std::time::Instant;
 
 use api::helper::from_pb_time_ranges;
 use api::v1::ddl_request::{Expr as DdlExpr, Expr};
@@ -22,16 +24,18 @@ use api::v1::{
     DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
     RowInsertRequests,
 };
+use async_stream::try_stream;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use common_base::AffectedRows;
 use common_error::ext::BoxedError;
-use common_grpc::FlightData;
-use common_grpc::flight::FlightDecoder;
+use common_grpc::flight::do_put::DoPutResponse;
 use common_query::Output;
 use common_query::logical_plan::add_insert_to_logical_plan;
 use common_telemetry::tracing::{self};
 use datafusion::datasource::DefaultTableSource;
+use futures::Stream;
+use futures::stream::StreamExt;
 use query::parser::PromQuery;
 use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
 use servers::query_handler::grpc::GrpcQueryHandler;
@@ -240,10 +244,8 @@ impl GrpcQueryHandler for Instance {
     async fn put_record_batch(
         &self,
-        table_name: &TableName,
+        request: servers::grpc::flight::PutRecordBatchRequest,
         table_ref: &mut Option<TableRef>,
-        decoder: &mut FlightDecoder,
-        data: FlightData,
         ctx: QueryContextRef,
     ) -> Result<AffectedRows> {
         let table = if let Some(table) = table_ref {
@@ -252,15 +254,15 @@
             let table = self
                 .catalog_manager()
                 .table(
-                    &table_name.catalog_name,
-                    &table_name.schema_name,
-                    &table_name.table_name,
+                    &request.table_name.catalog_name,
+                    &request.table_name.schema_name,
+                    &request.table_name.table_name,
                     None,
                 )
                 .await
                 .context(CatalogSnafu)?
                 .with_context(|| TableNotFoundSnafu {
-                    table_name: table_name.to_string(),
+                    table_name: request.table_name.to_string(),
                 })?;
             *table_ref = Some(table.clone());
             table
@@ -279,10 +281,77 @@
         // do we check limit for bulk insert?
         self.inserter
-            .handle_bulk_insert(table, decoder, data)
+            .handle_bulk_insert(
+                table,
+                request.flight_data,
+                request.record_batch,
+                request.schema_bytes,
+            )
             .await
             .context(TableOperationSnafu)
     }
+
+    fn handle_put_record_batch_stream(
+        &self,
+        mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
+        ctx: QueryContextRef,
+    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
+        // Resolve table once for the stream
+        // Clone all necessary data to make it 'static
+        let catalog_manager = self.catalog_manager().clone();
+        let plugins = self.plugins.clone();
+        let inserter = self.inserter.clone();
+        let table_name = stream.table_name().clone();
+        let ctx = ctx.clone();
+
+        Box::pin(try_stream! {
+            plugins
+                .get::<PermissionCheckerRef>()
+                .as_ref()
+                .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
+                .context(PermissionSnafu)?;
+
+            // Cache for resolved table reference - resolve once and reuse
+            let table_ref = catalog_manager
+                .table(
+                    &table_name.catalog_name,
+                    &table_name.schema_name,
+                    &table_name.table_name,
+                    None,
+                )
+                .await
+                .context(CatalogSnafu)?
+                .with_context(|| TableNotFoundSnafu {
+                    table_name: table_name.to_string(),
+                })?;
+
+            // Check permissions once for the stream
+            let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
+            let interceptor = interceptor_ref.as_ref();
+            interceptor.pre_bulk_insert(table_ref.clone(), ctx.clone())?;
+
+            // Process each request in the stream
+            while let Some(request_result) = stream.next().await {
+                let request = request_result.map_err(|e| {
+                    let error_msg = format!("Stream error: {:?}", e);
+                    IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
+                })?;
+
+                let request_id = request.request_id;
+                let start = Instant::now();
+                let rows = inserter
+                    .handle_bulk_insert(
+                        table_ref.clone(),
+                        request.flight_data,
+                        request.record_batch,
+                        request.schema_bytes,
+                    )
+                    .await
+                    .context(TableOperationSnafu)?;
+                let elapsed_secs = start.elapsed().as_secs_f64();
+                yield DoPutResponse::new(request_id, rows, elapsed_secs);
+            }
+        })
+    }
 }
 
 fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
View File

@@ -22,9 +22,10 @@ use api::v1::region::{
 };
 use arrow::array::Array;
 use arrow::record_batch::RecordBatch;
+use bytes::Bytes;
 use common_base::AffectedRows;
 use common_grpc::FlightData;
-use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
+use common_grpc::flight::{FlightEncoder, FlightMessage};
 use common_telemetry::error;
 use common_telemetry::tracing_context::TracingContext;
 use snafu::{OptionExt, ResultExt, ensure};
@@ -40,33 +41,20 @@ impl Inserter {
     pub async fn handle_bulk_insert(
         &self,
         table: TableRef,
-        decoder: &mut FlightDecoder,
-        data: FlightData,
+        raw_flight_data: FlightData,
+        record_batch: RecordBatch,
+        schema_bytes: Bytes,
     ) -> error::Result<AffectedRows> {
         let table_info = table.table_info();
         let table_id = table_info.table_id();
         let db_name = table_info.get_db_string();
 
-        let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
-            .with_label_values(&["decode_request"])
-            .start_timer();
-        let body_size = data.data_body.len();
-        // Build region server requests
-        let message = decoder
-            .try_decode(&data)
-            .context(error::DecodeFlightDataSnafu)?
-            .context(error::NotSupportedSnafu {
-                feat: "bulk insert RecordBatch with dictionary arrays",
-            })?;
-        let FlightMessage::RecordBatch(record_batch) = message else {
-            return Ok(0);
-        };
-        decode_timer.observe_duration();
         if record_batch.num_rows() == 0 {
             return Ok(0);
         }
 
-        // TODO(yingwen): Fill record batch impure default values.
+        let body_size = raw_flight_data.data_body.len();
+        // TODO(yingwen): Fill record batch impure default values. Note that we should override `raw_flight_data` if we have to fill defaults.
         // notify flownode to update dirty timestamps if flow is configured.
         self.maybe_update_flow_dirty_window(table_info.clone(), record_batch.clone());
@@ -75,8 +63,6 @@
             .with_label_values(&["raw"])
             .observe(record_batch.num_rows() as f64);
 
-        // safety: when reach here schema must be present.
-        let schema_bytes = decoder.schema_bytes().unwrap();
         let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["partition"])
             .start_timer();
@@ -106,6 +92,7 @@
                 .find_region_leader(region_id)
                 .await
                 .context(error::FindRegionLeaderSnafu)?;
+
             let request = RegionRequest {
                 header: Some(RegionRequestHeader {
                     tracing_context: TracingContext::from_current_span().to_w3c(),
@@ -114,9 +101,9 @@
                 body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
                     region_id: region_id.as_u64(),
                     body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
-                        schema: schema_bytes,
-                        data_header: data.data_header,
-                        payload: data.data_body,
+                        schema: schema_bytes.clone(),
+                        data_header: raw_flight_data.data_header,
+                        payload: raw_flight_data.data_body,
                     })),
                 })),
             };
@@ -158,8 +145,6 @@
         let mut handles = Vec::with_capacity(mask_per_datanode.len());
 
-        // raw daya header and payload bytes.
-        let mut raw_data_bytes = None;
        for (peer, masks) in mask_per_datanode {
             for (region_id, mask) in masks {
                 if mask.select_none() {
@@ -170,13 +155,10 @@
                 let node_manager = self.node_manager.clone();
                 let peer = peer.clone();
                 let raw_header_and_data = if mask.select_all() {
-                    Some(
-                        raw_data_bytes
-                            .get_or_insert_with(|| {
-                                (data.data_header.clone(), data.data_body.clone())
-                            })
-                            .clone(),
-                    )
+                    Some((
+                        raw_flight_data.data_header.clone(),
+                        raw_flight_data.data_body.clone(),
+                    ))
                 } else {
                     None
                 };

View File

@@ -25,12 +25,15 @@ use arrow_flight::{
     HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket,
 };
 use async_trait::async_trait;
+use bytes;
 use bytes::Bytes;
 use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
-use common_grpc::flight::{FlightEncoder, FlightMessage};
+use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
 use common_query::{Output, OutputData};
+use common_recordbatch::DfRecordBatch;
 use common_telemetry::tracing::info_span;
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
+use datatypes::arrow::datatypes::SchemaRef;
 use futures::{Stream, future, ready};
 use futures_util::{StreamExt, TryStreamExt};
 use prost::Message;
@@ -41,7 +44,7 @@ use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{Request, Response, Status, Streaming};
 
-use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
+use crate::error::{InvalidParameterSnafu, Result, ToJsonSnafu};
 pub use crate::grpc::flight::stream::FlightRecordBatchStream;
 use crate::grpc::greptime_handler::{GreptimeRequestHandler, get_request_type};
 use crate::grpc::{FlightCompression, TonicResult, context_auth};
@@ -223,14 +226,15 @@ impl FlightCraft for GreptimeRequestHandler {
         const MAX_PENDING_RESPONSES: usize = 32;
         let (tx, rx) = mpsc::channel::<TonicResult<DoPutResponse>>(MAX_PENDING_RESPONSES);
 
-        let stream = PutRecordBatchRequestStream {
-            flight_data_stream: stream,
-            state: PutRecordBatchRequestStreamState::Init(
-                query_ctx.current_catalog().to_string(),
-                query_ctx.current_schema(),
-            ),
+        let stream = PutRecordBatchRequestStream::new(
+            stream,
+            query_ctx.current_catalog().to_string(),
+            query_ctx.current_schema(),
             limiter,
-        };
+        )
+        .await?;
+        // Ack to the first schema message when we successfully built the stream.
+        let _ = tx.send(Ok(DoPutResponse::new(0, 0, 0.0))).await;
 
         self.put_record_batches(stream, tx, query_ctx).await;
 
         let response = ReceiverStream::new(rx)
@@ -252,30 +256,30 @@ impl FlightCraft for GreptimeRequestHandler {
 pub struct PutRecordBatchRequest {
     pub table_name: TableName,
     pub request_id: i64,
-    pub data: FlightData,
-    pub _guard: Option<RequestMemoryGuard>,
+    pub record_batch: DfRecordBatch,
+    pub schema_bytes: Bytes,
+    pub flight_data: FlightData,
+    pub(crate) _guard: Option<RequestMemoryGuard>,
 }
 
 impl PutRecordBatchRequest {
     fn try_new(
         table_name: TableName,
+        record_batch: DfRecordBatch,
+        request_id: i64,
+        schema_bytes: Bytes,
         flight_data: FlightData,
         limiter: Option<&RequestMemoryLimiter>,
     ) -> Result<Self> {
-        let request_id = if !flight_data.app_metadata.is_empty() {
-            let metadata: DoPutMetadata =
-                serde_json::from_slice(&flight_data.app_metadata).context(ParseJsonSnafu)?;
-            metadata.request_id()
-        } else {
-            0
-        };
+        let memory_usage = flight_data.data_body.len()
+            + flight_data.app_metadata.len()
+            + flight_data.data_header.len();
         let _guard = limiter
             .filter(|limiter| limiter.is_enabled())
             .map(|limiter| {
-                let message_size = flight_data.encoded_len();
                 limiter
-                    .try_acquire(message_size)
+                    .try_acquire(memory_usage)
                     .map(|guard| {
                         guard.inspect(|g| {
                             METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
@@ -291,27 +295,32 @@ impl PutRecordBatchRequest {
         Ok(Self {
             table_name,
             request_id,
-            data: flight_data,
+            record_batch,
+            schema_bytes,
+            flight_data,
             _guard,
         })
     }
 }
 
 pub struct PutRecordBatchRequestStream {
-    pub flight_data_stream: Streaming<FlightData>,
-    pub state: PutRecordBatchRequestStreamState,
-    pub limiter: Option<RequestMemoryLimiter>,
+    flight_data_stream: Streaming<FlightData>,
+    table_name: TableName,
+    schema: SchemaRef,
+    schema_bytes: Bytes,
+    decoder: FlightDecoder,
+    limiter: Option<RequestMemoryLimiter>,
 }
 
-pub enum PutRecordBatchRequestStreamState {
-    Init(String, String),
-    Started(TableName),
-}
-
-impl Stream for PutRecordBatchRequestStream {
-    type Item = TonicResult<PutRecordBatchRequest>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+impl PutRecordBatchRequestStream {
+    /// Creates a new `PutRecordBatchRequestStream` by waiting for the first message,
+    /// extracting the table name from the flight descriptor, and decoding the schema.
+    pub async fn new(
+        mut flight_data_stream: Streaming<FlightData>,
+        catalog: String,
+        schema: String,
+        limiter: Option<RequestMemoryLimiter>,
+    ) -> TonicResult<Self> {
         fn extract_table_name(mut descriptor: FlightDescriptor) -> Result<String> {
             ensure!(
                 descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32,
@@ -328,56 +337,131 @@ impl Stream for PutRecordBatchRequestStream {
             Ok(descriptor.path.remove(0))
         }
 
-        let poll = ready!(self.flight_data_stream.poll_next_unpin(cx));
-        let limiter = self.limiter.clone();
-
-        let result = match &mut self.state {
-            PutRecordBatchRequestStreamState::Init(catalog, schema) => match poll {
-                Some(Ok(mut flight_data)) => {
-                    let flight_descriptor = flight_data.flight_descriptor.take();
-                    let result = if let Some(descriptor) = flight_descriptor {
-                        let table_name = extract_table_name(descriptor)
-                            .map(|x| TableName::new(catalog.clone(), schema.clone(), x));
-                        let table_name = match table_name {
-                            Ok(table_name) => table_name,
-                            Err(e) => return Poll::Ready(Some(Err(e.into()))),
-                        };
-
-                        let request = PutRecordBatchRequest::try_new(
-                            table_name.clone(),
-                            flight_data,
-                            limiter.as_ref(),
-                        );
-                        let request = match request {
-                            Ok(request) => request,
-                            Err(e) => return Poll::Ready(Some(Err(e.into()))),
-                        };
-
-                        self.state = PutRecordBatchRequestStreamState::Started(table_name);
-
-                        Ok(request)
-                    } else {
-                        Err(Status::failed_precondition(
-                            "table to put is not found in flight descriptor",
-                        ))
-                    };
-                    Some(result)
-                }
-                Some(Err(e)) => Some(Err(e)),
-                None => None,
-            },
-            PutRecordBatchRequestStreamState::Started(table_name) => poll.map(|x| {
-                x.and_then(|flight_data| {
-                    PutRecordBatchRequest::try_new(
-                        table_name.clone(),
-                        flight_data,
-                        limiter.as_ref(),
-                    )
-                    .map_err(Into::into)
-                })
-            }),
-        };
-        Poll::Ready(result)
+        // Wait for the first message which must be a Schema message
+        let first_message = flight_data_stream.next().await.ok_or_else(|| {
+            Status::failed_precondition("flight data stream ended unexpectedly")
+        })??;
+
+        let flight_descriptor = first_message
+            .flight_descriptor
+            .as_ref()
+            .ok_or_else(|| {
+                Status::failed_precondition("table to put is not found in flight descriptor")
+            })?
+            .clone();
+
+        let table_name_str = extract_table_name(flight_descriptor)
+            .map_err(|e| Status::invalid_argument(e.to_string()))?;
+        let table_name = TableName::new(catalog, schema, table_name_str);
+
+        // Decode the first message as schema
+        let mut decoder = FlightDecoder::default();
+        let schema_message = decoder
+            .try_decode(&first_message)
+            .map_err(|e| Status::invalid_argument(format!("Failed to decode schema: {}", e)))?;
+
+        let (schema, schema_bytes) = match schema_message {
+            Some(FlightMessage::Schema(schema)) => {
+                let schema_bytes = decoder.schema_bytes().ok_or_else(|| {
+                    Status::internal("decoder should have schema bytes after decoding schema")
+                })?;
+                (schema, schema_bytes)
+            }
+            _ => {
+                return Err(Status::failed_precondition(
+                    "first message must be a Schema message",
+                ));
+            }
+        };
+
+        Ok(Self {
+            flight_data_stream,
+            table_name,
+            schema,
+            schema_bytes,
+            decoder,
+            limiter,
+        })
+    }
+
+    /// Returns the table name extracted from the flight descriptor.
+    pub fn table_name(&self) -> &TableName {
+        &self.table_name
+    }
+
+    /// Returns the Arrow schema decoded from the first flight message.
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Returns the raw schema bytes in IPC format.
+    pub fn schema_bytes(&self) -> &Bytes {
+        &self.schema_bytes
+    }
+}
+
+impl Stream for PutRecordBatchRequestStream {
+    type Item = TonicResult<PutRecordBatchRequest>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        loop {
+            let poll = ready!(self.flight_data_stream.poll_next_unpin(cx));
+
+            match poll {
+                Some(Ok(flight_data)) => {
+                    // Extract request_id and body_size from FlightData before decoding
+                    let request_id = if !flight_data.app_metadata.is_empty() {
+                        match serde_json::from_slice::<DoPutMetadata>(&flight_data.app_metadata) {
+                            Ok(metadata) => metadata.request_id(),
+                            Err(_) => 0,
+                        }
+                    } else {
+                        0
+                    };
+
+                    // Decode FlightData to RecordBatch
+                    match self.decoder.try_decode(&flight_data) {
+                        Ok(Some(FlightMessage::RecordBatch(record_batch))) => {
+                            let limiter = self.limiter.clone();
+                            let table_name = self.table_name.clone();
+                            let schema_bytes = self.schema_bytes.clone();
+                            return Poll::Ready(Some(
+                                PutRecordBatchRequest::try_new(
+                                    table_name,
+                                    record_batch,
+                                    request_id,
+                                    schema_bytes,
+                                    flight_data,
+                                    limiter.as_ref(),
+                                )
+                                .map_err(|e| Status::invalid_argument(e.to_string())),
+                            ));
+                        }
+                        Ok(Some(_)) => {
+                            return Poll::Ready(Some(Err(Status::invalid_argument(
+                                "Expected RecordBatch message, got other message type",
+                            ))));
+                        }
+                        Ok(None) => {
+                            // Dictionary batch - processed internally by decoder, continue polling
+                            continue;
+                        }
+                        Err(e) => {
+                            return Poll::Ready(Some(Err(Status::invalid_argument(format!(
+                                "Failed to decode RecordBatch: {}",
+                                e
+                            )))));
+                        }
+                    }
+                }
+                Some(Err(e)) => {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                None => {
+                    return Poll::Ready(None);
+                }
+            }
+        }
     }
 }

View File

@@ -24,7 +24,6 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::parse_catalog_and_schema_from_db_string;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
-use common_grpc::flight::FlightDecoder;
 use common_grpc::flight::do_put::DoPutResponse;
 use common_query::Output;
 use common_runtime::Runtime;
@@ -37,15 +36,14 @@ use futures_util::StreamExt;
 use session::context::{Channel, QueryContextBuilder, QueryContextRef};
 use session::hints::READ_PREFERENCE_HINT;
 use snafu::{OptionExt, ResultExt};
-use table::TableRef;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
+use tonic::Status;
 
 use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result, UnknownHintSnafu};
-use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
+use crate::grpc::flight::PutRecordBatchRequestStream;
 use crate::grpc::{FlightCompression, TonicResult, context_auth};
-use crate::metrics;
-use crate::metrics::METRIC_SERVER_GRPC_DB_REQUEST_TIMER;
+use crate::metrics::{self, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
 use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
 
 #[derive(Clone)]
@@ -134,7 +132,7 @@ impl GreptimeRequestHandler {
     pub(crate) async fn put_record_batches(
         &self,
-        mut stream: PutRecordBatchRequestStream,
+        stream: PutRecordBatchRequestStream,
         result_sender: mpsc::Sender<TonicResult<DoPutResponse>>,
         query_ctx: QueryContextRef,
     ) {
@@ -144,37 +142,24 @@
             .clone()
             .unwrap_or_else(common_runtime::global_runtime);
         runtime.spawn(async move {
-            // Cached table ref
-            let mut table_ref: Option<TableRef> = None;
-            let mut decoder = FlightDecoder::default();
-            while let Some(request) = stream.next().await {
-                let request = match request {
-                    Ok(request) => request,
-                    Err(e) => {
-                        let _ = result_sender.try_send(Err(e));
-                        break;
-                    }
-                };
-                let PutRecordBatchRequest {
-                    table_name,
-                    request_id,
-                    data,
-                    _guard,
-                } = request;
-
-                let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
-                let result = handler
-                    .put_record_batch(&table_name, &mut table_ref, &mut decoder, data, query_ctx.clone())
-                    .await
-                    .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
-                timer.observe_duration();
-                let result = result
-                    .map(|x| DoPutResponse::new(request_id, x))
-                    .map_err(Into::into);
-                if let Err(e)= result_sender.try_send(result)
-                    && let TrySendError::Closed(_) = e {
-                    warn!(r#""DoPut" client with request_id {} maybe unreachable, abort handling its message"#, request_id);
+            let mut result_stream = handler.handle_put_record_batch_stream(stream, query_ctx);
+
+            while let Some(result) = result_stream.next().await {
+                match &result {
+                    Ok(response) => {
+                        // Record the elapsed time metric from the response
+                        metrics::GRPC_BULK_INSERT_ELAPSED.observe(response.elapsed_secs());
+                    }
+                    Err(e) => {
+                        error!(e; "Failed to handle flight record batches");
+                    }
+                }
+
+                if let Err(e) =
+                    result_sender.try_send(result.map_err(|e| Status::from_error(Box::new(e))))
+                    && let TrySendError::Closed(_) = e
+                {
+                    warn!(r#""DoPut" client maybe unreachable, abort handling its message"#);
                     break;
                 }
             }

View File

@@ -12,21 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::pin::Pin;
 use std::sync::Arc;
 
 use api::v1::greptime_request::Request;
-use arrow_flight::FlightData;
 use async_trait::async_trait;
 use common_base::AffectedRows;
 use common_error::ext::{BoxedError, ErrorExt};
-use common_grpc::flight::FlightDecoder;
+use common_grpc::flight::do_put::DoPutResponse;
 use common_query::Output;
+use futures::Stream;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::TableRef;
-use table::table_name::TableName;
 
 use crate::error::{self, Result};
+use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
 
 pub type GrpcQueryHandlerRef<E> = Arc<dyn GrpcQueryHandler<Error = E> + Send + Sync>;
 pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
@@ -45,12 +46,16 @@ pub trait GrpcQueryHandler {
     async fn put_record_batch(
         &self,
-        table_name: &TableName,
+        request: PutRecordBatchRequest,
         table_ref: &mut Option<TableRef>,
-        decoder: &mut FlightDecoder,
-        flight_data: FlightData,
         ctx: QueryContextRef,
     ) -> std::result::Result<AffectedRows, Self::Error>;
+
+    fn handle_put_record_batch_stream(
+        &self,
+        stream: PutRecordBatchRequestStream,
+        ctx: QueryContextRef,
+    ) -> Pin<Box<dyn Stream<Item = std::result::Result<DoPutResponse, Self::Error>> + Send>>;
 }
 
 pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);
@@ -78,16 +83,31 @@ where
     async fn put_record_batch(
         &self,
-        table_name: &TableName,
+        request: PutRecordBatchRequest,
         table_ref: &mut Option<TableRef>,
-        decoder: &mut FlightDecoder,
-        data: FlightData,
         ctx: QueryContextRef,
     ) -> Result<AffectedRows> {
         self.0
-            .put_record_batch(table_name, table_ref, decoder, data, ctx)
+            .put_record_batch(request, table_ref, ctx)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcRequestSnafu)
     }
+
+    fn handle_put_record_batch_stream(
+        &self,
+        stream: PutRecordBatchRequestStream,
+        ctx: QueryContextRef,
+    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
+        use futures_util::StreamExt;
+
+        Box::pin(
+            self.0
+                .handle_put_record_batch_stream(stream, ctx)
+                .map(|result| {
+                    result
+                        .map_err(|e| BoxedError::new(e))
+                        .context(error::ExecuteGrpcRequestSnafu)
+                }),
+        )
+    }
 }

View File

@@ -16,12 +16,11 @@ use std::sync::Arc;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
-use arrow_flight::FlightData;
 use async_trait::async_trait;
 use catalog::memory::MemoryCatalogManager;
 use common_base::AffectedRows;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_grpc::flight::FlightDecoder;
+use common_grpc::flight::do_put::DoPutResponse;
 use common_query::Output;
 use datafusion_expr::LogicalPlan;
 use query::options::QueryOptions;
@@ -35,7 +34,6 @@
 use snafu::ensure;
 use sql::statements::statement::Statement;
 use table::TableRef;
-use table::table_name::TableName;
 
 mod http;
 mod interceptor;
@@ -165,14 +163,22 @@ impl GrpcQueryHandler for DummyInstance {
     async fn put_record_batch(
         &self,
-        _table_name: &TableName,
+        _request: servers::grpc::flight::PutRecordBatchRequest,
         _table_ref: &mut Option<TableRef>,
-        _decoder: &mut FlightDecoder,
-        _data: FlightData,
         _ctx: QueryContextRef,
     ) -> std::result::Result<AffectedRows, Self::Error> {
         unimplemented!()
     }
+
+    fn handle_put_record_batch_stream(
+        &self,
+        _stream: servers::grpc::flight::PutRecordBatchRequestStream,
+        _ctx: QueryContextRef,
+    ) -> std::pin::Pin<
+        Box<dyn futures::Stream<Item = std::result::Result<DoPutResponse, Self::Error>> + Send>,
+    > {
+        unimplemented!()
+    }
 }
 
 fn create_testing_instance(table: TableRef) -> DummyInstance {