mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 03:50:39 +00:00
fix: expose flight timestamp range metadata (#8104)
* fix: expose flight timestamp range metadata - Flight metadata: expose `min_timestamp` and `max_timestamp` from `src/common/grpc/src/flight/do_put.rs` and `src/servers/src/grpc/flight.rs` so callers can decide fast paths. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * chore: address pr comments Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -55,6 +55,12 @@ impl DoPutMetadata {
|
||||
pub fn max_timestamp(&self) -> Option<i64> {
|
||||
self.max_timestamp
|
||||
}
|
||||
|
||||
pub fn timestamp_range(&self) -> Option<(i64, i64)> {
|
||||
self.min_timestamp
|
||||
.zip(self.max_timestamp)
|
||||
.filter(|(min, max)| max >= min)
|
||||
}
|
||||
}
|
||||
|
||||
/// The response in the "DoPut" returned stream.
|
||||
@@ -109,6 +115,14 @@ mod tests {
|
||||
assert_eq!(metadata.request_id(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_serde_do_put_metadata_with_timestamp_range() {
|
||||
let serialized = r#"{"request_id":42,"min_timestamp":1000,"max_timestamp":2000}"#;
|
||||
let metadata = serde_json::from_str::<DoPutMetadata>(serialized).unwrap();
|
||||
assert_eq!(metadata.request_id(), 42);
|
||||
assert_eq!(metadata.timestamp_range(), Some((1000, 2000)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_serde_do_put_response() {
|
||||
let x = DoPutResponse::new(42, 88, 0.123);
|
||||
|
||||
@@ -276,6 +276,7 @@ impl FlightCraft for GreptimeRequestHandler {
|
||||
pub struct PutRecordBatchRequest {
|
||||
pub table_name: TableName,
|
||||
pub request_id: i64,
|
||||
pub timestamp_range: Option<(i64, i64)>,
|
||||
pub record_batch: DfRecordBatch,
|
||||
pub schema_bytes: Bytes,
|
||||
pub flight_data: FlightData,
|
||||
@@ -287,6 +288,7 @@ impl PutRecordBatchRequest {
|
||||
table_name: TableName,
|
||||
record_batch: DfRecordBatch,
|
||||
request_id: i64,
|
||||
timestamp_range: Option<(i64, i64)>,
|
||||
schema_bytes: Bytes,
|
||||
flight_data: FlightData,
|
||||
limiter: Option<&ServerMemoryLimiter>,
|
||||
@@ -311,6 +313,7 @@ impl PutRecordBatchRequest {
|
||||
Ok(Self {
|
||||
table_name,
|
||||
request_id,
|
||||
timestamp_range,
|
||||
record_batch,
|
||||
schema_bytes,
|
||||
flight_data,
|
||||
@@ -475,14 +478,18 @@ impl Stream for PutRecordBatchRequestStream {
|
||||
schema_bytes,
|
||||
decoder,
|
||||
} => {
|
||||
// Extract request_id and body_size from FlightData before decoding
|
||||
let request_id = if !flight_data.app_metadata.is_empty() {
|
||||
// Extract request_id and time range from FlightData before decoding
|
||||
let metadata = if !flight_data.app_metadata.is_empty() {
|
||||
serde_json::from_slice::<DoPutMetadata>(&flight_data.app_metadata)
|
||||
.map(|meta| meta.request_id())
|
||||
.unwrap_or_default()
|
||||
.ok()
|
||||
} else {
|
||||
0
|
||||
None
|
||||
};
|
||||
let request_id = metadata
|
||||
.as_ref()
|
||||
.map(|meta| meta.request_id())
|
||||
.unwrap_or_default();
|
||||
let timestamp_range = metadata.and_then(|meta| meta.timestamp_range());
|
||||
|
||||
// Decode FlightData to RecordBatch
|
||||
match decoder.try_decode(&flight_data) {
|
||||
@@ -494,6 +501,7 @@ impl Stream for PutRecordBatchRequestStream {
|
||||
table_name,
|
||||
record_batch,
|
||||
request_id,
|
||||
timestamp_range,
|
||||
schema_bytes,
|
||||
flight_data,
|
||||
limiter.as_ref(),
|
||||
|
||||
Reference in New Issue
Block a user