From 7527ff976e5ee887540ca1e9bb58262117252fca Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <mrsatangel@gmail.com>
Date: Tue, 10 Jun 2025 13:37:46 +0000
Subject: [PATCH] feat/bulk-support-flow-batch:

### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert

- **Refactor Table Handling**:
  - Updated `put_record_batch` method to use `TableRef` instead of `TableId` in `grpc.rs`, `greptime_handler.rs`, and `grpc.rs`.
  - Modified `handle_bulk_insert` to accept `TableRef` and extract `TableId` internally in `bulk_insert.rs`.

- **Enhance Timestamp Processing**:
  - Added `compute_timestamp_range` function to calculate timestamp range in `bulk_insert.rs`.
  - Introduced error handling for invalid time index types in `error.rs`.

- **Test Adjustments**:
  - Updated `DummyInstance` implementation in `tests/mod.rs` to align with new method signatures.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
---
 src/frontend/src/instance/grpc.rs        | 25 ++++-----
 src/operator/src/bulk_insert.rs          | 71 +++++++++++++++++++++++-
 src/operator/src/error.rs                |  8 +++
 src/servers/src/grpc/greptime_handler.rs |  7 ++-
 src/servers/src/query_handler/grpc.rs    | 11 ++--
 src/servers/tests/mod.rs                 | 12 ++--
 6 files changed, 102 insertions(+), 32 deletions(-)

diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index 555330e76a..5383bd931a 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
 use servers::query_handler::sql::SqlQueryHandler;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use table::metadata::TableId;
 use table::table_name::TableName;
+use table::TableRef;
 
 use crate::error::{
     CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
 
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> Result<AffectedRows> {
-        let table_id = if let Some(table_id) = table_id {
-            *table_id
+        let table = if let Some(table) = table_ref {
+            table.clone()
         } else {
             let table = self
                 .catalog_manager()
                 .table(
-                    &table.catalog_name,
-                    &table.schema_name,
-                    &table.table_name,
+                    &table_name.catalog_name,
+                    &table_name.schema_name,
+                    &table_name.table_name,
                     None,
                 )
                 .await
                 .context(CatalogSnafu)?
                 .with_context(|| TableNotFoundSnafu {
-                    table_name: table.to_string(),
+                    table_name: table_name.to_string(),
                 })?;
-            let id = table.table_info().table_id();
-            *table_id = Some(id);
-            id
+            *table_ref = Some(table.clone());
+            table
         };
 
         self.inserter
-            .handle_bulk_insert(table_id, decoder, data)
+            .handle_bulk_insert(table, decoder, data)
             .await
             .context(TableOperationSnafu)
     }
diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs
index ead97d9ce7..99fc9e327b 100644
--- a/src/operator/src/bulk_insert.rs
+++ b/src/operator/src/bulk_insert.rs
@@ -17,13 +17,20 @@ use api::v1::region::{
     bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
 };
 use api::v1::ArrowIpc;
+use arrow::array::{
+    Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
+};
+use arrow::datatypes::{DataType, Int64Type, TimeUnit};
+use arrow::record_batch::RecordBatch;
 use common_base::AffectedRows;
 use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
 use common_grpc::FlightData;
+use common_telemetry::tracing::Instrument;
 use common_telemetry::tracing_context::TracingContext;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
-use table::metadata::TableId;
+use table::TableRef;
 
 use crate::insert::Inserter;
 use crate::{error, metrics};
@@ -32,10 +39,11 @@ impl Inserter {
     /// Handle bulk insert request.
     pub async fn handle_bulk_insert(
         &self,
-        table_id: TableId,
+        table: TableRef,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> error::Result<AffectedRows> {
+        let table_id = table.table_info().table_id();
         let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["decode_request"])
             .start_timer();
@@ -48,6 +56,19 @@ impl Inserter {
             return Ok(0);
         };
         decode_timer.observe_duration();
+        if let Some((min, max)) = compute_timestamp_range(
+            &record_batch,
+            &table
+                .table_info()
+                .meta
+                .schema
+                .timestamp_column()
+                .as_ref()
+                .unwrap()
+                .name,
+        )? {
+            // notify flownode.
+        }
         metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
         metrics::BULK_REQUEST_ROWS
             .with_label_values(&["raw"])
@@ -217,3 +238,47 @@ impl Inserter {
         Ok(rows_inserted)
     }
 }
+
+/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
+fn compute_timestamp_range(
+    rb: &RecordBatch,
+    timestamp_index_name: &str,
+) -> error::Result<Option<(i64, i64)>> {
+    let ts_col = rb
+        .column_by_name(timestamp_index_name)
+        .context(error::ColumnNotFoundSnafu {
+            msg: timestamp_index_name,
+        })?;
+    if rb.num_rows() == 0 {
+        return Ok(None);
+    }
+    let primitive = match ts_col.data_type() {
+        DataType::Timestamp(unit, _) => match unit {
+            TimeUnit::Second => ts_col
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Millisecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Microsecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Nanosecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+        },
+        t @ _ => {
+            return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
+        }
+    };
+
+    Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
+}
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index 1242138539..4f6575bf36 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -837,6 +837,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Invalid time index type: {}", ty))]
+    InvalidTimeIndexType {
+        ty: arrow::datatypes::DataType,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -964,6 +971,7 @@ impl ErrorExt for Error {
             Error::ColumnOptions { source, .. } => source.status_code(),
             Error::DecodeFlightData { source, .. } => source.status_code(),
             Error::ComputeArrow { .. } => StatusCode::Internal,
+            Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
         }
     }
 
diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs
index 14bf55bf6b..b0f26fa0ee 100644
--- a/src/servers/src/grpc/greptime_handler.rs
+++ b/src/servers/src/grpc/greptime_handler.rs
@@ -41,6 +41,7 @@ use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
 use session::hints::READ_PREFERENCE_HINT;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
+use table::TableRef;
 use tokio::sync::mpsc;
 
 use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +150,8 @@ impl GreptimeRequestHandler {
             .clone()
             .unwrap_or_else(common_runtime::global_runtime);
         runtime.spawn(async move {
-            // Cached table id
-            let mut table_id: Option<TableId> = None;
+            // Cached table ref
+            let mut table_ref: Option<TableRef> = None;
             let mut decoder = FlightDecoder::default();
 
             while let Some(request) = stream.next().await {
@@ -169,7 +170,7 @@ impl GreptimeRequestHandler {
                 let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
                 let result = handler
-                    .put_record_batch(&table_name, &mut table_id, &mut decoder, data)
+                    .put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
                     .await
                     .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
                 timer.observe_duration();
 
diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs
index c7055092f6..f2a941da39 100644
--- a/src/servers/src/query_handler/grpc.rs
+++ b/src/servers/src/query_handler/grpc.rs
@@ -25,6 +25,7 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::metadata::TableId;
 use table::table_name::TableName;
+use table::TableRef;
 
 use crate::error::{self, Result};
 
@@ -45,8 +46,8 @@
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         flight_data: FlightData,
     ) -> std::result::Result<AffectedRows, Self::Error>;
 }
@@ -77,13 +78,13 @@ where
 
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> Result<AffectedRows> {
         self.0
-            .put_record_batch(table, table_id, decoder, data)
+            .put_record_batch(table_name, table_ref, decoder, data)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcRequestSnafu)
     }
diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs
index 1c5049e3ba..63f2ec4a53 100644
--- a/src/servers/tests/mod.rs
+++ b/src/servers/tests/mod.rs
@@ -160,15 +160,11 @@ impl GrpcQueryHandler for DummyInstance {
 
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
-        decoder: &mut FlightDecoder,
-        data: FlightData,
+        _table_name: &TableName,
+        _table_ref: &mut Option<TableRef>,
+        _decoder: &mut FlightDecoder,
+        _data: FlightData,
     ) -> std::result::Result<AffectedRows, Self::Error> {
-        let _ = table;
-        let _ = data;
-        let _ = table_id;
-        let _ = decoder;
         unimplemented!()
     }
 }