From c50e84095e4022317f4aa560968d5e5e263454d7 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Mon, 9 Jun 2025 11:02:28 +0800
Subject: [PATCH] feat: disable compression for `do_get` API (#6254)

* feat/disable-flight-compression:
  ### Commit Summary

  - **Add Compression Control in Flight Encoder**: Introduced a new method `with_compression_disabled` in `FlightEncoder` to allow encoding without compression in `flight.rs`.

  - **Update Flight Stream Initialization**: Modified `FlightRecordBatchStream` to use the new `FlightEncoder::with_compression_disabled` method for initializing the encoder in `stream.rs`.

* feat/disable-flight-compression:
  Remove Unused Import in `flight.rs`

  - Removed the unused import `write_message` from `flight.rs` to clean up the codebase.

* feat/disable-flight-compression:
  ### Disable Compression in Flight Encoder

  - Updated `tests-integration/src/grpc/flight.rs` to use `FlightEncoder::with_compression_disabled()` instead of `FlightEncoder::default()` for encoding `FlightMessage::Schema` and `FlightMessage::RecordBatch`. This change disables compression in the Flight encoder for these operations.
Signed-off-by: Lei, HUANG * Signed-off-by: Lei, HUANG * disable flight client compression Signed-off-by: Ruihang Xia --------- Signed-off-by: Lei, HUANG Signed-off-by: Ruihang Xia Co-authored-by: Ruihang Xia --- src/client/src/client.rs | 8 ++------ src/common/grpc/src/flight.rs | 12 ++++++++++++ src/servers/src/grpc/flight/stream.rs | 2 +- tests-integration/src/grpc/flight.rs | 5 +++-- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 429944d329..a604bf5124 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -167,9 +167,7 @@ impl Client { let client = FlightServiceClient::new(channel) .max_decoding_message_size(self.max_grpc_recv_message_size()) - .max_encoding_message_size(self.max_grpc_send_message_size()) - .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Zstd); + .max_encoding_message_size(self.max_grpc_send_message_size()); Ok(FlightClient { addr, client }) } @@ -178,9 +176,7 @@ impl Client { let (addr, channel) = self.find_channel()?; let client = PbRegionClient::new(channel) .max_decoding_message_size(self.max_grpc_recv_message_size()) - .max_encoding_message_size(self.max_grpc_send_message_size()) - .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Zstd); + .max_encoding_message_size(self.max_grpc_send_message_size()); Ok((addr, client)) } diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 3d4d94dc60..0b83db9cfd 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -64,6 +64,18 @@ impl Default for FlightEncoder { } impl FlightEncoder { + pub fn with_compression_disabled() -> Self { + let write_options = writer::IpcWriteOptions::default() + .try_with_compression(None) + .unwrap(); + + Self { + write_options, + data_gen: writer::IpcDataGenerator::default(), + dictionary_tracker: writer::DictionaryTracker::new(false), + } + } + pub fn 
encode(&mut self, flight_message: FlightMessage) -> FlightData { match flight_message { FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(), diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index bd9ebe1762..fa3aaf5061 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -52,7 +52,7 @@ impl FlightRecordBatchStream { rx, join_handle, done: false, - encoder: FlightEncoder::default(), + encoder: FlightEncoder::with_compression_disabled(), } } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 3d16466f28..ddef4a79f3 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -139,7 +139,8 @@ mod test { let schema = record_batches[0].schema.arrow_schema().clone(); let stream = futures::stream::once(async move { - let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema)); + let mut schema_data = + FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema)); let metadata = DoPutMetadata::new(0); schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); // first message in "DoPut" stream should carry table name in flight descriptor @@ -154,7 +155,7 @@ mod test { tokio_stream::iter(record_batches) .enumerate() .map(|(i, x)| { - let mut encoder = FlightEncoder::default(); + let mut encoder = FlightEncoder::with_compression_disabled(); let message = FlightMessage::RecordBatch(x.into_df_record_batch()); let mut data = encoder.encode(message); let metadata = DoPutMetadata::new((i + 1) as i64);