feat: disable compression for do_get API (#6254)

* feat/disable-flight-compression:
 ### Commit Summary

 - **Add Compression Control in Flight Encoder**: Introduced a new method `with_compression_disabled` in `FlightEncoder` to allow encoding without compression in `flight.rs`.
 - **Update Flight Stream Initialization**: Modified `FlightRecordBatchStream` to use the new `FlightEncoder::with_compression_disabled` method for initializing the encoder in `stream.rs`.

* feat/disable-flight-compression:
 Remove Unused Import in `flight.rs`

 - Removed the unused import `write_message` from `flight.rs` to clean up the codebase.

* feat/disable-flight-compression:
 ### Disable Compression in Flight Encoder

 - Updated `tests-integration/src/grpc/flight.rs` to use `FlightEncoder::with_compression_disabled()` instead of `FlightEncoder::default()` for encoding `FlightMessage::Schema` and `FlightMessage::RecordBatch`. This change disables compression in the Flight encoder for these operations.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* disable flight client compression

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Lei, HUANG <lhuang@greptime.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-09 11:02:28 +08:00
committed by GitHub
parent d3d233257d
commit c50e84095e
4 changed files with 18 additions and 9 deletions

View File

@@ -167,9 +167,7 @@ impl Client {
let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
.max_encoding_message_size(self.max_grpc_send_message_size());
Ok(FlightClient { addr, client })
}
@@ -178,9 +176,7 @@ impl Client {
let (addr, channel) = self.find_channel()?;
let client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
.max_encoding_message_size(self.max_grpc_send_message_size());
Ok((addr, client))
}

View File

@@ -64,6 +64,18 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}
}
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),

View File

@@ -52,7 +52,7 @@ impl FlightRecordBatchStream {
rx,
join_handle,
done: false,
encoder: FlightEncoder::default(),
encoder: FlightEncoder::with_compression_disabled(),
}
}

View File

@@ -139,7 +139,8 @@ mod test {
let schema = record_batches[0].schema.arrow_schema().clone();
let stream = futures::stream::once(async move {
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
let mut schema_data =
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
let metadata = DoPutMetadata::new(0);
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
@@ -154,7 +155,7 @@ mod test {
tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let mut encoder = FlightEncoder::with_compression_disabled();
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);