diff --git a/src/common/grpc/src/flight/do_put.rs b/src/common/grpc/src/flight/do_put.rs index 3898740afa..e124726366 100644 --- a/src/common/grpc/src/flight/do_put.rs +++ b/src/common/grpc/src/flight/do_put.rs @@ -55,6 +55,12 @@ impl DoPutMetadata { pub fn max_timestamp(&self) -> Option { self.max_timestamp } + + pub fn timestamp_range(&self) -> Option<(i64, i64)> { + self.min_timestamp + .zip(self.max_timestamp) + .filter(|(min, max)| max >= min) + } } /// The response in the "DoPut" returned stream. @@ -109,6 +115,14 @@ mod tests { assert_eq!(metadata.request_id(), 42); } + #[test] + fn test_serde_do_put_metadata_with_timestamp_range() { + let serialized = r#"{"request_id":42,"min_timestamp":1000,"max_timestamp":2000}"#; + let metadata = serde_json::from_str::(serialized).unwrap(); + assert_eq!(metadata.request_id(), 42); + assert_eq!(metadata.timestamp_range(), Some((1000, 2000))); + } + #[test] fn test_serde_do_put_response() { let x = DoPutResponse::new(42, 88, 0.123); diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 21a03a41d3..b90bbba64f 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -276,6 +276,7 @@ impl FlightCraft for GreptimeRequestHandler { pub struct PutRecordBatchRequest { pub table_name: TableName, pub request_id: i64, + pub timestamp_range: Option<(i64, i64)>, pub record_batch: DfRecordBatch, pub schema_bytes: Bytes, pub flight_data: FlightData, @@ -287,6 +288,7 @@ impl PutRecordBatchRequest { table_name: TableName, record_batch: DfRecordBatch, request_id: i64, + timestamp_range: Option<(i64, i64)>, schema_bytes: Bytes, flight_data: FlightData, limiter: Option<&ServerMemoryLimiter>, @@ -311,6 +313,7 @@ impl PutRecordBatchRequest { Ok(Self { table_name, request_id, + timestamp_range, record_batch, schema_bytes, flight_data, @@ -475,14 +478,18 @@ impl Stream for PutRecordBatchRequestStream { schema_bytes, decoder, } => { - // Extract request_id and body_size from FlightData before decoding - let request_id = if !flight_data.app_metadata.is_empty() { + // Extract request_id and time range from FlightData before decoding + let metadata = if !flight_data.app_metadata.is_empty() { serde_json::from_slice::(&flight_data.app_metadata) - .map(|meta| meta.request_id()) - .unwrap_or_default() + .ok() } else { - 0 + None }; + let request_id = metadata + .as_ref() + .map(|meta| meta.request_id()) + .unwrap_or_default(); + let timestamp_range = metadata.and_then(|meta| meta.timestamp_range()); // Decode FlightData to RecordBatch match decoder.try_decode(&flight_data) { @@ -494,6 +501,7 @@ impl Stream for PutRecordBatchRequestStream { table_name, record_batch, request_id, + timestamp_range, schema_bytes, flight_data, limiter.as_ref(),