feat/bulk-support-flow-batch:

### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert

 - **Refactor Table Handling**:
   - Updated the `put_record_batch` method to take a `TableRef` instead of a `TableId` in the frontend's `grpc.rs`, `greptime_handler.rs`, and the `GrpcQueryHandler` trait in `query_handler/grpc.rs`, so each Flight stream caches the resolved table rather than just its id (a minimal sketch of this caching pattern follows the list).
   - Modified `handle_bulk_insert` to accept a `TableRef` and extract the `TableId` internally in `bulk_insert.rs`.

 - **Enhance Timestamp Processing**:
   - Added a `compute_timestamp_range` helper to `bulk_insert.rs` that computes the min/max range of a record batch's time index column.
   - Introduced an `InvalidTimeIndexType` error for invalid time index types in `error.rs`.

 - **Test Adjustments**:
   - Updated the `DummyInstance` implementation in `tests/mod.rs` to match the new `put_record_batch` signature.
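
The point of passing a `TableRef` through `put_record_batch` is that each Flight stream resolves the table from the catalog once and then reuses the cached reference for every subsequent batch. Below is a minimal, self-contained sketch of that caching pattern; `Table`, `TableRef`, and `lookup_table` here are toy stand-ins rather than GreptimeDB's actual catalog API.

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Table {
    id: u32,
}

type TableRef = Arc<Table>;

// Toy stand-in for the catalog lookup that only the first batch pays for.
async fn lookup_table(name: &str) -> Option<TableRef> {
    (name == "metrics").then(|| Arc::new(Table { id: 42 }))
}

// Resolve the table on the first call, then serve the cached reference.
async fn resolve(name: &str, cache: &mut Option<TableRef>) -> Option<TableRef> {
    if let Some(table) = cache {
        return Some(table.clone());
    }
    let table = lookup_table(name).await?;
    *cache = Some(table.clone());
    Some(table)
}

#[tokio::main] // requires the tokio runtime for this async example
async fn main() {
    // One cache per stream, mirroring the `Option<TableRef>` kept across
    // the request loop in `greptime_handler.rs`.
    let mut cache: Option<TableRef> = None;
    for _batch in 0..3 {
        let table = resolve("metrics", &mut cache).await.unwrap();
        println!("writing batch to table {}", table.id);
    }
}
```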

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Authored by Lei, HUANG on 2025-06-10 13:37:46 +00:00; committed by discord9
parent 1d53dd26ae
commit 7527ff976e
6 changed files with 102 additions and 32 deletions

grpc.rs (frontend instance)

@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
-use table::metadata::TableId;
use table::table_name::TableName;
+use table::TableRef;
use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch(
&self,
-table: &TableName,
-table_id: &mut Option<TableId>,
+table_name: &TableName,
+table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows> {
-let table_id = if let Some(table_id) = table_id {
-*table_id
+let table = if let Some(table) = table_ref {
+table.clone()
} else {
let table = self
.catalog_manager()
.table(
-&table.catalog_name,
-&table.schema_name,
-&table.table_name,
+&table_name.catalog_name,
+&table_name.schema_name,
+&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
-table_name: table.to_string(),
+table_name: table_name.to_string(),
})?;
-let id = table.table_info().table_id();
-*table_id = Some(id);
-id
+*table_ref = Some(table.clone());
+table
};
self.inserter
-.handle_bulk_insert(table_id, decoder, data)
+.handle_bulk_insert(table, decoder, data)
.await
.context(TableOperationSnafu)
}

bulk_insert.rs

@@ -17,13 +17,20 @@ use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
+use arrow::array::{
+Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+TimestampSecondArray,
+};
+use arrow::datatypes::{DataType, Int64Type, TimeUnit};
+use arrow::record_batch::RecordBatch;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_telemetry::tracing::Instrument;
use common_telemetry::tracing_context::TracingContext;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
-use table::metadata::TableId;
+use table::TableRef;
use crate::insert::Inserter;
use crate::{error, metrics};
@@ -32,10 +39,11 @@ impl Inserter {
/// Handle bulk insert request.
pub async fn handle_bulk_insert(
&self,
-table_id: TableId,
+table: TableRef,
decoder: &mut FlightDecoder,
data: FlightData,
) -> error::Result<AffectedRows> {
+let table_id = table.table_info().table_id();
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"])
.start_timer();
@@ -48,6 +56,19 @@ impl Inserter {
return Ok(0);
};
decode_timer.observe_duration();
+if let Some((min, max)) = compute_timestamp_range(
+&record_batch,
+&table
+.table_info()
+.meta
+.schema
+.timestamp_column()
+.as_ref()
+.unwrap()
+.name,
+)? {
+// notify flownode.
+}
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"])
@@ -217,3 +238,47 @@ impl Inserter {
Ok(rows_inserted)
}
}
+/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
+fn compute_timestamp_range(
+rb: &RecordBatch,
+timestamp_index_name: &str,
+) -> error::Result<Option<(i64, i64)>> {
+let ts_col = rb
+.column_by_name(timestamp_index_name)
+.context(error::ColumnNotFoundSnafu {
+msg: timestamp_index_name,
+})?;
+if rb.num_rows() == 0 {
+return Ok(None);
+}
+let primitive = match ts_col.data_type() {
+DataType::Timestamp(unit, _) => match unit {
+TimeUnit::Second => ts_col
+.as_any()
+.downcast_ref::<TimestampSecondArray>()
+.unwrap()
+.reinterpret_cast::<Int64Type>(),
+TimeUnit::Millisecond => ts_col
+.as_any()
+.downcast_ref::<TimestampMillisecondArray>()
+.unwrap()
+.reinterpret_cast::<Int64Type>(),
+TimeUnit::Microsecond => ts_col
+.as_any()
+.downcast_ref::<TimestampMicrosecondArray>()
+.unwrap()
+.reinterpret_cast::<Int64Type>(),
+TimeUnit::Nanosecond => ts_col
+.as_any()
+.downcast_ref::<TimestampNanosecondArray>()
+.unwrap()
+.reinterpret_cast::<Int64Type>(),
+},
+t @ _ => {
+return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
+}
+};
+Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
+}

error.rs

@@ -837,6 +837,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid time index type: {}", ty))]
InvalidTimeIndexType {
ty: arrow::datatypes::DataType,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -964,6 +971,7 @@ impl ErrorExt for Error {
Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(),
Error::ComputeArrow { .. } => StatusCode::Internal,
+Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
}
}
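
The `InvalidTimeIndexTypeSnafu { ty: ... }.fail()` call in `bulk_insert.rs` goes through the context selector that snafu derives from this new variant. Below is a simplified, self-contained illustration of that mechanism, using a toy error type without the implicit `Location` field.

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Invalid time index type: {}", ty))]
    InvalidTimeIndexType { ty: String },
}

// The derive generates an `InvalidTimeIndexTypeSnafu` selector whose
// `fail()` builds the error variant from the supplied fields.
fn check(ty: &str) -> Result<(), Error> {
    if ty != "timestamp" {
        return InvalidTimeIndexTypeSnafu { ty }.fail();
    }
    Ok(())
}

fn main() {
    // Prints: Invalid time index type: utf8
    println!("{}", check("utf8").unwrap_err());
}
```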

greptime_handler.rs

@@ -41,6 +41,7 @@ use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
+use table::TableRef;
use tokio::sync::mpsc;
use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +150,8 @@ impl GreptimeRequestHandler {
.clone()
.unwrap_or_else(common_runtime::global_runtime);
runtime.spawn(async move {
-// Cached table id
-let mut table_id: Option<TableId> = None;
+// Cached table ref
+let mut table_ref: Option<TableRef> = None;
let mut decoder = FlightDecoder::default();
while let Some(request) = stream.next().await {
@@ -169,7 +170,7 @@ impl GreptimeRequestHandler {
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler
-.put_record_batch(&table_name, &mut table_id, &mut decoder, data)
+.put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
.await
.inspect_err(|e| error!(e; "Failed to handle flight record batches"));
timer.observe_duration();

grpc.rs (query_handler)

@@ -25,6 +25,7 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use table::metadata::TableId;
use table::table_name::TableName;
+use table::TableRef;
use crate::error::{self, Result};
@@ -45,8 +46,8 @@ pub trait GrpcQueryHandler {
async fn put_record_batch(
&self,
-table: &TableName,
-table_id: &mut Option<TableId>,
+table_name: &TableName,
+table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder,
flight_data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error>;
@@ -77,13 +78,13 @@ where
async fn put_record_batch(
&self,
-table: &TableName,
-table_id: &mut Option<TableId>,
+table_name: &TableName,
+table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows> {
self.0
-.put_record_batch(table, table_id, decoder, data)
+.put_record_batch(table_name, table_ref, decoder, data)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcRequestSnafu)

tests/mod.rs

@@ -160,15 +160,11 @@ impl GrpcQueryHandler for DummyInstance {
async fn put_record_batch(
&self,
-table: &TableName,
-table_id: &mut Option<TableId>,
-decoder: &mut FlightDecoder,
-data: FlightData,
+_table_name: &TableName,
+_table_ref: &mut Option<TableRef>,
+_decoder: &mut FlightDecoder,
+_data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error> {
-let _ = table;
-let _ = data;
-let _ = table_id;
-let _ = decoder;
unimplemented!()
}
}