From 640c102b216168f7bf339cec108cfeacaa5fd266 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 13 May 2026 15:01:15 +0800 Subject: [PATCH] fix: expose flight timestamp range metadata (#8104) * fix: expose flight timestamp range metadata - Flight metadata: expose `min_timestamp` and `max_timestamp` from `src/common/grpc/src/flight/do_put.rs` and `src/servers/src/grpc/flight.rs` so callers can decide fast paths. Signed-off-by: Lei, HUANG * chore: address pr comments Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/common/grpc/src/flight/do_put.rs | 14 ++++++++++++++ src/servers/src/grpc/flight.rs | 18 +++++++++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/common/grpc/src/flight/do_put.rs b/src/common/grpc/src/flight/do_put.rs index 3898740afa..e124726366 100644 --- a/src/common/grpc/src/flight/do_put.rs +++ b/src/common/grpc/src/flight/do_put.rs @@ -55,6 +55,12 @@ impl DoPutMetadata { pub fn max_timestamp(&self) -> Option { self.max_timestamp } + + pub fn timestamp_range(&self) -> Option<(i64, i64)> { + self.min_timestamp + .zip(self.max_timestamp) + .filter(|(min, max)| max >= min) + } } /// The response in the "DoPut" returned stream. @@ -109,6 +115,14 @@ mod tests { assert_eq!(metadata.request_id(), 42); } + #[test] + fn test_serde_do_put_metadata_with_timestamp_range() { + let serialized = r#"{"request_id":42,"min_timestamp":1000,"max_timestamp":2000}"#; + let metadata = serde_json::from_str::(serialized).unwrap(); + assert_eq!(metadata.request_id(), 42); + assert_eq!(metadata.timestamp_range(), Some((1000, 2000))); + } + #[test] fn test_serde_do_put_response() { let x = DoPutResponse::new(42, 88, 0.123); diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 21a03a41d3..b90bbba64f 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -276,6 +276,7 @@ impl FlightCraft for GreptimeRequestHandler { pub struct PutRecordBatchRequest { pub table_name: TableName, pub request_id: i64, + pub timestamp_range: Option<(i64, i64)>, pub record_batch: DfRecordBatch, pub schema_bytes: Bytes, pub flight_data: FlightData, @@ -287,6 +288,7 @@ impl PutRecordBatchRequest { table_name: TableName, record_batch: DfRecordBatch, request_id: i64, + timestamp_range: Option<(i64, i64)>, schema_bytes: Bytes, flight_data: FlightData, limiter: Option<&ServerMemoryLimiter>, @@ -311,6 +313,7 @@ impl PutRecordBatchRequest { Ok(Self { table_name, request_id, + timestamp_range, record_batch, schema_bytes, flight_data, @@ -475,14 +478,18 @@ impl Stream for PutRecordBatchRequestStream { schema_bytes, decoder, } => { - // Extract request_id and body_size from FlightData before decoding - let request_id = if !flight_data.app_metadata.is_empty() { + // Extract request_id and time range from FlightData before decoding + let metadata = if !flight_data.app_metadata.is_empty() { serde_json::from_slice::(&flight_data.app_metadata) - .map(|meta| meta.request_id()) - .unwrap_or_default() + .ok() } else { - 0 + None }; + let request_id = metadata + .as_ref() + .map(|meta| meta.request_id()) + .unwrap_or_default(); + let timestamp_range = metadata.and_then(|meta| meta.timestamp_range()); // Decode FlightData to RecordBatch match decoder.try_decode(&flight_data) { @@ -494,6 +501,7 @@ impl Stream for PutRecordBatchRequestStream { table_name, record_batch, request_id, + timestamp_range, schema_bytes, flight_data, limiter.as_ref(),